[jboss-cvs] JBossAS SVN: r86130 - in trunk: testsuite/src/main/org/jboss/test/cluster/defaultcfg/test and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 19 17:37:59 EDT 2009
Author: bstansberry at jboss.com
Date: 2009-03-19 17:37:59 -0400 (Thu, 19 Mar 2009)
New Revision: 86130
Added:
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockState.java
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java
trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/YieldingGloballyExclusiveClusterLockSupportUnitTestCase.java
Removed:
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockManager.java
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockManager.java
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ExclusiveClusterLockManager.java
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LockState.java
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ReadWriteClusterLockManager.java
Modified:
trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalLockHandler.java
trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/ReadWriteClusteredLockManagerUnitTestCase.java
trunk/testsuite/src/main/org/jboss/test/cluster/lock/ClusteredLockManagerTestBase.java
Log:
[JBAS-5552] Refactor the cluster-wide locking
Deleted: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockManager.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockManager.java 2009-03-19 20:48:17 UTC (rev 86129)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockManager.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -1,567 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.ha.framework.server.lock;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.jboss.ha.framework.interfaces.ClusterNode;
-import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
-import org.jboss.ha.framework.server.lock.LockState.State;
-import org.jboss.logging.Logger;
-
-/**
- * Base class for cluster-wide lock implementations.
- *
- * @author Brian Stansberry
- */
-public abstract class AbstractClusterLockManager implements HAMembershipListener, ClusterLockManager
-{
- public static final Class<?>[] REMOTE_LOCK_TYPES = new Class[]{Serializable.class, ClusterNode.class, long.class};
- public static final Class<?>[] RELEASE_REMOTE_LOCK_TYPES = new Class[]{Serializable.class, ClusterNode.class};
-
- /**
- * Object the HAPartition can invoke on. This class is static as
- * an aid in unit testing.
- */
- public static class RpcTarget
- {
- private final AbstractClusterLockManager mgr;
-
- private RpcTarget(AbstractClusterLockManager mgr)
- {
- this.mgr = mgr;
- }
-
- public RemoteLockResponse remoteLock(Serializable categoryName, ClusterNode caller, long timeout)
- {
- return mgr.remoteLock(categoryName, caller, timeout);
- }
-
- public void releaseRemoteLock(Serializable categoryName, ClusterNode caller)
- {
- mgr.releaseRemoteLock(categoryName, caller);
- }
- }
-
- protected final Logger log = Logger.getLogger(getClass());
-
- private final ConcurrentMap<Serializable, LockState> lockStates = new ConcurrentHashMap<Serializable, LockState>();
- private final ConcurrentMap<ClusterNode, Set<LockState>> lockStatesByOwner = new ConcurrentHashMap<ClusterNode, Set<LockState>>();
- private ClusterNode me;
- private final String serviceHAName;
- private final HAPartition partition;
- private final LocalLockHandler localHandler;
- private final List<ClusterNode> members = new CopyOnWriteArrayList<ClusterNode>();
-// private final boolean supportLocalOnly;
- private RpcTarget rpcTarget;
-
- public AbstractClusterLockManager(String serviceHAName,
- HAPartition partition,
- LocalLockHandler handler)
- {
- if (serviceHAName == null)
- {
- throw new IllegalArgumentException("serviceHAName is null");
- }
- if (partition == null)
- {
- throw new IllegalArgumentException("partition is null");
- }
- if (handler == null)
- {
- throw new IllegalArgumentException("localHandler is null");
- }
-
- this.partition = partition;
- this.localHandler = handler;
- this.serviceHAName = serviceHAName;
- }
-
- // -------------------------------------------------------------- Properties
-
- public HAPartition getPartition()
- {
- return partition;
- }
-
- public String getServiceHAName()
- {
- return serviceHAName;
- }
-
- public LocalLockHandler getLocalHandler()
- {
- return localHandler;
- }
-
- // ------------------------------------------------------ ClusterLockManager
-
- public boolean lock(Serializable lockId, long timeout)
- {
- if (this.rpcTarget == null)
- {
- throw new IllegalStateException("Must call start() before first call to lock()");
- }
- LockState category = getCategory(lockId, true);
-
- long left = timeout > 0 ? timeout : Long.MAX_VALUE;
- long start = System.currentTimeMillis();
- while (left > 0)
- {
- // Another node we lost to who should take precedence
- // over ourself in competition for the lock
- ClusterNode superiorCompetitor = null;
-
- // Only continue if category is unlocked
- if (category.state.compareAndSet(LockState.State.UNLOCKED, LockState.State.REMOTE_LOCKING))
- {
- // Category state is now REMOTE_LOCKING, so other nodes will fail
- // in attempts to acquire on this node unless the caller is "superior"
-
- boolean success = false;
- try
- {
- // Get the lock on all other nodes in the cluster
-
- @SuppressWarnings("unchecked")
- ArrayList rsps = partition.callMethodOnCluster(getServiceHAName(),
- "remoteLock", new Object[]{lockId, me, new Long(left)},
- REMOTE_LOCK_TYPES, true);
-
- boolean remoteLocked = true;
- if (rsps != null)
- {
- for (Object rsp : rsps)
- {
- if ((rsp instanceof RemoteLockResponse) == false)
- {
- remoteLocked = false;
- }
- else if (((RemoteLockResponse) rsp).flag != RemoteLockResponse.Flag.OK)
- {
- remoteLocked = false;
- if (superiorCompetitor == null)
- {
- superiorCompetitor = getSuperiorCompetitor(((RemoteLockResponse) rsp).holder);
- }
- }
- }
- }
- else if (members.size() ==0 || (members.size() == 1 && members.contains(me)))
- {
- // no peers
- remoteLocked = true;
- }
-
- if (remoteLocked)
- {
- // Bail if someone else locked our node while we were locking
- // others.
- if (category.state.compareAndSet(LockState.State.REMOTE_LOCKING, LockState.State.LOCAL_LOCKING))
- {
- // Now we are in LOCAL_LOCKING phase which will cause
- // us to reject incoming remote requests
-
- // Now see if we can lock our own node
- long localTimeout = left - (System.currentTimeMillis() - start);
- if (getLock(lockId, category, me, localTimeout).flag == RemoteLockResponse.Flag.OK)
- {
- success = true;
- return true;
- }
- // else either 1) there was a race with a remote caller or
- // 2) some other activity locally is preventing our
- // acquisition of the local lock. Either way back off
- // and then retry
- }
-
- // Find out if we couldn't acquire because someone with
- // precedence has the lock
- superiorCompetitor = getSuperiorCompetitor(category.getHolder());
- }
- }
- catch (RuntimeException e)
- {
- throw e;
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- finally
- {
- if (!success)
- {
- cleanup(lockId, category);
- }
- }
- }
-
- // We failed for some reason. Pause to let things clear before trying again.
- // If we've detected we are competing with someone else who is
- // "superior" pause longer to let them proceed first.
- long backoff = computeBackoff(timeout, start, left, superiorCompetitor == null);
- if (backoff > 0)
- {
- try
- {
- Thread.sleep(backoff);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- break;
- }
- }
-
- if (category.state.get() == LockState.State.INVALID)
- {
- // Someone invalidated our category; get a new one
- category = getCategory(lockId, true);
- }
-
- long now = System.currentTimeMillis();
- left -= (now - start);
- start = now;
- }
-
- return false;
- }
-
- public String getPartitionName()
- {
- return partition.getPartitionName();
- }
-
- public ClusterNode getLocalClusterNode()
- {
- return this.me;
- }
-
- public List<ClusterNode> getCurrentView()
- {
- return new ArrayList<ClusterNode>(members);
- }
-
- public void start() throws Exception
- {
- this.me = this.partition.getClusterNode();
- this.localHandler.setLocalNode(this.me);
-
- this.rpcTarget = new RpcTarget(this);
- this.partition.registerRPCHandler(this.serviceHAName, this.rpcTarget);
- this.partition.registerMembershipListener(this);
-
- @SuppressWarnings("unchecked")
- Vector allMembers = this.partition.getCurrentView();
- membershipChanged(new Vector<ClusterNode>(), allMembers, allMembers);
- }
-
- public void stop() throws Exception
- {
- if (this.rpcTarget != null)
- {
- this.partition.unregisterRPCHandler(this.serviceHAName, this.rpcTarget);
- this.rpcTarget = null;
- this.partition.unregisterMembershipListener(this);
- Vector<ClusterNode> dead = new Vector<ClusterNode>(members);
- Vector<ClusterNode> empty = new Vector<ClusterNode>();
- membershipChanged(dead, empty, empty);
- this.me = null;
- }
- }
-
- // ---------------------------------------------------- HAMembershipListener
-
- @SuppressWarnings("unchecked")
- public synchronized void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
- {
- this.members.clear();
- this.members.addAll(allMembers);
-
- Set<ClusterNode> toClean = lockStatesByOwner.keySet();
- toClean.removeAll(this.members);
- for (ClusterNode deadMember : toClean)
- {
- Set<LockState> deadMemberLocks = lockStatesByOwner.remove(deadMember);
- if (deadMemberLocks != null)
- {
- synchronized (deadMemberLocks)
- {
- // We're going to iterate and make a call that removes from set,
- // so iterate over a copy to avoid ConcurrentModificationException
- Set<LockState> copy = new HashSet<LockState>(deadMemberLocks);
- for (LockState lockState : copy)
- {
- releaseRemoteLock(lockState.lockId, (ClusterNode) deadMember);
- }
- }
- }
-
- }
- }
-
- // --------------------------------------------------------------- Protected
-
- protected abstract RemoteLockResponse handleLockSuccess(Serializable categoryName, LockState category, ClusterNode caller);
-
- protected abstract LockState getCategory(Serializable categoryName);
-
- protected abstract RemoteLockResponse yieldLock(Serializable categoryName, LockState category, ClusterNode caller, long timeout);
-
- protected void recordLockHolder(LockState category, ClusterNode caller)
- {
- if (category.holder != null)
- {
- Set<LockState> memberLocks = getLocksHeldByMember(category.holder);
- synchronized (memberLocks)
- {
- memberLocks.remove(category);
- }
- }
-
- if (me.equals(caller) == false)
- {
- Set<LockState> memberLocks = getLocksHeldByMember(caller);
- synchronized (memberLocks)
- {
- memberLocks.add(category);
- }
- }
-
- category.lock(caller);
- }
-
- protected LockState getCategory(Serializable categoryName, boolean create)
- {
- LockState category = lockStates.get(categoryName);
- if (category == null && create)
- {
- category = new LockState(categoryName);
- LockState existing = lockStates.putIfAbsent(categoryName, category);
- if (existing != null)
- {
- category = existing;
- }
- }
- return category;
- }
-
- protected void removeLockState(Serializable id, LockState category)
- {
- lockStates.remove(id, category);
- }
-
- // ----------------------------------------------------------------- Private
-
- /**
- * Called by a remote node via RpcTarget.
- */
- private RemoteLockResponse remoteLock(Serializable categoryName, ClusterNode caller, long timeout)
- {
- RemoteLockResponse response = null;
- LockState category = getCategory(categoryName);
- if (category == null)
- {
- // unknown == OK
- return new RemoteLockResponse(RemoteLockResponse.Flag.OK);
- }
-
- switch (category.state.get())
- {
- case UNLOCKED:
- // Remote callers can race for the local lock
- response = getLock(categoryName, category, caller, timeout);
- break;
- case REMOTE_LOCKING:
- if (me.equals(caller))
- {
- log.warn("Received remoteLock call from self");
- response = new RemoteLockResponse(RemoteLockResponse.Flag.OK);
- }
- else if (getSuperiorCompetitor(caller) == null)
- {
- // I want the lock and I take precedence
- response = new RemoteLockResponse(RemoteLockResponse.Flag.REJECT, me);
- }
- else
- {
- // I want the lock but caller takes precedence; let
- // them acquire local lock and I'll fail
- response = getLock(categoryName, category, caller, timeout);
- }
- break;
- case LOCAL_LOCKING:
- // I've gotten OK responses from everyone and am about
- // to acquire local lock, so reject caller
- response = new RemoteLockResponse(RemoteLockResponse.Flag.REJECT, me);
- break;
- case LOCKED:
- // See if I have the lock and will give it up
- response = yieldLock(categoryName, category, caller, timeout);
- break;
- case INVALID:
- // We've given up the lock since we got the category and the
- // thread that caused that is trying to discard the unneeded
- // category.
- // Call in again and see what our current state is
- response = remoteLock(categoryName, caller, timeout);
- break;
- }
-
- return response;
- }
-
- /**
- * Called by a remote node via RpcTarget.
- */
- private void releaseRemoteLock(Serializable categoryName, ClusterNode caller)
- {
- LockState category = getCategory(categoryName, false);
- if (category != null && category.state.get() == State.LOCKED)
- {
- if (caller.equals(localHandler.getLockHolder(categoryName)))
- {
- // Throw away the category as a cleanup exercise
- category.invalidate();
- localHandler.releaseLock(categoryName, caller);
- Set<LockState> memberLocks = getLocksHeldByMember(caller);
- synchronized (memberLocks)
- {
- memberLocks.remove(category);
- }
- removeLockState(categoryName, category);
- }
- }
-
- }
-
- /** See if <code>caller</code> comes before us in the members list */
- private ClusterNode getSuperiorCompetitor(ClusterNode caller)
- {
- if (caller == null)
- return null;
-
- for (ClusterNode node : members)
- {
- if (me.equals(node))
- {
- break;
- }
- else if (caller.equals(node))
- {
- return caller;
- }
- }
-
- return null;
- }
-
- /**
- * Always call this with a lock on the Category.
- */
- protected RemoteLockResponse getLock(Serializable categoryName, LockState category,
- ClusterNode caller, long timeout)
- {
- RemoteLockResponse response;
- try
- {
- localHandler.tryLock(categoryName, caller, timeout);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- log.error("Caught InterruptedException; Failing request by " + caller + " to lock " + categoryName);
- return new RemoteLockResponse(RemoteLockResponse.Flag.FAIL, localHandler.getLockHolder(categoryName));
- }
- catch (TimeoutException t)
- {
- return new RemoteLockResponse(RemoteLockResponse.Flag.FAIL, t.getOwner());
- }
-
- response = handleLockSuccess(categoryName, category, caller);
- return response;
- }
-
- private Set<LockState> getLocksHeldByMember(ClusterNode member)
- {
- Set<LockState> memberCategories = lockStatesByOwner.get(member);
- if (memberCategories == null)
- {
- memberCategories = new HashSet<LockState>();
- Set<LockState> existing = lockStatesByOwner.putIfAbsent(member, memberCategories);
- if (existing != null)
- {
- memberCategories = existing;
- }
- }
- return memberCategories;
- }
-
- /** Back out of a failed attempt by the local node to lock */
- private void cleanup(Serializable categoryName, LockState category)
- {
- try
- {
- partition.callMethodOnCluster(getServiceHAName(), "releaseRemoteLock", new Object[]{categoryName, me}, RELEASE_REMOTE_LOCK_TYPES, true);
- }
- catch (RuntimeException e)
- {
- throw e;
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed releasing remote lock", e);
- }
- finally
- {
- if (category.state.compareAndSet(LockState.State.REMOTE_LOCKING, LockState.State.UNLOCKED) == false)
- {
- category.state.compareAndSet(LockState.State.LOCAL_LOCKING, LockState.State.UNLOCKED);
- }
- }
- }
-
- private static long computeBackoff(long initialTimeout, long start, long left, boolean superiorCompetitor)
- {
- long remain = left - (System.currentTimeMillis() - start);
- // Don't spam the cluster
- if (remain < Math.min(initialTimeout / 5, 15))
- {
- return remain;
- }
-
- long max = superiorCompetitor ? 100 : 250;
- long min = remain / 3;
- return Math.min(max, min);
- }
-}
Copied: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java (from rev 84208, trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockManager.java)
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java (rev 0)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -0,0 +1,571 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.framework.server.lock;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.HAPartition;
+import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
+import org.jboss.ha.framework.server.lock.ClusterLockState.State;
+import org.jboss.logging.Logger;
+
+/**
+ * Base class for cluster-wide lock implementations.
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision:$
+ */
+public abstract class AbstractClusterLockSupport implements HAMembershipListener
+{
+ public static final Class<?>[] REMOTE_LOCK_TYPES = new Class[]{Serializable.class, ClusterNode.class, long.class};
+ public static final Class<?>[] RELEASE_REMOTE_LOCK_TYPES = new Class[]{Serializable.class, ClusterNode.class};
+
+ /**
+ * Object the HAPartition can invoke on. This class is static as
+ * an aid in unit testing.
+ */
+ public static class RpcTarget
+ {
+ private final AbstractClusterLockSupport mgr;
+
+ private RpcTarget(AbstractClusterLockSupport mgr)
+ {
+ this.mgr = mgr;
+ }
+
+ public RemoteLockResponse remoteLock(Serializable categoryName, ClusterNode caller, long timeout)
+ {
+ return mgr.remoteLock(categoryName, caller, timeout);
+ }
+
+ public void releaseRemoteLock(Serializable categoryName, ClusterNode caller)
+ {
+ mgr.releaseRemoteLock(categoryName, caller);
+ }
+ }
+
+ protected final Logger log = Logger.getLogger(getClass());
+
+ private final ConcurrentMap<Serializable, ClusterLockState> lockStates = new ConcurrentHashMap<Serializable, ClusterLockState>();
+ private final ConcurrentMap<ClusterNode, Set<ClusterLockState>> lockStatesByOwner = new ConcurrentHashMap<ClusterNode, Set<ClusterLockState>>();
+ private ClusterNode me;
+ private final String serviceHAName;
+ private final HAPartition partition;
+ private final LocalLockHandler localHandler;
+ private final List<ClusterNode> members = new CopyOnWriteArrayList<ClusterNode>();
+// private final boolean supportLocalOnly;
+ private RpcTarget rpcTarget;
+
+ public AbstractClusterLockSupport(String serviceHAName,
+ HAPartition partition,
+ LocalLockHandler handler)
+ {
+ if (serviceHAName == null)
+ {
+ throw new IllegalArgumentException("serviceHAName is null");
+ }
+ if (partition == null)
+ {
+ throw new IllegalArgumentException("partition is null");
+ }
+ if (handler == null)
+ {
+ throw new IllegalArgumentException("localHandler is null");
+ }
+
+ this.partition = partition;
+ this.localHandler = handler;
+ this.serviceHAName = serviceHAName;
+ }
+
+ // -------------------------------------------------------------- Properties
+
+ public HAPartition getPartition()
+ {
+ return partition;
+ }
+
+ public String getServiceHAName()
+ {
+ return serviceHAName;
+ }
+
+ public LocalLockHandler getLocalHandler()
+ {
+ return localHandler;
+ }
+
+ // ------------------------------------------------------ ClusterLockManager
+
+ public boolean lock(Serializable lockId, long timeout)
+ {
+ if (this.rpcTarget == null)
+ {
+ throw new IllegalStateException("Must call start() before first call to lock()");
+ }
+ ClusterLockState category = getClusterLockState(lockId, true);
+
+ long left = timeout > 0 ? timeout : Long.MAX_VALUE;
+ long start = System.currentTimeMillis();
+ while (left > 0)
+ {
+ // Another node we lost to who should take precedence
+ // over ourself in competition for the lock
+ ClusterNode superiorCompetitor = null;
+
+ // Only continue if category is unlocked
+ if (category.state.compareAndSet(ClusterLockState.State.UNLOCKED, ClusterLockState.State.REMOTE_LOCKING))
+ {
+ // Category state is now REMOTE_LOCKING, so other nodes will fail
+ // in attempts to acquire on this node unless the caller is "superior"
+
+ boolean success = false;
+ try
+ {
+ // Get the lock on all other nodes in the cluster
+
+ @SuppressWarnings("unchecked")
+ ArrayList rsps = partition.callMethodOnCluster(getServiceHAName(),
+ "remoteLock", new Object[]{lockId, me, new Long(left)},
+ REMOTE_LOCK_TYPES, true);
+
+ boolean remoteLocked = true;
+ if (rsps != null)
+ {
+ for (Object rsp : rsps)
+ {
+ if ((rsp instanceof RemoteLockResponse) == false)
+ {
+ remoteLocked = false;
+ }
+ else if (((RemoteLockResponse) rsp).flag != RemoteLockResponse.Flag.OK)
+ {
+ remoteLocked = false;
+ if (superiorCompetitor == null)
+ {
+ superiorCompetitor = getSuperiorCompetitor(((RemoteLockResponse) rsp).holder);
+ }
+ }
+ }
+ }
+ else if (members.size() ==0 || (members.size() == 1 && members.contains(me)))
+ {
+ // no peers
+ remoteLocked = true;
+ }
+
+ if (remoteLocked)
+ {
+ // Bail if someone else locked our node while we were locking
+ // others.
+ if (category.state.compareAndSet(ClusterLockState.State.REMOTE_LOCKING, ClusterLockState.State.LOCAL_LOCKING))
+ {
+ // Now we are in LOCAL_LOCKING phase which will cause
+ // us to reject incoming remote requests
+
+ // Now see if we can lock our own node
+ long localTimeout = left - (System.currentTimeMillis() - start);
+ if (getLock(lockId, category, me, localTimeout).flag == RemoteLockResponse.Flag.OK)
+ {
+ success = true;
+ return true;
+ }
+ // else either 1) there was a race with a remote caller or
+ // 2) some other activity locally is preventing our
+ // acquisition of the local lock. Either way back off
+ // and then retry
+ }
+
+ // Find out if we couldn't acquire because someone with
+ // precedence has the lock
+ superiorCompetitor = getSuperiorCompetitor(category.getHolder());
+ }
+ }
+ catch (RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ if (!success)
+ {
+ cleanup(lockId, category);
+ }
+ }
+ }
+
+ // We failed for some reason. Pause to let things clear before trying again.
+ // If we've detected we are competing with someone else who is
+ // "superior" pause longer to let them proceed first.
+ long backoff = computeBackoff(timeout, start, left, superiorCompetitor == null);
+ if (backoff > 0)
+ {
+ try
+ {
+ Thread.sleep(backoff);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ if (category.state.get() == ClusterLockState.State.INVALID)
+ {
+ // Someone invalidated our category; get a new one
+ category = getClusterLockState(lockId, true);
+ }
+
+ long now = System.currentTimeMillis();
+ left -= (now - start);
+ start = now;
+ }
+
+ return false;
+ }
+
+ public abstract void unlock(Serializable lockId);
+
+ public String getPartitionName()
+ {
+ return partition.getPartitionName();
+ }
+
+ public ClusterNode getLocalClusterNode()
+ {
+ return this.me;
+ }
+
+ public List<ClusterNode> getCurrentView()
+ {
+ return new ArrayList<ClusterNode>(members);
+ }
+
+ public void start() throws Exception
+ {
+ this.me = this.partition.getClusterNode();
+ this.localHandler.setLocalNode(this.me);
+
+ this.rpcTarget = new RpcTarget(this);
+ this.partition.registerRPCHandler(this.serviceHAName, this.rpcTarget);
+ this.partition.registerMembershipListener(this);
+
+ @SuppressWarnings("unchecked")
+ Vector allMembers = this.partition.getCurrentView();
+ membershipChanged(new Vector<ClusterNode>(), allMembers, allMembers);
+ }
+
+ public void stop() throws Exception
+ {
+ if (this.rpcTarget != null)
+ {
+ this.partition.unregisterRPCHandler(this.serviceHAName, this.rpcTarget);
+ this.rpcTarget = null;
+ this.partition.unregisterMembershipListener(this);
+ Vector<ClusterNode> dead = new Vector<ClusterNode>(members);
+ Vector<ClusterNode> empty = new Vector<ClusterNode>();
+ membershipChanged(dead, empty, empty);
+ this.me = null;
+ }
+ }
+
+ // ---------------------------------------------------- HAMembershipListener
+
+ @SuppressWarnings("unchecked")
+ public synchronized void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
+ {
+ this.members.clear();
+ this.members.addAll(allMembers);
+
+ Set<ClusterNode> toClean = lockStatesByOwner.keySet();
+ toClean.removeAll(this.members);
+ for (ClusterNode deadMember : toClean)
+ {
+ Set<ClusterLockState> deadMemberLocks = lockStatesByOwner.remove(deadMember);
+ if (deadMemberLocks != null)
+ {
+ synchronized (deadMemberLocks)
+ {
+ // We're going to iterate and make a call that removes from set,
+ // so iterate over a copy to avoid ConcurrentModificationException
+ Set<ClusterLockState> copy = new HashSet<ClusterLockState>(deadMemberLocks);
+ for (ClusterLockState lockState : copy)
+ {
+ releaseRemoteLock(lockState.lockId, (ClusterNode) deadMember);
+ }
+ }
+ }
+
+ }
+ }
+
+ // --------------------------------------------------------------- Protected
+
+ protected abstract RemoteLockResponse handleLockSuccess(ClusterLockState lockState, ClusterNode caller);
+
+ protected abstract ClusterLockState getClusterLockState(Serializable categoryName);
+
+ protected abstract RemoteLockResponse yieldLock(ClusterLockState lockState, ClusterNode caller, long timeout);
+
+ protected void recordLockHolder(ClusterLockState category, ClusterNode caller)
+ {
+ if (category.holder != null)
+ {
+ Set<ClusterLockState> memberLocks = getLocksHeldByMember(category.holder);
+ synchronized (memberLocks)
+ {
+ memberLocks.remove(category);
+ }
+ }
+
+ if (me.equals(caller) == false)
+ {
+ Set<ClusterLockState> memberLocks = getLocksHeldByMember(caller);
+ synchronized (memberLocks)
+ {
+ memberLocks.add(category);
+ }
+ }
+
+ category.lock(caller);
+ }
+
+ protected ClusterLockState getClusterLockState(Serializable categoryName, boolean create)
+ {
+ ClusterLockState category = lockStates.get(categoryName);
+ if (category == null && create)
+ {
+ category = new ClusterLockState(categoryName);
+ ClusterLockState existing = lockStates.putIfAbsent(categoryName, category);
+ if (existing != null)
+ {
+ category = existing;
+ }
+ }
+ return category;
+ }
+
+ protected void removeLockState(ClusterLockState lockState)
+ {
+ lockStates.remove(lockState.lockId, lockState);
+ }
+
+ // ----------------------------------------------------------------- Private
+
+ /**
+ * Called by a remote node via RpcTarget.
+ */
+ private RemoteLockResponse remoteLock(Serializable categoryName, ClusterNode caller, long timeout)
+ {
+ RemoteLockResponse response = null;
+ ClusterLockState category = getClusterLockState(categoryName);
+ if (category == null)
+ {
+ // unknown == OK
+ return new RemoteLockResponse(RemoteLockResponse.Flag.OK);
+ }
+
+ switch (category.state.get())
+ {
+ case UNLOCKED:
+ // Remote callers can race for the local lock
+ response = getLock(categoryName, category, caller, timeout);
+ break;
+ case REMOTE_LOCKING:
+ if (me.equals(caller))
+ {
+ log.warn("Received remoteLock call from self");
+ response = new RemoteLockResponse(RemoteLockResponse.Flag.OK);
+ }
+ else if (getSuperiorCompetitor(caller) == null)
+ {
+ // I want the lock and I take precedence
+ response = new RemoteLockResponse(RemoteLockResponse.Flag.REJECT, me);
+ }
+ else
+ {
+ // I want the lock but caller takes precedence; let
+ // them acquire local lock and I'll fail
+ response = getLock(categoryName, category, caller, timeout);
+ }
+ break;
+ case LOCAL_LOCKING:
+ // I've gotten OK responses from everyone and am about
+ // to acquire local lock, so reject caller
+ response = new RemoteLockResponse(RemoteLockResponse.Flag.REJECT, me);
+ break;
+ case LOCKED:
+ // See if I have the lock and will give it up
+ response = yieldLock(category, caller, timeout);
+ break;
+ case INVALID:
+ // We've given up the lock since we got the category and the
+ // thread that caused that is trying to discard the unneeded
+ // category.
+ // Call in again and see what our current state is
+ response = remoteLock(categoryName, caller, timeout);
+ break;
+ }
+
+ return response;
+ }
+
+ /**
+ * Called by a remote node via RpcTarget.
+ */
+ private void releaseRemoteLock(Serializable categoryName, ClusterNode caller)
+ {
+ ClusterLockState category = getClusterLockState(categoryName, false);
+ if (category != null && category.state.get() == State.LOCKED)
+ {
+ if (caller.equals(localHandler.getLockHolder(categoryName)))
+ {
+ // Throw away the category as a cleanup exercise
+ category.invalidate();
+ localHandler.unlockFromCluster(categoryName, caller);
+ Set<ClusterLockState> memberLocks = getLocksHeldByMember(caller);
+ synchronized (memberLocks)
+ {
+ memberLocks.remove(category);
+ }
+ removeLockState(category);
+ }
+ }
+
+ }
+
+ /** See if <code>caller</code> comes before us in the members list */
+ private ClusterNode getSuperiorCompetitor(ClusterNode caller)
+ {
+ if (caller == null)
+ return null;
+
+ for (ClusterNode node : members)
+ {
+ if (me.equals(node))
+ {
+ break;
+ }
+ else if (caller.equals(node))
+ {
+ return caller;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Always call this with a lock on the Category.
+ */
+ protected RemoteLockResponse getLock(Serializable categoryName, ClusterLockState category,
+ ClusterNode caller, long timeout)
+ {
+ RemoteLockResponse response;
+ try
+ {
+ localHandler.lockFromCluster(categoryName, caller, timeout);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ log.error("Caught InterruptedException; Failing request by " + caller + " to lock " + categoryName);
+ return new RemoteLockResponse(RemoteLockResponse.Flag.FAIL, localHandler.getLockHolder(categoryName));
+ }
+ catch (TimeoutException t)
+ {
+ return new RemoteLockResponse(RemoteLockResponse.Flag.FAIL, t.getOwner());
+ }
+
+ response = handleLockSuccess(category, caller);
+ return response;
+ }
+
+ private Set<ClusterLockState> getLocksHeldByMember(ClusterNode member)
+ {
+ Set<ClusterLockState> memberCategories = lockStatesByOwner.get(member);
+ if (memberCategories == null)
+ {
+ memberCategories = new HashSet<ClusterLockState>();
+ Set<ClusterLockState> existing = lockStatesByOwner.putIfAbsent(member, memberCategories);
+ if (existing != null)
+ {
+ memberCategories = existing;
+ }
+ }
+ return memberCategories;
+ }
+
+ /** Back out of a failed attempt by the local node to lock */
+ private void cleanup(Serializable categoryName, ClusterLockState category)
+ {
+ try
+ {
+ partition.callMethodOnCluster(getServiceHAName(), "releaseRemoteLock", new Object[]{categoryName, me}, RELEASE_REMOTE_LOCK_TYPES, true);
+ }
+ catch (RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed releasing remote lock", e);
+ }
+ finally
+ {
+ if (category.state.compareAndSet(ClusterLockState.State.REMOTE_LOCKING, ClusterLockState.State.UNLOCKED) == false)
+ {
+ category.state.compareAndSet(ClusterLockState.State.LOCAL_LOCKING, ClusterLockState.State.UNLOCKED);
+ }
+ }
+ }
+
+ private static long computeBackoff(long initialTimeout, long start, long left, boolean superiorCompetitor)
+ {
+ long remain = left - (System.currentTimeMillis() - start);
+ // Don't spam the cluster
+ if (remain < Math.min(initialTimeout / 5, 15))
+ {
+ return remain;
+ }
+
+ long max = superiorCompetitor ? 100 : 250;
+ long min = remain / 3;
+ return Math.min(max, min);
+ }
+}
Property changes on: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
___________________________________________________________________
Name: svn:keywords
+
Name: svn:mergeinfo
+
Deleted: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockManager.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockManager.java 2009-03-19 20:48:17 UTC (rev 86129)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockManager.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -1,75 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.ha.framework.server.lock;
-
-import java.io.Serializable;
-
-/**
- *
- * @author Brian Stansberry
- *
- */
-public interface ClusterLockManager
-{
- /**
- * Acquire across the cluster the lock for the given category within the
- * specified number of milliseconds
- *
- * @param lockId the id of the lock
- * @param timeout max number of milliseconds to wait to acquire the lock
- *
- * @return <code>true</code> if the lock was acquired, <code>false</code>
- * otherwise
- */
- boolean lock(Serializable lockId, long timeout);
-
- /**
- * Release across the cluster the lock for the given category.
- *
- * @param lockId the id of the lock
- */
- void unlock(Serializable lockId);
-
- /**
- * Gets the name of the cluster partition.
- *
- * @return the partition name. Will not return <code>null</code>.
- */
- String getPartitionName();
-
- /**
- * Bring this object into a state where it is ready to handle
- * normal requests.
- *
- * @throws Exception
- */
- void start() throws Exception;
-
- /**
- * Perform any cleanup work. After this method is invoked normal lock/unlock
- * requests should not be accepted.
- *
- * @throws Exception
- */
- void stop() throws Exception;
-
-}
\ No newline at end of file
Copied: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockState.java (from rev 84208, trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LockState.java)
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockState.java (rev 0)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockState.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.framework.server.lock;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+
+/**
+ * Local bookkeeping for one cluster lock: its id, its current {@link State}
+ * and the node recorded as holding it.
+ * <p>
+ * <code>state</code> is an atomic reference with package visibility, so
+ * other classes in this package can change it directly with
+ * compare-and-set; the methods of this class that touch
+ * <code>holder</code> are all synchronized.
+ * </p>
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision:$
+ */
+public class ClusterLockState
+{
+ /** Lock status of a category */
+ public enum State
+ {
+ /** No one has the lock and local node is not attempting to acquire */
+ UNLOCKED,
+ /** Local node is attempting to acquire lock across cluster */
+ REMOTE_LOCKING,
+ /** Local node has lock across cluster, now attempting locally */
+ LOCAL_LOCKING,
+ /** The lock is held locally */
+ LOCKED,
+ /**
+ * This object has been removed from the categories map and
+ * should be discarded
+ */
+ INVALID
+ }
+ /** Id of the lock this object tracks; never <code>null</code> */
+ final Serializable lockId;
+ /** Current status; starts out as {@link State#UNLOCKED} */
+ final AtomicReference<ClusterLockState.State> state = new AtomicReference<ClusterLockState.State>(State.UNLOCKED);
+ /** Node recorded as holding the lock, or <code>null</code> if none is recorded */
+ ClusterNode holder;
+
+ /**
+ * Creates the state object for the given lock, initially unlocked and
+ * with no holder.
+ *
+ * @param lockId id of the lock
+ *
+ * @throws IllegalArgumentException if <code>lockId</code> is <code>null</code>
+ */
+ ClusterLockState(Serializable lockId)
+ {
+ if (lockId == null)
+ {
+ throw new IllegalArgumentException("lockId is null");
+ }
+ this.lockId = lockId;
+ }
+
+ /**
+ * Gets the node recorded as holding the lock.
+ *
+ * @return the holder, or <code>null</code> if none is recorded
+ */
+ public synchronized ClusterNode getHolder()
+ {
+ return holder;
+ }
+
+ /**
+ * Unconditionally moves the state to {@link State#INVALID} and clears
+ * the recorded holder.
+ */
+ public synchronized void invalidate()
+ {
+ this.state.set(State.INVALID);
+ this.holder = null;
+ }
+
+ /**
+ * Unconditionally moves the state to {@link State#LOCKED} and records
+ * the given node as holder. The previous state is not checked.
+ *
+ * @param holder the node that now holds the lock
+ */
+ public synchronized void lock(ClusterNode holder)
+ {
+ this.state.set(State.LOCKED);
+ this.holder = holder;
+ }
+
+ /**
+ * Moves the state from {@link State#LOCKED} to {@link State#UNLOCKED}
+ * and clears the recorded holder. If the state is anything other than
+ * <code>LOCKED</code>, neither the state nor the holder is changed.
+ */
+ public synchronized void release()
+ {
+ if (this.state.compareAndSet(State.LOCKED, State.UNLOCKED))
+ {
+ this.holder = null;
+ }
+ }
+}
\ No newline at end of file
Property changes on: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ClusterLockState.java
___________________________________________________________________
Name: svn:keywords
+
Name: svn:mergeinfo
+
Deleted: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ExclusiveClusterLockManager.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ExclusiveClusterLockManager.java 2009-03-19 20:48:17 UTC (rev 86129)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ExclusiveClusterLockManager.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -1,105 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.ha.framework.server.lock;
-
-import java.io.Serializable;
-
-import org.jboss.ha.framework.interfaces.ClusterNode;
-import org.jboss.ha.framework.interfaces.HAPartition;
-
-/**
- * TODO consider dividing this into two classes, one for the "supportsLocalOnly"
- * usage and one for other usage.
- *
- * @author Brian Stansberry
- */
-public class ExclusiveClusterLockManager extends AbstractClusterLockManager
-{
- public ExclusiveClusterLockManager(String serviceHAName,
- HAPartition partition,
- LocalLockHandler handler)
- {
- super(serviceHAName, partition, handler);
- }
-
- // ------------------------------------------------------ ClusterLockManager
-
- public void unlock(Serializable lockId)
- {
- ClusterNode myself = getLocalClusterNode();
- if (myself == null)
- {
- throw new IllegalStateException("Must call start() before first call to unlock()");
- }
-
- LockState category = getCategory(lockId, false);
-
- if (category != null && myself.equals(category.getHolder()))
- {
- category.invalidate();
- getLocalHandler().releaseLock(lockId, myself);
- removeLockState(lockId, category);
- }
- }
-
- // --------------------------------------------------------------- Protected
-
- @Override
- protected LockState getCategory(Serializable categoryName)
- {
- return getCategory(categoryName, false);
- }
-
- @Override
- protected RemoteLockResponse yieldLock(Serializable categoryName, LockState category, ClusterNode caller, long timeout)
- {
- if (getLocalClusterNode().equals(category.getHolder()))
- {
- return getLock(categoryName, category, caller, timeout);
- }
- else
- {
- return new RemoteLockResponse(RemoteLockResponse.Flag.REJECT, category.getHolder());
- }
- }
-
- @Override
- protected RemoteLockResponse handleLockSuccess(Serializable categoryName, LockState category, ClusterNode caller)
- {
- if (getLocalClusterNode().equals(caller))
- {
- recordLockHolder(category, caller);
- }
- else
- {
- // Caller succeeded, but since this node doesn't hold the
- // lock we don't want to hold the category in our map any longer
- category.invalidate();
- removeLockState(categoryName, category);
- }
- return new RemoteLockResponse(RemoteLockResponse.Flag.OK);
- }
-
-
- // ----------------------------------------------------------------- Private
-}
Added: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java (rev 0)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -0,0 +1,221 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.framework.server.lock;
+
+import java.io.Serializable;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.HAPartition;
+
+/**
+ * Lock manager offering both node-local and cluster-wide locking of named
+ * locks.
+ * <p>
+ * Local locks live in an in-memory map keyed by lock name. Cluster-wide
+ * operations are delegated to a {@link NonGloballyExclusiveClusterLockSupport},
+ * which is handed a {@link LocalLockHandler} that routes its callbacks to the
+ * same local locks.
+ * </p>
+ *
+ * @author Brian Stansberry
+ *
+ * @version $Revision:$
+ */
+public class LocalAndClusterLockManager
+{
+
+   /**
+    * A FIFO, non-reentrant lock whose owner is a {@link ClusterNode} rather
+    * than a thread. Threads queue in arrival order and park until they reach
+    * the head of the queue and the lock is free, or until their deadline
+    * passes.
+    */
+   private static class LocalLock
+   {
+      /** Node currently holding the lock, or <code>null</code> when it is free */
+      private volatile ClusterNode holder;
+      /** <code>true</code> while some node holds the lock */
+      private final AtomicBoolean locked = new AtomicBoolean(false);
+      /** Threads waiting to acquire, in arrival order */
+      private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
+
+      /**
+       * Acquires the lock on behalf of <code>caller</code>, waiting until
+       * <code>timeout</code> milliseconds from now at most. Interrupts that
+       * arrive while waiting do not abort the wait; the thread's interrupt
+       * status is restored before this method exits.
+       * <p>
+       * NOTE(review): a timeout below 1 produces a deadline that has already
+       * passed, so exactly one acquisition attempt is made. The
+       * {@link LocalLockHandler} contract appears to describe such values as
+       * "wait as long as necessary" -- confirm which behaviour is intended.
+       * </p>
+       *
+       * @param caller  node that will own the lock
+       * @param timeout maximum time to wait, in ms
+       *
+       * @throws TimeoutException if this thread did not acquire the lock
+       *                          before the deadline; carries the holder seen
+       *                          at that moment (may be <code>null</code>)
+       */
+      private void lock(ClusterNode caller, long timeout) throws TimeoutException
+      {
+         long deadline = System.currentTimeMillis() + timeout;
+         boolean wasInterrupted = false;
+         // Tracks whether THIS thread won the compare-and-set. Looking at
+         // locked.get() afterwards is not enough: it is also true when some
+         // other node holds the lock.
+         boolean acquired = false;
+         Thread current = Thread.currentThread();
+         waiters.add(current);
+
+         try
+         {
+            while (true)
+            {
+               // Only the thread at the head of the queue may take the lock
+               if (waiters.peek() == current && locked.compareAndSet(false, true))
+               {
+                  acquired = true;
+                  break;
+               }
+
+               LockSupport.parkUntil(deadline);
+               if (Thread.interrupted()) // ignore interrupts while waiting
+               {
+                  wasInterrupted = true;
+               }
+               if (System.currentTimeMillis() >= deadline)
+               {
+                  break;
+               }
+            }
+
+            if (!acquired)
+            {
+               throw new TimeoutException(this.holder);
+            }
+            holder = caller;
+         }
+         finally
+         {
+            // Remove this thread specifically; on timeout it need not be the
+            // head, and removing the head would evict another waiter while
+            // leaving this thread stuck in the queue.
+            waiters.remove(current);
+            if (!acquired)
+            {
+               // A wake-up sent by unlock() may have been consumed by this
+               // thread just as it gave up; pass it on to the new head so it
+               // does not sleep until its own deadline. unpark(null) is a no-op.
+               LockSupport.unpark(waiters.peek());
+            }
+            if (wasInterrupted) // reassert interrupt status on exit
+            {
+               current.interrupt();
+            }
+         }
+      }
+
+      /**
+       * Releases the lock if <code>caller</code> is the recorded holder and
+       * wakes the first waiting thread; otherwise does nothing.
+       *
+       * @param caller node asking for the release
+       */
+      private void unlock(ClusterNode caller)
+      {
+         if (caller.equals(holder))
+         {
+            // Clear the holder before freeing the lock so that a free lock
+            // never reports a stale owner and a repeated unlock by the same
+            // node cannot release a lock that another node is acquiring.
+            holder = null;
+            locked.set(false);
+            LockSupport.unpark(waiters.peek());
+         }
+      }
+
+
+   }
+
+   /** Handles callbacks from the cluster lock support object */
+   private class ClusterHandler implements LocalLockHandler
+   {
+      // ----------------------------------------------------- LocalLockHandler
+
+      /**
+       * Returns the local node stored in the enclosing manager. The argument
+       * is not used.
+       */
+      public ClusterNode getLocalNode(ClusterNode localNode)
+      {
+         return LocalAndClusterLockManager.this.localNode;
+      }
+
+      /** Stores the local node in the enclosing manager. */
+      public void setLocalNode(ClusterNode localNode)
+      {
+         LocalAndClusterLockManager.this.localNode = localNode;
+      }
+
+      /** Takes the local lock on behalf of <code>caller</code>. */
+      public void lockFromCluster(Serializable lockName, ClusterNode caller, long timeout) throws TimeoutException,
+            InterruptedException
+      {
+         LocalAndClusterLockManager.this.doLock(lockName, caller, timeout);
+      }
+
+      /**
+       * Returns the node holding the named local lock, or <code>null</code>
+       * if no such lock exists or it is not held.
+       */
+      public ClusterNode getLockHolder(Serializable lockName)
+      {
+         LocalLock lock = LocalAndClusterLockManager.this.getLocalLock(lockName, false);
+         return lock == null ? null : lock.holder;
+      }
+
+      /** Releases the local lock if <code>caller</code> holds it. */
+      public void unlockFromCluster(Serializable lockName, ClusterNode caller)
+      {
+         LocalAndClusterLockManager.this.doUnlock(lockName, caller);
+      }
+
+   }
+
+   /**
+    * This node's identity; <code>null</code> until the handler callback sets
+    * it. Volatile because it is written through the handler and read by
+    * threads calling the locking methods.
+    */
+   private volatile ClusterNode localNode;
+   /** Local locks by name; entries are created on first lock and never removed here */
+   private final ConcurrentMap<Serializable, LocalLock> localLocks = new ConcurrentHashMap<Serializable, LocalLock>();
+   private final NonGloballyExclusiveClusterLockSupport clusterSupport;
+
+   /**
+    * Creates the manager and the cluster lock support object it delegates to.
+    *
+    * @param serviceHAName passed to the cluster lock support object
+    * @param partition     passed to the cluster lock support object
+    */
+   public LocalAndClusterLockManager(String serviceHAName, HAPartition partition)
+   {
+      ClusterHandler handler = new ClusterHandler();
+      clusterSupport = new NonGloballyExclusiveClusterLockSupport(serviceHAName, partition, handler);
+   }
+
+   // ----------------------------------------------------------------- Public
+
+   /**
+    * Acquires the named lock on this node only, owned by the local node.
+    *
+    * @param lockName name of the lock
+    * @param timeout  maximum time to wait, in ms
+    *
+    * @throws IllegalStateException if the local node has not been set yet
+    * @throws TimeoutException      if the lock was not acquired in time
+    * @throws InterruptedException  declared for callers; not thrown by the
+    *                               current implementation
+    */
+   public void lockLocally(Serializable lockName, long timeout)
+         throws TimeoutException, InterruptedException
+   {
+      ClusterNode self = this.localNode;
+      if (self == null)
+      {
+         throw new IllegalStateException("Null localNode");
+      }
+
+      doLock(lockName, self, timeout);
+   }
+
+   /**
+    * Releases the named local lock if the local node holds it.
+    *
+    * @param lockName name of the lock
+    *
+    * @throws IllegalStateException if the local node has not been set yet
+    */
+   public void unlockLocally(Serializable lockName)
+   {
+      ClusterNode self = this.localNode;
+      if (self == null)
+      {
+         throw new IllegalStateException("Null localNode");
+      }
+
+      doUnlock(lockName, self);
+   }
+
+   /**
+    * Asks the cluster lock support object to acquire the named lock.
+    * <p>
+    * NOTE(review): if <code>clusterSupport.lock</code> reports failure through
+    * a return value rather than by throwing, that result is discarded here --
+    * confirm against its signature.
+    * </p>
+    *
+    * @param lockName name of the lock
+    * @param timeout  timeout passed on unchanged
+    */
+   public void lockGlobally(Serializable lockName, long timeout)
+         throws TimeoutException, InterruptedException
+   {
+      this.clusterSupport.lock(lockName, timeout);
+   }
+
+   /**
+    * Asks the cluster lock support object to release the named lock.
+    *
+    * @param lockName name of the lock
+    */
+   public void unlockGlobally(Serializable lockName)
+   {
+      this.clusterSupport.unlock(lockName);
+   }
+
+   /** Starts the cluster lock support object. */
+   public void start() throws Exception
+   {
+      this.clusterSupport.start();
+   }
+
+   /** Stops the cluster lock support object. */
+   public void stop() throws Exception
+   {
+      this.clusterSupport.stop();
+   }
+
+   // ----------------------------------------------------------------- Private
+
+   /**
+    * Looks up the local lock for a name.
+    *
+    * @param categoryName name of the lock
+    * @param create       whether to register a new lock when none exists
+    *
+    * @return the lock, or <code>null</code> if none exists and
+    *         <code>create</code> is <code>false</code>
+    */
+   private LocalLock getLocalLock(Serializable categoryName, boolean create)
+   {
+      LocalLock category = localLocks.get(categoryName);
+      if (category == null && create)
+      {
+         category = new LocalLock();
+         // Another thread may have registered one in the meantime; use theirs
+         LocalLock existing = localLocks.putIfAbsent(categoryName, category);
+         if (existing != null)
+         {
+            category = existing;
+         }
+      }
+      return category;
+   }
+
+   /** Acquires the named local lock for <code>caller</code>, creating it if needed. */
+   private void doLock(Serializable lockName, ClusterNode caller, long timeout) throws TimeoutException,
+         InterruptedException
+   {
+      LocalLock lock = getLocalLock(lockName, true);
+      lock.lock(caller, timeout);
+   }
+
+   /** Releases the named local lock for <code>caller</code>; a no-op if the lock does not exist. */
+   private void doUnlock(Serializable lockName, ClusterNode caller)
+   {
+      LocalLock lock = getLocalLock(lockName, false);
+      if (lock != null)
+      {
+         lock.unlock(caller);
+      }
+   }
+
+}
Property changes on: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
___________________________________________________________________
Name: svn:keywords
+
Modified: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalLockHandler.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalLockHandler.java 2009-03-19 20:48:17 UTC (rev 86129)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LocalLockHandler.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -27,8 +27,11 @@
import org.jboss.ha.framework.interfaces.ClusterNode;
/**
+ * Provides local lock coordination for an {@link AbstractClusterLockSupport}.
+ *
* @author Brian Stansberry
- *
+ *
+ * @version $Revision$
*/
public interface LocalLockHandler
{
@@ -51,28 +54,20 @@
* the lock acquisition should be considered a failure.
* A value less than one means wait as long as necessary.
*
- * @return a flag indicating whether this LocalLockHandler is internally
- * tracking the fact that <code>caller</code> holds the lock.
- * If <code>false</code> is returned, the calling code can assume
- * that the next call to this method with the same <code>categoryName</code>
- * but <strong>any</strong> <code>caller</code> value would succeed.
- *
* @throws TimeoutException if the lock could not be acquired within the
* specified timeout
* @throws InterruptedException
*/
- boolean tryLock(Serializable lockName, ClusterNode caller, long timeout)
+ void lockFromCluster(Serializable lockName, ClusterNode caller, long timeout)
throws TimeoutException, InterruptedException;
/**
* Release the lock.
*
- * @param lockName
- * @param caller
- * @return <code>true</code> if the caller held the lock, <code>false</code>
- * otherwise.
+ * @param lockName the name of the lock.
+ * @param caller the node making the request
*/
- boolean releaseLock(Serializable lockName, ClusterNode caller);
+ void unlockFromCluster(Serializable lockName, ClusterNode caller);
ClusterNode getLocalNode(ClusterNode localNode);
Deleted: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LockState.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LockState.java 2009-03-19 20:48:17 UTC (rev 86129)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/LockState.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -1,86 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.ha.framework.server.lock;
-
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.jboss.ha.framework.interfaces.ClusterNode;
-
-public class LockState
-{
- /** Lock status of a category */
- public enum State
- {
- /** No one has the lock and local node is not attempting to acquire */
- UNLOCKED,
- /** Local node is attempting to acquire lock across cluster */
- REMOTE_LOCKING,
- /** Local node has lock across cluster, now attempting locally */
- LOCAL_LOCKING,
- /** The lock is held locally */
- LOCKED,
- /**
- * This object has been removed from the categories map and
- * should be discarded
- */
- INVALID
- }
- final Serializable lockId;
- final AtomicReference<LockState.State> state = new AtomicReference<LockState.State>(State.UNLOCKED);
- ClusterNode holder;
-
- LockState(Serializable lockId)
- {
- if (lockId == null)
- {
- throw new IllegalArgumentException("lockId is null");
- }
- this.lockId = lockId;
- }
-
- public synchronized ClusterNode getHolder()
- {
- return holder;
- }
-
- public synchronized void invalidate()
- {
- this.state.set(State.INVALID);
- this.holder = null;
- }
-
- public synchronized void lock(ClusterNode holder)
- {
- this.state.set(State.LOCKED);
- this.holder = holder;
- }
-
- public synchronized void release()
- {
- if (this.state.compareAndSet(State.LOCKED, State.UNLOCKED))
- {
- this.holder = null;
- }
- }
-}
\ No newline at end of file
Copied: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java (from rev 84208, trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ReadWriteClusterLockManager.java)
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java (rev 0)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.framework.server.lock;
+
+import java.io.Serializable;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.HAPartition;
+
+/**
+ * Support class for cluster locking scenarios where threads can hold a
+ * local lock on a category but not a cluster-wide lock. Multiple nodes can
+ * simultaneously hold a local lock on a category, but none can hold a local
+ * lock on a category if the cluster-wide lock is held. Cluster-wide lock cannot
+ * be acquired while any node holds a local lock.
+ * <p>
+ * <strong>NOTE:</strong> This class does not support "upgrades", i.e. scenarios
+ * where a thread acquires the local lock and then while holding the local
+ * lock attempts to acquire the cluster-wide lock.
+ * </p>
+ *
+ * @author Brian Stansberry
+ */
+public class NonGloballyExclusiveClusterLockSupport extends AbstractClusterLockSupport
+{
+
+   // ------------------------------------------------------------- Constructor
+
+   /**
+    * Creates the support object; all three arguments are handed unchanged to
+    * the superclass constructor.
+    */
+   public NonGloballyExclusiveClusterLockSupport(String serviceHAName,
+                                                 HAPartition partition,
+                                                 LocalLockHandler handler)
+   {
+      super(serviceHAName, partition, handler);
+   }
+
+   // ------------------------------------------------------------------ Public
+
+   /**
+    * Releases the given lock if, according to the local lock state, this
+    * node is its holder: first through the local handler, then in the local
+    * state object, and finally by invoking <code>releaseRemoteLock</code>
+    * across the cluster. Does nothing when no state exists for the lock or
+    * another node is recorded as holder.
+    *
+    * @param lockId the id of the lock
+    *
+    * @throws IllegalStateException if the local cluster node is not yet known
+    * @throws RuntimeException      if the cluster call fails; checked
+    *                               exceptions are wrapped
+    */
+   public void unlock(Serializable lockId)
+   {
+      final ClusterNode self = getLocalClusterNode();
+      if (self == null)
+      {
+         throw new IllegalStateException("Must call start() before first call to unlock()");
+      }
+
+      final ClusterLockState lockState = getClusterLockState(lockId, false);
+      if (lockState == null || !self.equals(lockState.getHolder()))
+      {
+         // Nothing recorded for this lock, or somebody else holds it
+         return;
+      }
+
+      getLocalHandler().unlockFromCluster(lockId, self);
+      lockState.release();
+
+      try
+      {
+         getPartition().callMethodOnCluster(getServiceHAName(), "releaseRemoteLock",
+                                            new Object[]{lockId, self},
+                                            RELEASE_REMOTE_LOCK_TYPES, true);
+      }
+      catch (RuntimeException unchecked)
+      {
+         throw unchecked;
+      }
+      catch (Exception checked)
+      {
+         throw new RuntimeException("Failed releasing remote lock", checked);
+      }
+   }
+
+   // --------------------------------------------------------------- Protected
+
+   /**
+    * Delegates to the two-argument overload with <code>true</code> as the
+    * second argument (presumably "create if missing" -- confirm against the
+    * superclass).
+    */
+   @Override
+   protected ClusterLockState getClusterLockState(Serializable categoryName)
+   {
+      return getClusterLockState(categoryName, true);
+   }
+
+   /**
+    * Never yields: always answers with a <code>REJECT</code> response that
+    * carries the holder currently recorded in <code>lockState</code>.
+    */
+   @Override
+   protected RemoteLockResponse yieldLock(ClusterLockState lockState, ClusterNode caller, long timeout)
+   {
+      RemoteLockResponse.Flag verdict = RemoteLockResponse.Flag.REJECT;
+      ClusterNode currentHolder = lockState.getHolder();
+      return new RemoteLockResponse(verdict, currentHolder);
+   }
+
+   /**
+    * Records <code>caller</code> as the lock holder and answers with an
+    * <code>OK</code> response.
+    */
+   @Override
+   protected RemoteLockResponse handleLockSuccess(ClusterLockState lockState, ClusterNode caller)
+   {
+      recordLockHolder(lockState, caller);
+      RemoteLockResponse granted = new RemoteLockResponse(RemoteLockResponse.Flag.OK);
+      return granted;
+   }
+
+   // ----------------------------------------------------------------- Private
+}
Property changes on: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/NonGloballyExclusiveClusterLockSupport.java
___________________________________________________________________
Name: svn:keywords
+
Name: svn:mergeinfo
+
Deleted: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ReadWriteClusterLockManager.java
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ReadWriteClusterLockManager.java 2009-03-19 20:48:17 UTC (rev 86129)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ReadWriteClusterLockManager.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -1,103 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.ha.framework.server.lock;
-
-import java.io.Serializable;
-
-import org.jboss.ha.framework.interfaces.ClusterNode;
-import org.jboss.ha.framework.interfaces.HAPartition;
-
-/**
- *
- *
- * @author Brian Stansberry
- */
-public class ReadWriteClusterLockManager extends AbstractClusterLockManager
-{
-
- public ReadWriteClusterLockManager(String serviceHAName,
- HAPartition partition,
- LocalLockHandler handler)
- {
- super(serviceHAName, partition, handler);
- }
-
- // ------------------------------------------------------ ClusterLockManager
-
- public void unlock(Serializable lockId)
- {
- ClusterNode myself = getLocalClusterNode();
- if (myself == null)
- {
- throw new IllegalStateException("Must call start() before first call to unlock()");
- }
-
- LockState category = getCategory(lockId, false);
-
- if (category != null && myself.equals(category.getHolder()))
- {
- getLocalHandler().releaseLock(lockId, myself);
- category.release();
-
- try
- {
- getPartition().callMethodOnCluster(getServiceHAName(), "releaseRemoteLock",
- new Object[]{lockId, myself},
- RELEASE_REMOTE_LOCK_TYPES, true);
- }
- catch (RuntimeException e)
- {
- throw e;
- }
- catch (Exception e)
- {
- throw new RuntimeException("Failed releasing remote lock", e);
- }
- }
- }
-
- // ------------------------------------------------------------------ Public
-
- // --------------------------------------------------------------- Protected
-
- @Override
- protected LockState getCategory(Serializable categoryName)
- {
- return getCategory(categoryName, true);
- }
-
- @Override
- protected RemoteLockResponse yieldLock(Serializable categoryName, LockState category, ClusterNode caller, long timeout)
- {
- return new RemoteLockResponse(RemoteLockResponse.Flag.REJECT, category.getHolder());
- }
-
- @Override
- protected RemoteLockResponse handleLockSuccess(Serializable categoryName, LockState category, ClusterNode caller)
- {
- recordLockHolder(category, caller);
- return new RemoteLockResponse(RemoteLockResponse.Flag.OK);
- }
-
- // ----------------------------------------------------------------- Private
-}
Copied: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java (from rev 84208, trunk/cluster/src/main/org/jboss/ha/framework/server/lock/ExclusiveClusterLockManager.java)
===================================================================
--- trunk/cluster/src/main/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java (rev 0)
+++ trunk/cluster/src/main/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.framework.server.lock;
+
+import java.io.Serializable;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.HAPartition;
+
+/**
+ * Support class for cluster locking scenarios where threads cannot acquire
+ * a local lock unless the node owns a cluster-wide lock, but where the node
+ * owning the cluster-wide lock will yield it to another node if no thread
+ * has a local lock. Use case for this is scenarios like session management
+ * where a node needs to acquire ownership of a cluster-wide lock for a session,
+ * but then once acquired wishes to handle multiple calls for the session without
+ * making cluster-wide locking calls. The node handling the session would acquire
+ * the cluster-wide lock if it doesn't have it and thereafter would only use
+ * local locks; if another node received a request for the session it would
+ * request the lock and the first node would release it if no local locks are
+ * held.
+ *
+ * @author Brian Stansberry
+ */
+public class YieldingGloballyExclusiveClusterLockSupport extends AbstractClusterLockSupport
+{
+ public YieldingGloballyExclusiveClusterLockSupport(String serviceHAName,
+ HAPartition partition,
+ LocalLockHandler handler)
+ {
+ super(serviceHAName, partition, handler);
+ }
+
+ // ------------------------------------------------------ ClusterLockManager
+
+ public void unlock(Serializable lockId)
+ {
+ ClusterNode myself = getLocalClusterNode();
+ if (myself == null)
+ {
+ throw new IllegalStateException("Must call start() before first call to unlock()");
+ }
+
+ ClusterLockState category = getClusterLockState(lockId, false);
+
+ if (category != null && myself.equals(category.getHolder()))
+ {
+ category.invalidate();
+ getLocalHandler().unlockFromCluster(lockId, myself);
+ removeLockState(category);
+ }
+ }
+
+ // --------------------------------------------------------------- Protected
+
+ @Override
+ protected ClusterLockState getClusterLockState(Serializable categoryName)
+ {
+ return getClusterLockState(categoryName, false);
+ }
+
+ @Override
+ protected RemoteLockResponse yieldLock(ClusterLockState lockState, ClusterNode caller, long timeout)
+ {
+ if (getLocalClusterNode().equals(lockState.getHolder()))
+ {
+ return getLock(lockState.lockId, lockState, caller, timeout);
+ }
+ else
+ {
+ return new RemoteLockResponse(RemoteLockResponse.Flag.REJECT, lockState.getHolder());
+ }
+ }
+
+ @Override
+ protected RemoteLockResponse handleLockSuccess(ClusterLockState lockState, ClusterNode caller)
+ {
+ if (getLocalClusterNode().equals(caller))
+ {
+ recordLockHolder(lockState, caller);
+ }
+ else
+ {
+ // Caller succeeded, but since this node doesn't hold the
+ // lock we don't want to keep its lock state in our map any longer
+ lockState.invalidate();
+ removeLockState(lockState);
+ }
+ return new RemoteLockResponse(RemoteLockResponse.Flag.OK);
+ }
+
+
+ // ----------------------------------------------------------------- Private
+}
Property changes on: trunk/cluster/src/main/org/jboss/ha/framework/server/lock/YieldingGloballyExclusiveClusterLockSupport.java
___________________________________________________________________
Name: svn:keywords
+
Name: svn:mergeinfo
+
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/ReadWriteClusteredLockManagerUnitTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/ReadWriteClusteredLockManagerUnitTestCase.java 2009-03-19 20:48:17 UTC (rev 86129)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/ReadWriteClusteredLockManagerUnitTestCase.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -26,6 +26,7 @@
import static org.easymock.EasyMock.aryEq;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.makeThreadSafe;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.resetToNice;
@@ -40,12 +41,12 @@
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.HAPartition;
-import org.jboss.ha.framework.server.lock.AbstractClusterLockManager;
+import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport;
import org.jboss.ha.framework.server.lock.LocalLockHandler;
-import org.jboss.ha.framework.server.lock.ReadWriteClusterLockManager;
+import org.jboss.ha.framework.server.lock.NonGloballyExclusiveClusterLockSupport;
import org.jboss.ha.framework.server.lock.RemoteLockResponse;
import org.jboss.ha.framework.server.lock.TimeoutException;
-import org.jboss.ha.framework.server.lock.AbstractClusterLockManager.RpcTarget;
+import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport.RpcTarget;
import org.jboss.test.cluster.lock.ClusteredLockManagerTestBase;
/**
@@ -54,7 +55,7 @@
* @author Brian Stansberry
*
*/
-public class ReadWriteClusteredLockManagerUnitTestCase extends ClusteredLockManagerTestBase<ReadWriteClusterLockManager>
+public class ReadWriteClusteredLockManagerUnitTestCase extends ClusteredLockManagerTestBase<NonGloballyExclusiveClusterLockSupport>
{
/**
* Create a new ClusteredLockManagerImplUnitTestCase.
@@ -67,16 +68,16 @@
}
@Override
- protected ReadWriteClusterLockManager createClusteredLockManager(String serviceHAName,
+ protected NonGloballyExclusiveClusterLockSupport createClusteredLockManager(String serviceHAName,
HAPartition partition, LocalLockHandler handler)
{
- return new ReadWriteClusterLockManager(serviceHAName, partition, handler);
+ return new NonGloballyExclusiveClusterLockSupport(serviceHAName, partition, handler);
}
public void testBasicRemoteLock() throws Exception
{
- TesteeSet<ReadWriteClusterLockManager> testeeSet = getTesteeSet(node1, 1, 2);
- ReadWriteClusterLockManager testee = testeeSet.impl;
+ TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 2);
+ NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
LocalLockHandler handler = testee.getLocalHandler();
RpcTarget target = testeeSet.target;
@@ -84,7 +85,7 @@
assertFalse(node1.equals(caller));
resetToStrict(handler);
- expect(handler.tryLock("test", caller, 1000)).andReturn(true);
+ handler.lockFromCluster("test", caller, 1000);
replay(handler);
RemoteLockResponse rsp = target.remoteLock("test", caller, 1000);
@@ -109,8 +110,8 @@
public void testContestedRemoteLock() throws Exception
{
- TesteeSet<ReadWriteClusterLockManager> testeeSet = getTesteeSet(node1, 1, 3);
- ReadWriteClusterLockManager testee = testeeSet.impl;
+ TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 3);
+ NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
LocalLockHandler handler = testee.getLocalHandler();
RpcTarget target = testeeSet.target;
@@ -121,7 +122,7 @@
assertFalse(node1.equals(caller2));
resetToStrict(handler);
- expect(handler.tryLock("test", caller1, 1000)).andReturn(true);
+ handler.lockFromCluster("test", caller1, 1000);
replay(handler);
RemoteLockResponse rsp = target.remoteLock("test", caller1, 1000);
@@ -146,8 +147,8 @@
public void testConcurrentRemoteLock() throws Exception
{
- TesteeSet<ReadWriteClusterLockManager> testeeSet = getTesteeSet(node1, 1, 3);
- ReadWriteClusterLockManager testee = testeeSet.impl;
+ TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 3);
+ NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
LocalLockHandler handler = testee.getLocalHandler();
final RpcTarget target = testeeSet.target;
@@ -165,8 +166,10 @@
CountDownLatch answerDoneLatch = new CountDownLatch(1);
BlockingAnswer<Boolean> caller1Answer = new BlockingAnswer<Boolean>(Boolean.TRUE, answerStartLatch, null, answerDoneLatch);
BlockingAnswer<Boolean> caller2Answer = new BlockingAnswer<Boolean>(new TimeoutException(caller1), answerDoneLatch, 0, null, null);
- expect(handler.tryLock("test", caller1, 1000)).andAnswer(caller1Answer);
- expect(handler.tryLock("test", caller2, 1000)).andAnswer(caller2Answer);
+ handler.lockFromCluster("test", caller1, 1000);
+ expectLastCall().andAnswer(caller1Answer);
+ handler.lockFromCluster("test", caller2, 1000);
+ expectLastCall().andAnswer(caller2Answer);
replay(handler);
CountDownLatch startLatch1 = new CountDownLatch(1);
@@ -221,8 +224,8 @@
public void testRemoteLockFailsAgainstLocalLock() throws Exception
{
- TesteeSet<ReadWriteClusterLockManager> testeeSet = getTesteeSet(node1, 1, 2);
- ReadWriteClusterLockManager testee = testeeSet.impl;
+ TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 2);
+ NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
LocalLockHandler handler = testee.getLocalHandler();
RpcTarget target = testeeSet.target;
@@ -231,7 +234,8 @@
resetToStrict(handler);
// We throw TimeoutException to indicate "node1" holds the lock
- expect(handler.tryLock("test", caller1, 1000)).andThrow(new TimeoutException(node1));
+ handler.lockFromCluster("test", caller1, 1000);
+ expectLastCall().andThrow(new TimeoutException(node1));
replay(handler);
RemoteLockResponse rsp = target.remoteLock("test", caller1, 1000);
@@ -245,7 +249,7 @@
resetToStrict(handler);
// We return normally to indicate success
- expect(handler.tryLock("test", caller1, 1000)).andReturn(true);
+ handler.lockFromCluster("test", caller1, 1000);
replay(handler);
rsp = target.remoteLock("test", caller1, 1000);
@@ -269,8 +273,8 @@
private void basicClusterLockFailsAgainstLocalLockTest(int viewSize) throws Exception
{
int viewPos = viewSize == 1 ? 0 : 1;
- TesteeSet<ReadWriteClusterLockManager> testeeSet = getTesteeSet(node1, viewPos, viewSize);
- ReadWriteClusterLockManager testee = testeeSet.impl;
+ TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, viewPos, viewSize);
+ NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
HAPartition partition = testee.getPartition();
LocalLockHandler handler = testee.getLocalHandler();
@@ -286,16 +290,17 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 2000000),
- aryEq(AbstractClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(AbstractClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList).atLeastOnce();
- expect(handler.tryLock(eq("test"), eq(node1), anyLong())).andThrow(new TimeoutException(node1)).atLeastOnce();
+ handler.lockFromCluster(eq("test"), eq(node1), anyLong());
+ expectLastCall().andThrow(new TimeoutException(node1)).atLeastOnce();
expect(partition.callMethodOnCluster(eq("test"),
eq("releaseRemoteLock"),
aryEq(new Object[]{"test", node1}),
- aryEq(AbstractClusterLockManager.RELEASE_REMOTE_LOCK_TYPES),
+ aryEq(AbstractClusterLockSupport.RELEASE_REMOTE_LOCK_TYPES),
eq(true))).andReturn(new ArrayList<Object>()).atLeastOnce();
replay(partition);
replay(handler);
@@ -314,8 +319,8 @@
*/
public void testDeadMemberCleanupAllowsRemoteLock() throws Exception
{
- TesteeSet<ReadWriteClusterLockManager> testeeSet = getTesteeSet(node1, 1, 3);
- ReadWriteClusterLockManager testee = testeeSet.impl;
+ TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 3);
+ NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
LocalLockHandler handler = testee.getLocalHandler();
RpcTarget target = testeeSet.target;
@@ -327,7 +332,7 @@
assertFalse(node1.equals(caller2));
resetToStrict(handler);
- expect(handler.tryLock("test", caller1, 1000)).andReturn(true);
+ handler.lockFromCluster("test", caller1, 1000);
replay(handler);
RemoteLockResponse rsp = target.remoteLock("test", caller1, 1000);
@@ -346,7 +351,7 @@
resetToStrict(handler);
expect(handler.getLockHolder("test")).andReturn(caller1);
- expect(handler.releaseLock("test", caller1)).andReturn(true);
+ handler.unlockFromCluster("test", caller1);
replay(handler);
testee.membershipChanged(dead, new Vector<ClusterNode>(), all);
@@ -355,7 +360,7 @@
// A call from a different caller should work
resetToStrict(handler);
- expect(handler.tryLock("test", caller2, 1000)).andReturn(true);
+ handler.lockFromCluster("test", caller2, 1000);
replay(handler);
rsp = target.remoteLock("test", caller2, 1000);
@@ -373,8 +378,8 @@
*/
public void testSpuriousLockReleaseIgnored2() throws Exception
{
- TesteeSet<ReadWriteClusterLockManager> testeeSet = getTesteeSet(node1, 1, 3);
- ReadWriteClusterLockManager testee = testeeSet.impl;
+ TesteeSet<NonGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 3);
+ NonGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
HAPartition partition = testee.getPartition();
LocalLockHandler handler = testee.getLocalHandler();
RpcTarget target = testeeSet.target;
@@ -385,7 +390,7 @@
resetToStrict(partition);
resetToStrict(handler);
- expect(handler.tryLock(eq("test"), eq(caller1), anyLong())).andReturn(true);
+ handler.lockFromCluster(eq("test"), eq(caller1), anyLong());
expect(handler.getLockHolder("test")).andReturn(caller1);
Copied: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/YieldingGloballyExclusiveClusterLockSupportUnitTestCase.java (from rev 84208, trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/ExclusiveClusteredLockManagerUnitTestCase.java)
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/YieldingGloballyExclusiveClusterLockSupportUnitTestCase.java (rev 0)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/YieldingGloballyExclusiveClusterLockSupportUnitTestCase.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -0,0 +1,190 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.cluster.defaultcfg.test;
+
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.resetToStrict;
+import static org.easymock.EasyMock.verify;
+
+import java.util.List;
+import java.util.Vector;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.HAPartition;
+import org.jboss.ha.framework.server.lock.YieldingGloballyExclusiveClusterLockSupport;
+import org.jboss.ha.framework.server.lock.LocalLockHandler;
+import org.jboss.ha.framework.server.lock.RemoteLockResponse;
+import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport.RpcTarget;
+import org.jboss.test.cluster.lock.ClusteredLockManagerTestBase;
+
+/**
+ * Unit test of YieldingGloballyExclusiveClusterLockSupport
+ *
+ * @author Brian Stansberry
+ *
+ */
+public class YieldingGloballyExclusiveClusterLockSupportUnitTestCase extends ClusteredLockManagerTestBase<YieldingGloballyExclusiveClusterLockSupport>
+{
+ /**
+ * Create a new YieldingGloballyExclusiveClusterLockSupportUnitTestCase.
+ *
+ * @param name
+ */
+ public YieldingGloballyExclusiveClusterLockSupportUnitTestCase(String name)
+ {
+ super(name);
+ }
+
+ @Override
+ protected YieldingGloballyExclusiveClusterLockSupport createClusteredLockManager(String serviceHAName,
+ HAPartition partition, LocalLockHandler handler)
+ {
+ return new YieldingGloballyExclusiveClusterLockSupport(serviceHAName, partition, handler);
+ }
+
+ public void testBasicRemoteLock() throws Exception
+ {
+ TesteeSet<YieldingGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 2);
+ YieldingGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
+ LocalLockHandler handler = testee.getLocalHandler();
+ RpcTarget target = testeeSet.target;
+
+ ClusterNode caller = testee.getCurrentView().get(0);
+ assertFalse(node1.equals(caller));
+
+ resetToStrict(handler);
+ replay(handler);
+
+ RemoteLockResponse rsp = target.remoteLock("test", caller, 1000);
+
+ assertEquals(RemoteLockResponse.Flag.OK, rsp.flag);
+ assertNull(rsp.holder);
+
+ verify(handler);
+
+ // Do it again; should still work
+ resetToStrict(handler);
+ replay(handler);
+
+ rsp = target.remoteLock("test", caller, 1000);
+
+ assertEquals(RemoteLockResponse.Flag.OK, rsp.flag);
+ assertNull(rsp.holder);
+
+ verify(handler);
+ }
+
+ public void testContestedRemoteLock() throws Exception
+ {
+ TesteeSet<YieldingGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 3);
+ YieldingGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
+ LocalLockHandler handler = testee.getLocalHandler();
+ RpcTarget target = testeeSet.target;
+
+ ClusterNode caller1 = testee.getCurrentView().get(0);
+ assertFalse(node1.equals(caller1));
+
+ ClusterNode caller2 = testee.getCurrentView().get(2);
+ assertFalse(node1.equals(caller2));
+
+ resetToStrict(handler);
+ replay(handler);
+
+ RemoteLockResponse rsp = target.remoteLock("test", caller1, 1000);
+
+ assertEquals(RemoteLockResponse.Flag.OK, rsp.flag);
+ assertNull(rsp.holder);
+
+ verify(handler);
+
+ // A call from a different caller should still work as
+ // w/ supportLockOnly==false we only reject if WE hold the lock
+ resetToStrict(handler);
+ replay(handler);
+
+ rsp = target.remoteLock("test", caller2, 1000);
+
+ assertEquals(RemoteLockResponse.Flag.OK, rsp.flag);
+ assertNull(rsp.holder);
+
+ verify(handler);
+
+ }
+
+ /**
+ * Test that if a member holds a lock but is then removed from the
+ * view, another remote member can obtain the lock.
+ *
+ * @throws Exception
+ */
+ public void testDeadMemberCleanupAllowsRemoteLock() throws Exception
+ {
+ TesteeSet<YieldingGloballyExclusiveClusterLockSupport> testeeSet = getTesteeSet(node1, 1, 3);
+ YieldingGloballyExclusiveClusterLockSupport testee = testeeSet.impl;
+ LocalLockHandler handler = testee.getLocalHandler();
+ RpcTarget target = testeeSet.target;
+
+ List<ClusterNode> members = testee.getCurrentView();
+ ClusterNode caller1 = members.get(0);
+ assertFalse(node1.equals(caller1));
+
+ ClusterNode caller2 = members.get(2);
+ assertFalse(node1.equals(caller2));
+
+ resetToStrict(handler);
+ replay(handler);
+
+ RemoteLockResponse rsp = target.remoteLock("test", caller1, 1000);
+
+ assertEquals(RemoteLockResponse.Flag.OK, rsp.flag);
+ assertNull(rsp.holder);
+
+ verify(handler);
+
+ // Change the view
+ Vector<ClusterNode> dead = new Vector<ClusterNode>();
+ dead.add(caller1);
+
+ Vector<ClusterNode> all = new Vector<ClusterNode>(members);
+ all.remove(caller1);
+
+ resetToStrict(handler);
+ replay(handler);
+
+ testee.membershipChanged(dead, new Vector<ClusterNode>(), all);
+
+ verify(handler);
+
+ // A call from a different caller should work
+ resetToStrict(handler);
+ replay(handler);
+
+ rsp = target.remoteLock("test", caller2, 1000);
+
+ assertEquals(RemoteLockResponse.Flag.OK, rsp.flag);
+ assertNull(rsp.holder);
+
+ verify(handler);
+ }
+
+}
Property changes on: trunk/testsuite/src/main/org/jboss/test/cluster/defaultcfg/test/YieldingGloballyExclusiveClusterLockSupportUnitTestCase.java
___________________________________________________________________
Name: svn:keywords
+
Name: svn:mergeinfo
+
Modified: trunk/testsuite/src/main/org/jboss/test/cluster/lock/ClusteredLockManagerTestBase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/cluster/lock/ClusteredLockManagerTestBase.java 2009-03-19 20:48:17 UTC (rev 86129)
+++ trunk/testsuite/src/main/org/jboss/test/cluster/lock/ClusteredLockManagerTestBase.java 2009-03-19 21:37:59 UTC (rev 86130)
@@ -7,6 +7,7 @@
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.isA;
import static org.easymock.EasyMock.makeThreadSafe;
import static org.easymock.EasyMock.replay;
@@ -32,14 +33,14 @@
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.server.ClusterNodeImpl;
-import org.jboss.ha.framework.server.lock.AbstractClusterLockManager;
-import org.jboss.ha.framework.server.lock.ExclusiveClusterLockManager;
+import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport;
+import org.jboss.ha.framework.server.lock.YieldingGloballyExclusiveClusterLockSupport;
import org.jboss.ha.framework.server.lock.LocalLockHandler;
import org.jboss.ha.framework.server.lock.RemoteLockResponse;
-import org.jboss.ha.framework.server.lock.AbstractClusterLockManager.RpcTarget;
+import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport.RpcTarget;
import org.jgroups.stack.IpAddress;
-public abstract class ClusteredLockManagerTestBase<T extends AbstractClusterLockManager> extends TestCase
+public abstract class ClusteredLockManagerTestBase<T extends AbstractClusterLockSupport> extends TestCase
{
protected ClusterNode node1;
@@ -122,21 +123,21 @@
try
{
- new ExclusiveClusterLockManager(null, haPartition, handler);
+ new YieldingGloballyExclusiveClusterLockSupport(null, haPartition, handler);
fail("Null serviceHAName should prevent construction");
}
catch (IllegalArgumentException good) {}
try
{
- new ExclusiveClusterLockManager("test", null, handler);
+ new YieldingGloballyExclusiveClusterLockSupport("test", null, handler);
fail("Null HAPartition should prevent construction");
}
catch (IllegalArgumentException good) {}
try
{
- new ExclusiveClusterLockManager("test", haPartition, null);
+ new YieldingGloballyExclusiveClusterLockSupport("test", haPartition, null);
fail("Null LocalLockHandler should prevent construction");
}
catch (IllegalArgumentException good) {}
@@ -147,7 +148,7 @@
replay(haPartition);
replay(handler);
- ExclusiveClusterLockManager testee = new ExclusiveClusterLockManager("test", haPartition, handler);
+ YieldingGloballyExclusiveClusterLockSupport testee = new YieldingGloballyExclusiveClusterLockSupport("test", haPartition, handler);
assertEquals("test", testee.getServiceHAName());
assertEquals("TestPartition", testee.getPartitionName());
@@ -163,7 +164,7 @@
replay(haPartition);
replay(handler);
- ExclusiveClusterLockManager testee = new ExclusiveClusterLockManager("test", haPartition, handler);
+ YieldingGloballyExclusiveClusterLockSupport testee = new YieldingGloballyExclusiveClusterLockSupport("test", haPartition, handler);
try
{
@@ -183,7 +184,7 @@
assertEquals("Current view is empty when unstarted", 0, testee.getCurrentView().size());
- haPartition.registerRPCHandler(eq("test"), isA(ExclusiveClusterLockManager.RpcTarget.class));
+ haPartition.registerRPCHandler(eq("test"), isA(RpcTarget.class));
haPartition.registerMembershipListener(testee);
Vector<ClusterNode> view = new Vector<ClusterNode>();
view.add(node1);
@@ -291,7 +292,7 @@
{
int viewPos = viewSize == 1 ? 0 : 1;
TesteeSet<T> testeeSet = getTesteeSet(node1, viewPos, viewSize);
- AbstractClusterLockManager testee = testeeSet.impl;
+ AbstractClusterLockSupport testee = testeeSet.impl;
HAPartition partition = testee.getPartition();
LocalLockHandler handler = testee.getLocalHandler();
@@ -307,10 +308,10 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList);
- expect(handler.tryLock(eq("test"), eq(node1), anyLong())).andReturn(true);
+ handler.lockFromCluster(eq("test"), eq(node1), anyLong());
replay(partition);
replay(handler);
@@ -325,7 +326,7 @@
public void testRemoteRejectionFromSuperiorCaller() throws Exception
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 3);
- AbstractClusterLockManager testee = testeeSet.impl;
+ AbstractClusterLockSupport testee = testeeSet.impl;
HAPartition partition = testee.getPartition();
LocalLockHandler handler = testee.getLocalHandler();
@@ -342,7 +343,7 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList).atLeastOnce();
replay(partition);
@@ -358,7 +359,7 @@
public void testRemoteRejectionFromInferiorCaller() throws Exception
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 3);
- AbstractClusterLockManager testee = testeeSet.impl;
+ AbstractClusterLockSupport testee = testeeSet.impl;
HAPartition partition = testee.getPartition();
LocalLockHandler handler = testee.getLocalHandler();
@@ -375,14 +376,14 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList);
expect(partition.callMethodOnCluster(eq("test"),
eq("releaseRemoteLock"),
aryEq(new Object[]{"test", node1}),
- aryEq(AbstractClusterLockManager.RELEASE_REMOTE_LOCK_TYPES),
+ aryEq(AbstractClusterLockSupport.RELEASE_REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList);
rspList = new ArrayList<RemoteLockResponse>();
@@ -392,10 +393,11 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList);
- expect(handler.tryLock(eq("test"), eq(node1), anyLong())).andReturn(true).atLeastOnce();
+ handler.lockFromCluster(eq("test"), eq(node1), anyLong());
+ expectLastCall().atLeastOnce();
replay(partition);
replay(handler);
@@ -430,7 +432,7 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList);
// When caller 1 invokes, block before giving response
@@ -438,7 +440,8 @@
CountDownLatch answerStartLatch = new CountDownLatch(1);
CountDownLatch answerDoneLatch = new CountDownLatch(1);
BlockingAnswer<Boolean> caller1Answer = new BlockingAnswer<Boolean>(Boolean.TRUE, answerAwaitLatch, answerStartLatch, answerDoneLatch);
- expect(handler.tryLock(eq("test"), eq(node1), anyLong())).andAnswer(caller1Answer);
+ handler.lockFromCluster(eq("test"), eq(node1), anyLong());
+ expectLastCall().andAnswer(caller1Answer);
replay(partition);
replay(handler);
@@ -507,15 +510,15 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andAnswer(caller1Answer).atLeastOnce();
- expect(handler.tryLock(eq("test"), eq(superiorCaller), anyLong())).andReturn(true);
+ handler.lockFromCluster(eq("test"), eq(superiorCaller), anyLong());
expect(partition.callMethodOnCluster(eq("test"),
eq("releaseRemoteLock"),
aryEq(new Object[]{"test", node1}),
- aryEq(AbstractClusterLockManager.RELEASE_REMOTE_LOCK_TYPES),
+ aryEq(AbstractClusterLockSupport.RELEASE_REMOTE_LOCK_TYPES),
eq(true))).andReturn(new ArrayList<Object>()).atLeastOnce();
replay(partition);
@@ -585,13 +588,13 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andAnswer(caller1Answer);
expect(partition.callMethodOnCluster(eq("test"),
eq("releaseRemoteLock"),
aryEq(new Object[]{"test", node1}),
- aryEq(AbstractClusterLockManager.RELEASE_REMOTE_LOCK_TYPES),
+ aryEq(AbstractClusterLockSupport.RELEASE_REMOTE_LOCK_TYPES),
eq(true))).andReturn(new ArrayList<Object>());
rspList = new ArrayList<RemoteLockResponse>();
@@ -601,10 +604,10 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList);
- expect(handler.tryLock(eq("test"), eq(node1), anyLong())).andReturn(true);
+ handler.lockFromCluster(eq("test"), eq(node1), anyLong());
replay(partition);
replay(handler);
@@ -653,7 +656,7 @@
public void testSpuriousRemoteLockReleaseIgnored() throws Exception
{
TesteeSet<T> testeeSet = getTesteeSet(node1, 1, 2);
- AbstractClusterLockManager testee = testeeSet.impl;
+ AbstractClusterLockSupport testee = testeeSet.impl;
HAPartition partition = testee.getPartition();
LocalLockHandler handler = testee.getLocalHandler();
@@ -668,10 +671,10 @@
expect(partition.callMethodOnCluster(eq("test"),
eq("remoteLock"),
eqLockParams(node1, 200000),
- aryEq(ExclusiveClusterLockManager.REMOTE_LOCK_TYPES),
+ aryEq(YieldingGloballyExclusiveClusterLockSupport.REMOTE_LOCK_TYPES),
eq(true))).andReturn(rspList);
- expect(handler.tryLock(eq("test"), eq(node1), anyLong())).andReturn(true);
+ handler.lockFromCluster(eq("test"), eq(node1), anyLong());
expect(handler.getLockHolder("test")).andReturn(node1);
@@ -692,8 +695,8 @@
expect(haPartition.getClusterNode()).andReturn(node);
expect(haPartition.getPartitionName()).andReturn("TestPartition");
- Capture<ExclusiveClusterLockManager.RpcTarget> c = new Capture<ExclusiveClusterLockManager.RpcTarget>();
- haPartition.registerRPCHandler(eq("test"), and(isA(ExclusiveClusterLockManager.RpcTarget.class), capture(c)));
+ Capture<RpcTarget> c = new Capture<RpcTarget>();
+ haPartition.registerRPCHandler(eq("test"), and(isA(RpcTarget.class), capture(c)));
Vector<ClusterNode> view = getView(node, viewPos, viewSize);
expect(haPartition.getCurrentView()).andReturn(view);
@@ -740,7 +743,7 @@
}
}
- protected class TesteeSet<C extends AbstractClusterLockManager>
+ protected class TesteeSet<C extends AbstractClusterLockSupport>
{
public final C impl;
public final RpcTarget target;
@@ -909,9 +912,9 @@
protected class LocalLockCaller extends AbstractCaller<Boolean>
{
- private final AbstractClusterLockManager target;
+ private final AbstractClusterLockSupport target;
- public LocalLockCaller(AbstractClusterLockManager target, CountDownLatch startLatch,
+ public LocalLockCaller(AbstractClusterLockSupport target, CountDownLatch startLatch,
CountDownLatch proceedLatch, CountDownLatch finishLatch)
{
super(startLatch, proceedLatch, finishLatch);
More information about the jboss-cvs-commits
mailing list