[jboss-cvs] JBossAS SVN: r104840 - in projects/cluster/ha-server-api/trunk/src: test/java/org/jboss/test/ha/framework/server/lock and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun May 16 17:11:57 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-05-16 17:11:57 -0400 (Sun, 16 May 2010)
New Revision: 104840

Modified:
   projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
   projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
   projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java
   projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/TimeoutException.java
   projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java
   projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/SharedLocalYieldingClusterLockManagerUnitTestCase.java
Log:
[JBCLUSTER-268] Improve algorithm

Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java	2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java	2010-05-16 21:11:57 UTC (rev 104840)
@@ -116,6 +116,12 @@
       {
          throw new IllegalArgumentException("localHandler is null");
       }
+      if (!rpcDispatcher.isConsistentWith(membershipNotifier))
+      {
+         throw new IllegalArgumentException(GroupRpcDispatcher.class.getSimpleName() + 
+               " " + rpcDispatcher + " is not compatible with " + 
+               GroupMembershipNotifier.class.getSimpleName() + " " + membershipNotifier);
+      }
       
       this.rpcDispatcher = rpcDispatcher;
       this.membershipNotifier = membershipNotifier;

Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java	2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/LocalAndClusterLockManager.java	2010-05-16 21:11:57 UTC (rev 104840)
@@ -184,11 +184,6 @@
    public void lockLocally(Serializable lockName, long timeout)
          throws TimeoutException, InterruptedException
    {
-      if (this.localNode == null)
-      {
-         throw new IllegalStateException("Null localNode");
-      }
-      
       doLock(lockName, this.localNode, timeout);
    }
    
@@ -199,11 +194,6 @@
     */
    public void unlockLocally(Serializable lockName)
    {
-      if (this.localNode == null)
-      {
-         throw new IllegalStateException("Null localNode");
-      }
-      
       doUnlock(lockName, this.localNode);   
    }
    
