[jboss-cvs] JBossAS SVN: r104840 - in projects/cluster/ha-server-api/trunk/src: test/java/org/jboss/test/ha/framework/server/lock and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun May 16 17:11:57 EDT 2010
Author: bstansberry at jboss.com
Date: 2010-05-16 17:11:57 -0400 (Sun, 16 May 2010)
New Revision: 104840
Modified:
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java
projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/TimeoutException.java
projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java
projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/SharedLocalYieldingClusterLockManagerUnitTestCase.java
Log:
[JBCLUSTER-268] Improve algorithm
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java 2010-05-16 21:11:57 UTC (rev 104840)
@@ -116,6 +116,12 @@
{
throw new IllegalArgumentException("localHandler is null");
}
+ if (!rpcDispatcher.isConsistentWith(membershipNotifier))
+ {
+ throw new IllegalArgumentException(GroupRpcDispatcher.class.getSimpleName() +
+ " " + rpcDispatcher + " is not compatible with " +
+ GroupMembershipNotifier.class.getSimpleName() + " " + membershipNotifier);
+ }
this.rpcDispatcher = rpcDispatcher;
this.membershipNotifier = membershipNotifier;
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java 2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java 2010-05-16 21:11:57 UTC (rev 104840)
@@ -184,11 +184,6 @@
public void lockLocally(Serializable lockName, long timeout)
throws TimeoutException, InterruptedException
{
- if (this.localNode == null)
- {
- throw new IllegalStateException("Null localNode");
- }
-
doLock(lockName, this.localNode, timeout);
}
@@ -199,11 +194,6 @@
*/
public void unlockLocally(Serializable lockName)
{
- if (this.localNode == null)
- {
- throw new IllegalStateException("Null localNode");
- }
-
doUnlock(lockName, this.localNode);
}
@@ -223,7 +213,10 @@
public void lockGlobally(Serializable lockName, long timeout)
throws TimeoutException, InterruptedException
{
- this.clusterSupport.lock(lockName, timeout);
+ if (!this.clusterSupport.lock(lockName, timeout))
+ {
+ throw new TimeoutException("Cannot acquire lock " + lockName + " from cluster");
+ }
}
/**
@@ -243,7 +236,12 @@
*/
public void start() throws Exception
{
- this.clusterSupport.start();
+ this.clusterSupport.start();
+
+ if (this.localNode == null)
+ {
+ throw new IllegalStateException("Null localNode");
+ }
}
/**
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java 2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java 2010-05-16 21:11:57 UTC (rev 104840)
@@ -27,7 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.jboss.ha.framework.interfaces.ClusterNode;
@@ -72,88 +72,206 @@
private class LocalLock
{
- private volatile ClusterNode holder;
private volatile boolean removable;
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
- private final AtomicInteger localLockCount = new AtomicInteger();
-
- private void lock(ClusterNode caller, long timeout) throws TimeoutException
+ private final AtomicReference<LockState> lockState = new AtomicReference<LockState>(LockState.AVAILABLE);
+
+ /**
+ * Just takes the lock for the local node. This should only be invoked
+ * for new locks or in a callback from the cluster support, which
+ * won't make the callback until all other nodes in cluster agree.
+ *
+ * @return the LockState after the lock is taken.
+ */
+ private LockState lockForLocalNode()
{
- if (SharedLocalYieldingClusterLockManager.this.localNode.equals(caller))
+ LockState lockedState = null;
+ boolean updated = false;
+ while (!updated)
{
- this.localLockCount.incrementAndGet();
- this.holder = SharedLocalYieldingClusterLockManager.this.localNode;
+ LockState current = lockState.get();
+ lockedState = current.incrementAndTake(SharedLocalYieldingClusterLockManager.this.localNode);
+ updated = lockState.compareAndSet(current, lockedState);
}
- else
+ return lockedState;
+ }
+
+ private LockState lockForRemotNode(ClusterNode caller, long timeout) throws TimeoutException
+ {
+ LockState lockedState = null;
+
+ long deadline = System.currentTimeMillis() + timeout;
+ boolean wasInterrupted = false;
+ Thread currentThread = Thread.currentThread();
+ waiters.add(currentThread);
+
+ try
{
- long deadline = System.currentTimeMillis() + timeout;
- boolean wasInterrupted = false;
- Thread current = Thread.currentThread();
- waiters.add(current);
-
- try
- {
- // Block while not first in queue or cannot acquire lock
- while (waiters.peek() != current ||
- localLockCount.get() > 0)
- {
- LockSupport.parkUntil(deadline);
- if (Thread.interrupted()) // ignore interrupts while waiting
- wasInterrupted = true;
- if (System.currentTimeMillis() >= deadline)
+ // Block while not first in queue or cannot acquire lock
+ LockState currentState = lockState.get();
+ lockedState = currentState.take(caller);
+ while (waiters.peek() != currentThread ||
+ currentState.lockHolder == SharedLocalYieldingClusterLockManager.this.localNode ||
+ !lockState.compareAndSet(currentState, lockedState))
+ {
+ LockSupport.parkUntil(deadline);
+ if (Thread.interrupted()) // ignore interrupts while waiting
+ wasInterrupted = true;
+
+ currentState = lockState.get();
+ lockedState = currentState.take(caller);
+ if (System.currentTimeMillis() >= deadline)
+ {
+ // One last attempt
+ if (waiters.peek() != currentThread ||
+ currentState.lockHolder == SharedLocalYieldingClusterLockManager.this.localNode ||
+ !lockState.compareAndSet(currentState, lockedState))
{
- if (waiters.peek() != current ||
- localLockCount.get() > 0)
- {
- throw new TimeoutException(this.holder);
- }
- break;
+ throw new TimeoutException(SharedLocalYieldingClusterLockManager.this.localNode);
}
+ // Succeeded
+ break;
}
-
- if (localLockCount.get() == 0)
- {
- holder = caller;
- }
- else
- {
- throw new TimeoutException(this.holder);
- }
}
- finally
- {
- waiters.remove();
- if (wasInterrupted) // reassert interrupt status on exit
- current.interrupt();
- }
}
+ finally
+ {
+ waiters.remove();
+ if (wasInterrupted) // reassert interrupt status on exit
+ currentThread.interrupt();
+ }
+
+ return lockedState;
}
private void unlock(ClusterNode caller)
{
- if (caller.equals(holder))
+ LockState current = lockState.get();
+ if (caller.equals(current.lockHolder))
{
- if (SharedLocalYieldingClusterLockManager.this.localNode.equals(caller))
+ LockState newState = null;
+ if (SharedLocalYieldingClusterLockManager.this.localNode == current.lockHolder)
{
- if (this.localLockCount.decrementAndGet() == 0)
+ for (;;)
{
- holder = null;
+ newState = current.decrementAndRelease();
+ if (lockState.compareAndSet(current, newState))
+ {
+ break;
+ }
+ else
+ {
+ current = lockState.get();
+ }
}
}
else
{
- holder = null;
+ for (;;)
+ {
+ newState = current.release();
+ if (lockState.compareAndSet(current, newState))
+ {
+ break;
+ }
+ else
+ {
+ current = lockState.get();
+ }
+ }
}
- if (holder == null)
+ if (newState.lockHolder == null)
{
+ // Wake up anyone waiting for this lock
LockSupport.unpark(waiters.peek());
}
}
- }
+ }
+ private LockState registerForLocalLock()
+ {
+ LockState current = lockState.get();
+ LockState newState = null;
+ for (;;)
+ {
+ newState = current.increment();
+ if (lockState.compareAndSet(current, newState))
+ {
+ break;
+ }
+ else
+ {
+ current = lockState.get();
+ }
+ }
+ return newState;
+ }
+
}
+ private static class LockState
+ {
+ private static final LockState AVAILABLE = new LockState(0, null, null);
+
+ private final int localLockCount;
+ private final ClusterNode lockHolder;
+ private final Thread latestRegistrant;
+ private final boolean invalid;
+
+ private LockState(int localLockCount, ClusterNode lockHolder, Thread latestRegistrant)
+ {
+ this.localLockCount = localLockCount;
+ this.lockHolder = lockHolder;
+ this.latestRegistrant = latestRegistrant;
+ this.invalid = false;
+ }
+
+ private LockState(int localLockCount, ClusterNode lockHolder, Thread latestRegistrant, boolean invalid)
+ {
+ this.localLockCount = localLockCount;
+ this.lockHolder = lockHolder;
+ this.latestRegistrant = latestRegistrant;
+ this.invalid = invalid;
+ }
+
+ private LockState increment()
+ {
+ return new LockState(localLockCount + 1, lockHolder, Thread.currentThread());
+ }
+
+ private LockState decrement()
+ {
+ return new LockState(localLockCount - 1, lockHolder, latestRegistrant);
+ }
+
+ private LockState incrementAndTake(ClusterNode owner)
+ {
+ return new LockState(localLockCount + 1, owner, latestRegistrant);
+ }
+
+ private LockState take(ClusterNode owner)
+ {
+ return new LockState(localLockCount, owner, latestRegistrant);
+ }
+
+ private LockState decrementAndRelease()
+ {
+ return localLockCount == 1 ? new LockState(0, null, latestRegistrant) : decrement();
+ }
+
+ private LockState release()
+ {
+ return new LockState(localLockCount, null, latestRegistrant);
+ }
+
+ private LockState invalidate()
+ {
+ return new LockState(localLockCount, lockHolder, latestRegistrant, true);
+ }
+
+ }
+
/** Handles callbacks from the cluster lock support object */
private class ClusterHandler implements LocalLockHandler
{
@@ -173,22 +291,60 @@
InterruptedException
{
LocalLock lock = getLocalLock(lockName, true);
- lock.lock(caller, timeout);
- if (!localNode.equals(caller))
+
+ if (localNode.equals(caller)) // can't use identity here as caller may have been deserialized
+ {
+ LockState lockState = lock.lockForLocalNode();
+ if (lockState.latestRegistrant != Thread.currentThread())
+ {
+ // Someone else may be blocking waiting for this thread to
+ // acquire the lock from the cluster. Tell them we have.
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ }
+ else
{
+ LockState currentState = lock.lockForRemotNode(caller, timeout);
+
// Any local thread who has a ref to lock will now need to request it
// remotely from caller, which won't grant it until this method returns.
// So, we can remove lock from the map. If that local thread is granted
// the lock by caller, when that thread calls lockFromCluster, we'll create
// a new lock to handle that.
localLocks.remove(lockName, lock);
+
+ LockState invalidated = null;
+ for (;;)
+ {
+ invalidated = currentState.invalidate();
+ if (lock.lockState.compareAndSet(currentState, invalidated))
+ {
+ break;
+ }
+ else
+ {
+ currentState = lock.lockState.get();
+ }
+ }
+
+ if (invalidated.latestRegistrant != null)
+ {
+ // Wake up anyone waiting on this lock so they know it's invalid
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
}
}
public ClusterNode getLockHolder(Serializable lockName)
{
LocalLock lock = getLocalLock(lockName, false);
- return lock == null ? null : lock.holder;
+ return lock == null ? null : lock.lockState.get().lockHolder;
}
public void unlockFromCluster(Serializable lockName, ClusterNode caller)
@@ -258,50 +414,89 @@
*/
public LockResult lock(Serializable lockName, long timeout, boolean newLock)
throws TimeoutException, InterruptedException
- {
- if (this.localNode == null)
- {
- throw new IllegalStateException("Null localNode");
- }
-
+ {
LockResult result = null;
LocalLock localLock = getLocalLock(lockName, false);
if (localLock == null)
{
+ localLock = getLocalLock(lockName, true);
if (newLock)
{
// Here we assume the caller knows what they are doing and this
// is really is a new lock, and that no other
- // node is going to try to take it
- localLock = getLocalLock(lockName, true);
- localLock.lock(this.localNode, timeout);
- result = (localLock.localLockCount.get() == 1 ? LockResult.NEW_LOCK : LockResult.ALREADY_HELD);
+ // node or local thread is trying to take it
+
+ LockState lockState = localLock.lockForLocalNode();
+ result = (lockState.localLockCount == 1 ? LockResult.NEW_LOCK : LockResult.ALREADY_HELD);
}
- else
- {
- this.clusterSupport.lock(lockName, timeout);
- result = LockResult.ACQUIRED_FROM_CLUSTER;
- }
}
- else
+
+ if (result == null) // We have to ask the cluster
{
- localLock.localLockCount.incrementAndGet(); // Now no other node can become localLock.holder
+ LockState lockState = localLock.registerForLocalLock();
try
{
- if (this.localNode.equals(localLock.holder))
- {
- result = LockResult.ALREADY_HELD;
-
- // Check for race where we locked something that's been removed
- if (localLock.removable && localLock != getLocalLock(lockName, false))
+ long remaining = timeout;
+ do
+ {
+ if (this.localNode == lockState.lockHolder) // object identity is safe
{
- return lock(lockName, timeout, newLock);
+ result = LockResult.ALREADY_HELD;
+
+ // Check for race where we registered for something that's been removed
+ if (localLock.removable && localLock != getLocalLock(lockName, false))
+ {
+ // oops; try again
+ result = lock(lockName, remaining, newLock);
+ }
}
+ else
+ {
+ if (lockState.invalid)
+ {
+ // the lock was removed; start over
+ result = lock(lockName, remaining, newLock);
+ }
+ else if (lockState.localLockCount == 1)
+ {
+ // Only one thread should ask the cluster for the lock;
+ // we were first so it's our task
+ if (this.clusterSupport.lock(lockName, remaining))
+ {
+ result = LockResult.ACQUIRED_FROM_CLUSTER;
+ }
+ else
+ {
+ throw new TimeoutException("Cannot acquire lock " + lockName + " from cluster");
+ }
+ }
+ else
+ {
+ // Some other thread is asking the cluster
+ // Wait for them to finish
+ long start = System.currentTimeMillis();
+ synchronized (localLock)
+ {
+ lockState = localLock.lockState.get();
+ if (lockState.lockHolder == null)
+ {
+ localLock.wait(remaining);
+ }
+ }
+ // If something woke us up, see where we are
+ // and loop again
+ lockState = localLock.lockState.get();
+
+ remaining = timeout - (System.currentTimeMillis() - start);
+
+ }
+ }
}
- else
+ while (result == null && remaining > 0);
+
+ if (result == null)
{
- this.clusterSupport.lock(lockName, timeout);
- result = LockResult.ACQUIRED_FROM_CLUSTER;
+ throwTimeoutException(lockName, lockState);
}
}
finally
@@ -314,10 +509,12 @@
{
// Only decrement if the current lock object for this key is
// the same one we incremented above
- LocalLock current = localLocks.get(lockName);
- if (current == localLock)
+ boolean updated = false;
+ while (localLocks.get(lockName) == localLock && !updated)
{
- localLock.localLockCount.decrementAndGet();
+ LockState currentState = localLock.registerForLocalLock();
+ LockState decremented = currentState.decrement();
+ updated = localLock.lockState.compareAndSet(currentState, decremented);
}
}
}
@@ -325,7 +522,7 @@
return result;
}
-
+
/**
* Releases a previously acquired lock.
*
@@ -343,7 +540,7 @@
this.clusterSupport.unlock(lockName);
- if (lock != null && lock.removable && lock.localLockCount.get() == 0)
+ if (lock != null && lock.removable && lock.lockState.get().lockHolder == null)
{
localLocks.remove(lockName, lock);
}
@@ -357,6 +554,11 @@
public void start() throws Exception
{
this.clusterSupport.start();
+
+ if (this.localNode == null)
+ {
+ throw new IllegalStateException("Null localNode");
+ }
}
/**
@@ -386,5 +588,13 @@
}
return category;
}
+
+ private static void throwTimeoutException(Serializable lockName, LockState lockState) throws TimeoutException
+ {
+ TimeoutException te = lockState.lockHolder == null
+ ? new TimeoutException("Unable to acquire lock " + lockName)
+ : new TimeoutException(lockState.lockHolder);
+ throw te;
+ }
}
Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/TimeoutException.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/TimeoutException.java 2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/TimeoutException.java 2010-05-16 21:11:57 UTC (rev 104840)
@@ -42,6 +42,11 @@
this.owner = owner;
}
+ public TimeoutException(String msg)
+ {
+ super(msg);
+ }
+
public ClusterNode getOwner()
{
return owner;
Modified: projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java 2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java 2010-05-16 21:11:57 UTC (rev 104840)
@@ -96,6 +96,7 @@
}
catch (IllegalArgumentException good) {}
+ expect(haPartition.isConsistentWith(haPartition)).andReturn(Boolean.TRUE);
expect(haPartition.getClusterNode()).andReturn(node1);
expect(haPartition.getGroupName()).andReturn("TestPartition");
@@ -111,7 +112,8 @@
public void testStart() throws Exception
{
HAPartition haPartition = createNiceMock(HAPartition.class);
- LocalLockHandler handler = createNiceMock(LocalLockHandler.class);
+ LocalLockHandler handler = createNiceMock(LocalLockHandler.class);
+ expect(haPartition.isConsistentWith(haPartition)).andReturn(Boolean.TRUE);
expect(haPartition.getClusterNode()).andReturn(node1);
expect(haPartition.getGroupName()).andReturn("TestPartition");
@@ -689,7 +691,8 @@
protected TesteeSet<T> getTesteeSet(ClusterNode node, int viewPos, int viewSize) throws Exception
{
HAPartition haPartition = createNiceMock(HAPartition.class);
- LocalLockHandler handler = createNiceMock(LocalLockHandler.class);
+ LocalLockHandler handler = createNiceMock(LocalLockHandler.class);
+ expect(haPartition.isConsistentWith(haPartition)).andReturn(Boolean.TRUE);
expect(haPartition.getClusterNode()).andReturn(node);
expect(haPartition.getGroupName()).andReturn("TestPartition");
Modified: projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/SharedLocalYieldingClusterLockManagerUnitTestCase.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/SharedLocalYieldingClusterLockManagerUnitTestCase.java 2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/SharedLocalYieldingClusterLockManagerUnitTestCase.java 2010-05-16 21:11:57 UTC (rev 104840)
@@ -31,6 +31,7 @@
import static org.easymock.EasyMock.isA;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.resetToNice;
import static org.easymock.EasyMock.resetToStrict;
import static org.jboss.test.ha.framework.server.lock.LockParamsMatcher.eqLockParams;
@@ -39,17 +40,23 @@
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
+import org.apache.log4j.Logger;
import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.ResponseFilter;
import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport;
import org.jboss.ha.framework.server.lock.RemoteLockResponse;
import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+import org.jboss.ha.framework.server.lock.TimeoutException;
import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport.RpcTarget;
import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager.LockResult;
import org.jboss.test.ha.util.MockClusterNode;
@@ -64,6 +71,8 @@
*/
public class SharedLocalYieldingClusterLockManagerUnitTestCase extends TestCase
{
+ private static final Logger log = Logger.getLogger(SharedLocalYieldingClusterLockManagerUnitTestCase.class);
+
private static final ResponseFilter NULL_FILTER = null;
private ClusterNode node1;
@@ -141,10 +150,142 @@
assertEquals(LockResult.ACQUIRED_FROM_CLUSTER, ts.testee.lock("test", 1000, true));
}
+
+ public void testConcurrentLocalLocks() throws Exception
+ {
+ concurrentLocalLocksTest(false, -1);
+ }
+
+ public void testConcurrentLocalNewLocks() throws Exception
+ {
+ concurrentLocalLocksTest(true, -1);
+ }
+
+ public void testConcurrentLocalLocksSomeNew() throws Exception
+ {
+ concurrentLocalLocksTest(true, 0);
+ concurrentLocalLocksTest(true, 1);
+ concurrentLocalLocksTest(true, 2);
+ concurrentLocalLocksTest(true, 3);
+ }
+
+ private void concurrentLocalLocksTest(boolean newLock, int newLockPos) throws Exception
+ {
+ TesteeSet ts = getTesteeSet(node1, 0, 3);
+
+ if (!newLock || newLockPos >= 0)
+ {
+ resetToNice(ts.partition);
+ EasyMock.makeThreadSafe(ts.partition, true);
+ ts.control.checkOrder(false);
+
+ List<RemoteLockResponse> rspList = getOKResponses(2);
+ expect(ts.partition.getMethodCallTimeout()).andReturn(60000l).anyTimes();
+ expect(ts.partition.callMethodOnCluster(eq("test"),
+ eq("remoteLock"),
+ eqLockParams(node1, 200000),
+ aryEq(AbstractClusterLockSupport.REMOTE_LOCK_TYPES),
+ eq(RemoteLockResponse.class),
+ eq(true),
+ eq(NULL_FILTER),
+ anyInt(),
+ eq(false))).andReturn(rspList).anyTimes();
+
+ replay(ts.partition);
+ }
+ Locker[] lockers = new Locker[4];
+ CountDownLatch readyLatch = new CountDownLatch(lockers.length);
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch endLatch = new CountDownLatch(lockers.length);
+ ExecutorService executor = Executors.newFixedThreadPool(lockers.length);
+
+ for (int i = 0; i < lockers.length; i++)
+ {
+ boolean newOne = newLock && (i == newLockPos || newLockPos < 0);
+ lockers[i] = new Locker(ts, newOne, readyLatch, startLatch, endLatch);
+ executor.submit(lockers[i]);
+ }
+
+ readyLatch.await(5, TimeUnit.SECONDS);
+ startLatch.countDown();
+ endLatch.await(5, TimeUnit.SECONDS);
+
+ boolean sawNewLockResult = false;
+
+ for (Locker locker : lockers)
+ {
+ if (locker.exception != null)
+ {
+ log.error(locker + " caught exception", locker.exception);
+ throw (Exception) locker.exception;
+ }
+
+ LockResult newLockResult = locker.newLock ? LockResult.NEW_LOCK : LockResult.ACQUIRED_FROM_CLUSTER;
+
+ if (sawNewLockResult)
+ {
+ if (locker.newLock)
+ {
+ assertTrue(locker.result == LockResult.ALREADY_HELD || locker.result == LockResult.NEW_LOCK);
+ }
+ else
+ {
+ assertTrue(locker.result == LockResult.ALREADY_HELD || locker.result == LockResult.ACQUIRED_FROM_CLUSTER);
+ }
+ }
+ else if (locker.result != LockResult.ALREADY_HELD)
+ {
+ assertEquals(newLockResult, locker.result);
+ sawNewLockResult = true;
+ }
+ }
+
+ assertTrue("Saw a new lock result", sawNewLockResult);
+ }
+
+ public void testRejectionFromCluster() throws Exception
+ {
+ TesteeSet ts = getTesteeSet(node1, 0, 3);
+
+ resetToNice(ts.partition);
+
+ List<RemoteLockResponse> rspList = getRejectionResponses(node3, 2);
+ expect(ts.partition.getMethodCallTimeout()).andReturn(60000l).atLeastOnce();
+ expect(ts.partition.callMethodOnCluster(eq("test"),
+ eq("remoteLock"),
+ eqLockParams(node1, 200000),
+ aryEq(AbstractClusterLockSupport.REMOTE_LOCK_TYPES),
+ eq(RemoteLockResponse.class),
+ eq(true),
+ eq(NULL_FILTER),
+ anyInt(),
+ eq(false))).andReturn(rspList).atLeastOnce();
+// expect(ts.partition.callMethodOnCluster(eq("test"),
+// eq("releaseRemoteLock"),
+// eqLockParams(node1, 200000),
+// aryEq(AbstractClusterLockSupport.REMOTE_LOCK_TYPES),
+// eq(RemoteLockResponse.class),
+// eq(true),
+// eq(NULL_FILTER),
+// anyInt(),
+// eq(false))).andReturn(rspList);
+
+ replay(ts.partition);
+
+ try
+ {
+ ts.testee.lock("test", 1000, false);
+ fail("Did not throw TimeoutException");
+ }
+ catch (TimeoutException ok) {}
+ }
+
protected TesteeSet getTesteeSet(ClusterNode node, int viewPos, int viewSize) throws Exception
{
- HAPartition haPartition = createNiceMock(HAPartition.class);
+ IMocksControl control = EasyMock.createNiceControl();
+ HAPartition haPartition = control.createMock(HAPartition.class);
+ expect(haPartition.isConsistentWith(haPartition)).andReturn(Boolean.TRUE);
expect(haPartition.getClusterNode()).andReturn(node);
expect(haPartition.getPartitionName()).andReturn("TestPartition");
@@ -161,7 +302,7 @@
reset(haPartition);
- return new TesteeSet(testee, haPartition, c.getValue());
+ return new TesteeSet(testee, haPartition, c.getValue(), control);
}
private Vector<ClusterNode> getView(ClusterNode member, int viewPos, int numMembers)
@@ -186,18 +327,31 @@
}
return rspList;
}
+
+ private static List<RemoteLockResponse> getRejectionResponses(ClusterNode owner, int numResponses)
+ {
+ List<RemoteLockResponse> rspList = new ArrayList<RemoteLockResponse>();
+ rspList.add(new RemoteLockResponse(owner, RemoteLockResponse.Flag.FAIL));
+ for (int i = 1; i < numResponses + 1; i++)
+ {
+ rspList.add(new RemoteLockResponse(null, RemoteLockResponse.Flag.OK));
+ }
+ return rspList;
+ }
private class TesteeSet
{
+ private final IMocksControl control;
private final SharedLocalYieldingClusterLockManager testee;
private final HAPartition partition;
private final RpcTarget target;
- private TesteeSet(SharedLocalYieldingClusterLockManager testee, HAPartition partition, RpcTarget target)
+ private TesteeSet(SharedLocalYieldingClusterLockManager testee, HAPartition partition, RpcTarget target, IMocksControl control)
{
this.testee = testee;
this.partition = partition;
this.target = target;
+ this.control = control;
}
}
@@ -205,15 +359,17 @@
{
private final TesteeSet ts;
private final boolean newLock;
+ private final CountDownLatch readyLatch;
private final CountDownLatch startLatch;
private final CountDownLatch endLatch;
private LockResult result;
private Exception exception;
- private Locker(TesteeSet ts, boolean newLock, CountDownLatch startLatch, CountDownLatch endLatch)
+ private Locker(TesteeSet ts, boolean newLock, CountDownLatch readyLatch, CountDownLatch startLatch, CountDownLatch endLatch)
{
this.ts = ts;
this.newLock = newLock;
+ this.readyLatch = readyLatch;
this.startLatch = startLatch;
this.endLatch = endLatch;
}
@@ -222,6 +378,7 @@
{
try
{
+ readyLatch.countDown();
startLatch.await(10, TimeUnit.SECONDS);
result = ts.testee.lock("test", 1000, newLock);
}
More information about the jboss-cvs-commits
mailing list