@@ -223,7 +213,10 @@
    public void lockGlobally(Serializable lockName, long timeout)
          throws TimeoutException, InterruptedException
    {
-      this.clusterSupport.lock(lockName, timeout);
+      if (!this.clusterSupport.lock(lockName, timeout))
+      {
+         throw new TimeoutException("Cannot acquire lock " + lockName + " from cluster");
+      }
    }
    
    /**
@@ -243,7 +236,12 @@
     */
    public void start() throws Exception
    {
-      this.clusterSupport.start();
+      this.clusterSupport.start();   
+      
+      if (this.localNode == null)
+      {
+         throw new IllegalStateException("Null localNode");
+      }
    }
    
    /**

Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java	2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/SharedLocalYieldingClusterLockManager.java	2010-05-16 21:11:57 UTC (rev 104840)
@@ -27,7 +27,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 
 import org.jboss.ha.framework.interfaces.ClusterNode;
@@ -72,88 +72,206 @@
    
    private class LocalLock
    {
-      private volatile ClusterNode holder;
       private volatile boolean removable;
       private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
-      private final AtomicInteger localLockCount = new AtomicInteger();
-
-      private void lock(ClusterNode caller, long timeout) throws TimeoutException
+      private final AtomicReference<LockState> lockState = new AtomicReference<LockState>(LockState.AVAILABLE);
+      
+      /**
+       * Just takes the lock for the local node. This should only be invoked
+       * for new locks or in a callback from the cluster support, which
+       * won't make the callback until all other nodes in cluster agree.
+       * 
+       * @return the LockState after the lock is taken.
+       */
+      private LockState lockForLocalNode()
       {
-         if (SharedLocalYieldingClusterLockManager.this.localNode.equals(caller))
+         LockState lockedState = null;
+         boolean updated = false;
+         while (!updated)
          {
-            this.localLockCount.incrementAndGet();
-            this.holder = SharedLocalYieldingClusterLockManager.this.localNode;
+            LockState current = lockState.get();
+            lockedState = current.incrementAndTake(SharedLocalYieldingClusterLockManager.this.localNode);
+            updated = lockState.compareAndSet(current, lockedState);
          }
-         else
+         return lockedState;
+      }
+      
+      private LockState lockForRemotNode(ClusterNode caller, long timeout) throws TimeoutException
+      {
+         LockState lockedState = null;
+         
+         long deadline = System.currentTimeMillis() + timeout;
+         boolean wasInterrupted = false;
+         Thread currentThread = Thread.currentThread();
+         waiters.add(currentThread);
+         
+         try
          {
-            long deadline = System.currentTimeMillis() + timeout;
-            boolean wasInterrupted = false;
-            Thread current = Thread.currentThread();
-            waiters.add(current);
-            
-            try
-            {
-               // Block while not first in queue or cannot acquire lock
-               while (waiters.peek() != current || 
-                      localLockCount.get() > 0) 
-               { 
-                  LockSupport.parkUntil(deadline);
-                  if (Thread.interrupted()) // ignore interrupts while waiting
-                     wasInterrupted = true;
-                  if (System.currentTimeMillis() >= deadline)
+            // Block while not first in queue or cannot acquire lock
+            LockState currentState = lockState.get();
+            lockedState = currentState.take(caller);
+            while (waiters.peek() != currentThread || 
+                   currentState.lockHolder == SharedLocalYieldingClusterLockManager.this.localNode ||
+                   !lockState.compareAndSet(currentState, lockedState)) 
+            { 
+               LockSupport.parkUntil(deadline);
+               if (Thread.interrupted()) // ignore interrupts while waiting
+                  wasInterrupted = true;
+               
+               currentState = lockState.get();
+               lockedState = currentState.take(caller);
+               if (System.currentTimeMillis() >= deadline)
+               {
+                  // One last attempt
+                  if (waiters.peek() != currentThread || 
+                        currentState.lockHolder == SharedLocalYieldingClusterLockManager.this.localNode ||
+                        !lockState.compareAndSet(currentState, lockedState))
                   {
-                     if (waiters.peek() != current || 
-                           localLockCount.get() > 0)
-                     {
-                        throw new TimeoutException(this.holder);
-                     }
-                     break;
+                     throw new TimeoutException(SharedLocalYieldingClusterLockManager.this.localNode);
                   }
+                  // Succeeded
+                  break;
                }
-               
-               if (localLockCount.get() == 0)
-               {
-                  holder = caller;
-               }
-               else
-               {
-                  throw new TimeoutException(this.holder);
-               }
             }
-            finally
-            {
-               waiters.remove();
-               if (wasInterrupted)          // reassert interrupt status on exit
-                  current.interrupt();
-            }
          }
+         finally
+         {
+            waiters.remove();
+            if (wasInterrupted)          // reassert interrupt status on exit
+               currentThread.interrupt();
+         }
+         
+         return lockedState;
       }
       
       private void unlock(ClusterNode caller)
       {
-         if (caller.equals(holder))              
+         LockState current = lockState.get();
+         if (caller.equals(current.lockHolder))              
          {
-            if (SharedLocalYieldingClusterLockManager.this.localNode.equals(caller))
+            LockState newState = null;
+            if (SharedLocalYieldingClusterLockManager.this.localNode == current.lockHolder)
             {
-               if (this.localLockCount.decrementAndGet() == 0)
+               for (;;)
                {
-                  holder = null;
+                  newState = current.decrementAndRelease();
+                  if (lockState.compareAndSet(current, newState))
+                  {
+                     break;
+                  }
+                  else
+                  {
+                     current = lockState.get();
+                  }
                }
             }
             else
             {
-               holder = null;
+               for (;;)
+               {
+                  newState = current.release();
+                  if (lockState.compareAndSet(current, newState))
+                  {
+                     break;
+                  }
+                  else
+                  {
+                     current = lockState.get();
+                  }
+               }
             }
             
-            if (holder == null)
+            if (newState.lockHolder == null)
             {
+               // Wake up anyone waiting for this lock
                LockSupport.unpark(waiters.peek());
             }
          }
-       } 
+      }
       
+      private LockState registerForLocalLock()
+      {       
+         LockState current = lockState.get();
+         LockState newState = null;
+         for (;;)
+         {
+            newState = current.increment();
+            if (lockState.compareAndSet(current, newState))
+            {
+               break;
+            }
+            else
+            {
+               current = lockState.get();
+            }               
+         }
+         return newState;
+      }
+      
    }
    
+   private static class LockState
+   {
+      private static final LockState AVAILABLE = new LockState(0, null, null);
+      
+      private final int localLockCount;
+      private final ClusterNode lockHolder;
+      private final Thread latestRegistrant; 
+      private final boolean invalid;
+      
+      private LockState(int localLockCount, ClusterNode lockHolder, Thread latestRegistrant)
+      {
+         this.localLockCount = localLockCount;
+         this.lockHolder = lockHolder;
+         this.latestRegistrant = latestRegistrant;
+         this.invalid = false;
+      }
+      
+      private LockState(int localLockCount, ClusterNode lockHolder, Thread latestRegistrant, boolean invalid)
+      {
+         this.localLockCount = localLockCount;
+         this.lockHolder = lockHolder;
+         this.latestRegistrant = latestRegistrant;
+         this.invalid = invalid;
+      }
+      
+      private LockState increment()
+      {
+         return new LockState(localLockCount + 1, lockHolder, Thread.currentThread());
+      }
+      
+      private LockState decrement()
+      {
+         return new LockState(localLockCount - 1, lockHolder, latestRegistrant);
+      }
+      
+      private LockState incrementAndTake(ClusterNode owner)
+      {
+         return new LockState(localLockCount + 1, owner, latestRegistrant);
+      }
+      
+      private LockState take(ClusterNode owner)
+      {
+         return new LockState(localLockCount, owner, latestRegistrant);
+      }
+      
+      private LockState decrementAndRelease()
+      {
+         return localLockCount == 1 ? new LockState(0, null, latestRegistrant) : decrement();
+      }
+      
+      private LockState release()
+      {
+         return new LockState(localLockCount, null, latestRegistrant);
+      }
+      
+      private LockState invalidate()
+      {
+         return new LockState(localLockCount, lockHolder, latestRegistrant, true);
+      }
+      
+   }
+   
    /** Handles callbacks from the cluster lock support object */
    private class ClusterHandler implements LocalLockHandler
    {      
@@ -173,22 +291,60 @@
             InterruptedException
       {
          LocalLock lock = getLocalLock(lockName, true);
-         lock.lock(caller, timeout);
-         if (!localNode.equals(caller))
+         
+         if (localNode.equals(caller)) // can't use identity here as caller may have been deserialized
+         {    
+            LockState lockState = lock.lockForLocalNode();
+            if (lockState.latestRegistrant != Thread.currentThread())
+            {
+               // Someone else may be blocking waiting for this thread to
+               // acquire the lock from the cluster. Tell them we have.
+               synchronized (lock)
+               {
+                  lock.notifyAll();
+               }
+            }
+         }
+         else 
          {
+            LockState currentState = lock.lockForRemotNode(caller, timeout);
+            
             // Any local thread who has a ref to lock will now need to request it
             // remotely from caller, which won't grant it until this method returns.
             // So, we can remove lock from the map. If that local thread is granted
             // the lock by caller, when that thread calls lockFromCluster, we'll create
             // a new lock to handle that.
             localLocks.remove(lockName, lock);
+            
+            LockState invalidated = null;
+            for (;;)
+            {
+               invalidated = currentState.invalidate();
+               if (lock.lockState.compareAndSet(currentState, invalidated))
+               {
+                  break;
+               }
+               else
+               {
+                  currentState = lock.lockState.get();
+               }
+            }
+            
+            if (invalidated.latestRegistrant != null)
+            {
+               // Wake up anyone waiting on this lock so they know it's invalid
+               synchronized (lock)
+               {
+                  lock.notifyAll();
+               }
+            }
          }
       }
 
       public ClusterNode getLockHolder(Serializable lockName)
       {
          LocalLock lock = getLocalLock(lockName, false);
-         return lock == null ? null : lock.holder;
+         return lock == null ? null : lock.lockState.get().lockHolder;
       }
 
       public void unlockFromCluster(Serializable lockName, ClusterNode caller)
@@ -258,50 +414,89 @@
     */
    public LockResult lock(Serializable lockName, long timeout, boolean newLock)
       throws TimeoutException, InterruptedException
-   {   
-      if (this.localNode == null)
-      {
-         throw new IllegalStateException("Null localNode");
-      }
-      
+   {      
       LockResult result = null;
       LocalLock localLock = getLocalLock(lockName, false);
       if (localLock == null)
       {
+         localLock = getLocalLock(lockName, true);
          if (newLock)
          {
             // Here we assume the caller knows what they are doing and this 
             // is really is a new lock, and that no other
-            // node is going to try to take it
-            localLock = getLocalLock(lockName, true);
-            localLock.lock(this.localNode, timeout);
-            result = (localLock.localLockCount.get() == 1 ? LockResult.NEW_LOCK : LockResult.ALREADY_HELD);
+            // node or local thread is trying to try to take it
+            
+            LockState lockState = localLock.lockForLocalNode();
+            result = (lockState.localLockCount == 1 ? LockResult.NEW_LOCK : LockResult.ALREADY_HELD);
          }
-         else
-         {
-            this.clusterSupport.lock(lockName, timeout);
-            result = LockResult.ACQUIRED_FROM_CLUSTER;
-         }
       }
-      else
+      
+      if (result == null) // We have to ask the cluster
       {
-         localLock.localLockCount.incrementAndGet(); // Now no other node can become localLock.holder         
+         LockState lockState = localLock.registerForLocalLock(); 
          try
          {
-            if (this.localNode.equals(localLock.holder))
-            {
-               result = LockResult.ALREADY_HELD;
-               
-               // Check for race where we locked something that's been removed
-               if (localLock.removable && localLock != getLocalLock(lockName, false))
+            long remaining = timeout;
+            do
+            {               
+               if (this.localNode == lockState.lockHolder) // object identity is safe
                {
-                  return lock(lockName, timeout, newLock);
+                  result = LockResult.ALREADY_HELD;
+                  
+                  // Check for race where we registered for something that's been removed
+                  if (localLock.removable && localLock != getLocalLock(lockName, false))
+                  {
+                     // oops; try again
+                     result = lock(lockName, remaining, newLock);
+                  }
                }
+               else
+               {  
+                  if (lockState.invalid)
+                  {
+                     // the lock was removed; start over
+                     result = lock(lockName, remaining, newLock);
+                  }
+                  else if (lockState.localLockCount == 1)
+                  {
+                     // Only one thread should ask the cluster for the lock;
+                     // we were first so it's our task
+                     if (this.clusterSupport.lock(lockName, remaining))
+                     {
+                        result = LockResult.ACQUIRED_FROM_CLUSTER;
+                     }
+                     else
+                     {
+                        throw new TimeoutException("Cannot acquire lock " + lockName + " from cluster");
+                     }
+                  }
+                  else
+                  {
+                     // Some other thread is asking the cluster
+                     // Wait for them to finish
+                     long start = System.currentTimeMillis();
+                     synchronized (localLock)
+                     {
+                        lockState = localLock.lockState.get();
+                        if (lockState.lockHolder == null)
+                        {
+                           localLock.wait(remaining);
+                        }
+                     }
+                     // If something woke us up, see where we are
+                     // and loop again
+                     lockState = localLock.lockState.get();
+                     
+                     remaining = timeout - (System.currentTimeMillis() - start);
+                     
+                  }
+               }
             }
-            else
+            while (result == null && remaining > 0);
+            
+            if (result == null)
             {
-               this.clusterSupport.lock(lockName, timeout);
-               result = LockResult.ACQUIRED_FROM_CLUSTER;
+               throwTimeoutException(lockName, lockState);
             }
          }
          finally
@@ -314,10 +509,12 @@
             {
                // Only decrement if the current lock object for this key is
                // the same one we incremented above
-               LocalLock current = localLocks.get(lockName);
-               if (current == localLock)
+               boolean updated = false;
+               while (localLocks.get(lockName) == localLock && !updated)
                {
-                  localLock.localLockCount.decrementAndGet();
+                  LockState currentState = localLock.registerForLocalLock();
+                  LockState decremented = currentState.decrement();
+                  updated = localLock.lockState.compareAndSet(currentState, decremented);
                }
             }
          }         
@@ -325,7 +522,7 @@
       
       return result;
    }
-   
+
    /**
     * Releases a previously acquired lock.
     * 
@@ -343,7 +540,7 @@
       
       this.clusterSupport.unlock(lockName);
       
-      if (lock != null && lock.removable && lock.localLockCount.get() == 0)
+      if (lock != null && lock.removable && lock.lockState.get().lockHolder == null)
       {
          localLocks.remove(lockName, lock);
       }
@@ -357,6 +554,11 @@
    public void start() throws Exception
    {
       this.clusterSupport.start();
+      
+      if (this.localNode == null)
+      {
+         throw new IllegalStateException("Null localNode");
+      }
    }
    
    /**
@@ -386,5 +588,13 @@
       }
       return category;
    }
+   
+   private static void throwTimeoutException(Serializable lockName, LockState lockState) throws TimeoutException
+   {
+      TimeoutException te = lockState.lockHolder == null 
+         ? new TimeoutException("Unable to acquire lock " + lockName) 
+         : new TimeoutException(lockState.lockHolder);
+      throw te;
+   }
 
 }

Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/TimeoutException.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/TimeoutException.java	2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/TimeoutException.java	2010-05-16 21:11:57 UTC (rev 104840)
@@ -42,6 +42,11 @@
       this.owner = owner;
    }
    
+   public TimeoutException(String msg)
+   {
+      super(msg);
+   }
+   
    public ClusterNode getOwner()
    {
       return owner;

Modified: projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java	2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/ClusteredLockManagerTestBase.java	2010-05-16 21:11:57 UTC (rev 104840)
@@ -96,6 +96,7 @@
       }
       catch (IllegalArgumentException good) {}  
       
+      expect(haPartition.isConsistentWith(haPartition)).andReturn(Boolean.TRUE);
       expect(haPartition.getClusterNode()).andReturn(node1);
       expect(haPartition.getGroupName()).andReturn("TestPartition");
       
@@ -111,7 +112,8 @@
    public void testStart() throws Exception
    {
       HAPartition haPartition = createNiceMock(HAPartition.class);      
-      LocalLockHandler handler = createNiceMock(LocalLockHandler.class);       
+      LocalLockHandler handler = createNiceMock(LocalLockHandler.class); 
+      expect(haPartition.isConsistentWith(haPartition)).andReturn(Boolean.TRUE);
       expect(haPartition.getClusterNode()).andReturn(node1);
       expect(haPartition.getGroupName()).andReturn("TestPartition");
       
@@ -689,7 +691,8 @@
    protected TesteeSet<T> getTesteeSet(ClusterNode node, int viewPos, int viewSize) throws Exception
    {
       HAPartition haPartition = createNiceMock(HAPartition.class);      
-      LocalLockHandler handler = createNiceMock(LocalLockHandler.class);       
+      LocalLockHandler handler = createNiceMock(LocalLockHandler.class);  
+      expect(haPartition.isConsistentWith(haPartition)).andReturn(Boolean.TRUE);
       expect(haPartition.getClusterNode()).andReturn(node);
       expect(haPartition.getGroupName()).andReturn("TestPartition");
       

Modified: projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/SharedLocalYieldingClusterLockManagerUnitTestCase.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/SharedLocalYieldingClusterLockManagerUnitTestCase.java	2010-05-16 19:25:32 UTC (rev 104839)
+++ projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/SharedLocalYieldingClusterLockManagerUnitTestCase.java	2010-05-16 21:11:57 UTC (rev 104840)
@@ -31,6 +31,7 @@
 import static org.easymock.EasyMock.isA;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.resetToNice;
 import static org.easymock.EasyMock.resetToStrict;
 import static org.jboss.test.ha.framework.server.lock.LockParamsMatcher.eqLockParams;
 
@@ -39,17 +40,23 @@
 import java.util.List;
 import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
+import org.apache.log4j.Logger;
 import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
 import org.jboss.ha.framework.interfaces.ClusterNode;
 import org.jboss.ha.framework.interfaces.HAPartition;
 import org.jboss.ha.framework.interfaces.ResponseFilter;
 import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport;
 import org.jboss.ha.framework.server.lock.RemoteLockResponse;
 import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager;
+import org.jboss.ha.framework.server.lock.TimeoutException;
 import org.jboss.ha.framework.server.lock.AbstractClusterLockSupport.RpcTarget;
 import org.jboss.ha.framework.server.lock.SharedLocalYieldingClusterLockManager.LockResult;
 import org.jboss.test.ha.util.MockClusterNode;
@@ -64,6 +71,8 @@
  */
 public class SharedLocalYieldingClusterLockManagerUnitTestCase extends TestCase
 {
+   private static final Logger log = Logger.getLogger(SharedLocalYieldingClusterLockManagerUnitTestCase.class);
+   
    private static final ResponseFilter NULL_FILTER = null;
    
    private ClusterNode node1;
@@ -141,10 +150,142 @@
 
       assertEquals(LockResult.ACQUIRED_FROM_CLUSTER, ts.testee.lock("test", 1000, true));
    }
+   
+   public void testConcurrentLocalLocks() throws Exception
+   {
+      concurrentLocalLocksTest(false, -1);
+   }
+   
+   public void testConcurrentLocalNewLocks() throws Exception
+   {
+      concurrentLocalLocksTest(true, -1);
+   }
+   
+   public void testConcurrentLocalLocksSomeNew() throws Exception
+   {
+      concurrentLocalLocksTest(true, 0);
+      concurrentLocalLocksTest(true, 1);
+      concurrentLocalLocksTest(true, 2);
+      concurrentLocalLocksTest(true, 3);
+   }
+   
+   private void concurrentLocalLocksTest(boolean newLock, int newLockPos) throws Exception
+   {
+      TesteeSet ts = getTesteeSet(node1, 0, 3);
+      
+      if (!newLock || newLockPos >= 0)
+      {
+         resetToNice(ts.partition);
+         EasyMock.makeThreadSafe(ts.partition, true);
+         ts.control.checkOrder(false);
+         
+         List<RemoteLockResponse> rspList = getOKResponses(2);
+         expect(ts.partition.getMethodCallTimeout()).andReturn(60000l).anyTimes();
+         expect(ts.partition.callMethodOnCluster(eq("test"), 
+                                              eq("remoteLock"), 
+                                              eqLockParams(node1, 200000), 
+                                              aryEq(AbstractClusterLockSupport.REMOTE_LOCK_TYPES), 
+                                              eq(RemoteLockResponse.class),
+                                              eq(true),
+                                              eq(NULL_FILTER),
+                                              anyInt(),
+                                              eq(false))).andReturn(rspList).anyTimes();
+         
+         replay(ts.partition);
+      }
 
+      Locker[] lockers = new Locker[4];
+      CountDownLatch readyLatch = new CountDownLatch(lockers.length);
+      CountDownLatch startLatch = new CountDownLatch(1);
+      CountDownLatch endLatch = new CountDownLatch(lockers.length);
+      ExecutorService executor = Executors.newFixedThreadPool(lockers.length);
+      
+      for (int i = 0; i < lockers.length; i++)
+      {
+         boolean newOne = newLock && (i == newLockPos || newLockPos < 0); 
+         lockers[i] = new Locker(ts, newOne, readyLatch, startLatch, endLatch);
+         executor.submit(lockers[i]);
+      }
+      
+      readyLatch.await(5, TimeUnit.SECONDS);
+      startLatch.countDown();
+      endLatch.await(5, TimeUnit.SECONDS);
+      
+      boolean sawNewLockResult = false;
+      
+      for (Locker locker : lockers)
+      {
+         if (locker.exception != null)
+         {
+            log.error(locker + " caught exception", locker.exception);
+            throw (Exception) locker.exception;
+         }
+         
+         LockResult newLockResult = locker.newLock ? LockResult.NEW_LOCK : LockResult.ACQUIRED_FROM_CLUSTER;
+         
+         if (sawNewLockResult)
+         {
+            if (locker.newLock)
+            {   
+               assertTrue(locker.result == LockResult.ALREADY_HELD || locker.result == LockResult.NEW_LOCK);
+            }
+            else
+            {
+               assertTrue(locker.result == LockResult.ALREADY_HELD || locker.result == LockResult.ACQUIRED_FROM_CLUSTER);
+            }
+         }
+         else if (locker.result != LockResult.ALREADY_HELD)
+         {
+            assertEquals(newLockResult, locker.result);
+            sawNewLockResult = true;
+         }
+      }
+      
+      assertTrue("Saw a new lock result", sawNewLockResult);
+   }
+   
+   public void testRejectionFromCluster() throws Exception
+   {
+      TesteeSet ts = getTesteeSet(node1, 0, 3);
+      
+      resetToNice(ts.partition);
+      
+      List<RemoteLockResponse> rspList = getRejectionResponses(node3, 2);
+      expect(ts.partition.getMethodCallTimeout()).andReturn(60000l).atLeastOnce();
+      expect(ts.partition.callMethodOnCluster(eq("test"), 
+                                           eq("remoteLock"), 
+                                           eqLockParams(node1, 200000), 
+                                           aryEq(AbstractClusterLockSupport.REMOTE_LOCK_TYPES), 
+                                           eq(RemoteLockResponse.class),
+                                           eq(true),
+                                           eq(NULL_FILTER),
+                                           anyInt(),
+                                           eq(false))).andReturn(rspList).atLeastOnce();
+//      expect(ts.partition.callMethodOnCluster(eq("test"), 
+//            eq("releaseRemoteLock"), 
+//            eqLockParams(node1, 200000), 
+//            aryEq(AbstractClusterLockSupport.REMOTE_LOCK_TYPES), 
+//            eq(RemoteLockResponse.class),
+//            eq(true),
+//            eq(NULL_FILTER),
+//            anyInt(),
+//            eq(false))).andReturn(rspList);
+      
+      replay(ts.partition);
+
+      try
+      {
+         ts.testee.lock("test", 1000, false);
+         fail("Did not throw TimeoutException");
+      }
+      catch (TimeoutException ok) {}
+   }
+
    protected TesteeSet getTesteeSet(ClusterNode node, int viewPos, int viewSize) throws Exception
    {
-      HAPartition haPartition = createNiceMock(HAPartition.class);     
+      IMocksControl control = EasyMock.createNiceControl();
+      HAPartition haPartition = control.createMock(HAPartition.class); 
+      expect(haPartition.isConsistentWith(haPartition)).andReturn(Boolean.TRUE);
       expect(haPartition.getClusterNode()).andReturn(node);
       expect(haPartition.getPartitionName()).andReturn("TestPartition");
       
@@ -161,7 +302,7 @@
       
       reset(haPartition);
       
-      return new TesteeSet(testee, haPartition, c.getValue());     
+      return new TesteeSet(testee, haPartition, c.getValue(), control);     
    }    
    
    private Vector<ClusterNode> getView(ClusterNode member, int viewPos, int numMembers)
@@ -186,18 +327,31 @@
       }
       return rspList;
    }
+
+   private static List<RemoteLockResponse> getRejectionResponses(ClusterNode owner, int numResponses)
+   {
+      List<RemoteLockResponse> rspList = new ArrayList<RemoteLockResponse>();
+      rspList.add(new RemoteLockResponse(owner, RemoteLockResponse.Flag.FAIL));
+      for (int i = 1; i < numResponses + 1; i++)
+      {
+         rspList.add(new RemoteLockResponse(null, RemoteLockResponse.Flag.OK));
+      }
+      return rspList;
+   }
    
    private class TesteeSet
    {
+      private final IMocksControl control;
       private final SharedLocalYieldingClusterLockManager testee;
       private final HAPartition partition;
       private final RpcTarget target;
       
-      private TesteeSet(SharedLocalYieldingClusterLockManager testee, HAPartition partition, RpcTarget target)
+      private TesteeSet(SharedLocalYieldingClusterLockManager testee, HAPartition partition, RpcTarget target, IMocksControl control)
       {
          this.testee = testee;
          this.partition = partition;
          this.target = target;
+         this.control = control;
       }
    }
    
@@ -205,15 +359,17 @@
    {
       private final TesteeSet ts;
       private final boolean newLock;
+      private final CountDownLatch readyLatch;
       private final CountDownLatch startLatch;
       private final CountDownLatch endLatch;
       private LockResult result;
       private Exception exception;
       
-      private Locker(TesteeSet ts, boolean newLock, CountDownLatch startLatch, CountDownLatch endLatch)
+      private Locker(TesteeSet ts, boolean newLock, CountDownLatch readyLatch, CountDownLatch startLatch, CountDownLatch endLatch)
       {
          this.ts = ts;
          this.newLock = newLock;
+         this.readyLatch = readyLatch;
          this.startLatch = startLatch;
          this.endLatch = endLatch;
       }
@@ -222,6 +378,7 @@
       {
          try
          {
+            readyLatch.countDown();
             startLatch.await(10, TimeUnit.SECONDS);
             result = ts.testee.lock("test", 1000, newLock);
          }




More information about the jboss-cvs-commits mailing list