[jboss-cvs] JBossAS SVN: r104321 - in projects/cluster/ha-server-api/trunk/src: main/java/org/jboss/ha/framework/server/lock and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Apr 29 12:36:37 EDT 2010


Author: bstansberry at jboss.com
Date: 2010-04-29 12:36:37 -0400 (Thu, 29 Apr 2010)
New Revision: 104321

Added:
   projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/
   projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingClusterLockManager.java
   projects/cluster/ha-server-api/trunk/src/test/java/org/jboss/test/ha/framework/server/lock/
Modified:
   projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
Log:
[JBCLUSTER-223] Bring AS cluster module distributed locking into ha-server-api

Copied: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock (from rev 104319, trunk/cluster/src/main/java/org/jboss/ha/framework/server/lock)

Modified: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java
===================================================================
--- trunk/cluster/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java	2010-04-29 15:11:25 UTC (rev 104319)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/AbstractClusterLockSupport.java	2010-04-29 16:36:37 UTC (rev 104321)
@@ -154,29 +154,23 @@
             {
                // Get the lock on all other nodes in the cluster
                
-               @SuppressWarnings("unchecked")
-               ArrayList rsps = partition.callMethodOnCluster(getServiceHAName(), 
+               List<RemoteLockResponse> rsps = partition.callMethodOnCluster(getServiceHAName(), 
                      "remoteLock", new Object[]{lockId, me, new Long(left)}, 
-                     REMOTE_LOCK_TYPES, true);
+                     REMOTE_LOCK_TYPES, RemoteLockResponse.class, true, null, partition.getMethodCallTimeout(), false);
                
                boolean remoteLocked = true;
                if (rsps != null)
                {
-                  for (Object rsp : rsps)
+                  for (RemoteLockResponse rsp : rsps)
                   {
-                     if ((rsp instanceof RemoteLockResponse) == false)
+                     if (rsp.flag != RemoteLockResponse.Flag.OK)
                      {
                         remoteLocked = false;
-                     }
-                     else if (((RemoteLockResponse) rsp).flag != RemoteLockResponse.Flag.OK)
-                     {
-                        RemoteLockResponse curRsp = (RemoteLockResponse) rsp;
-                        remoteLocked = false;
                         if (superiorCompetitor == null)
                         {
-                           superiorCompetitor = getSuperiorCompetitor(curRsp.holder);
-                           log.debug("Received " + curRsp.flag + " response from " + 
-                                 curRsp.responder + " -- reports lock is held by " + curRsp.holder);
+                           superiorCompetitor = getSuperiorCompetitor(rsp.holder);
+                           log.debug("Received " + rsp.flag + " response from " + 
+                                 rsp.responder + " -- reports lock is held by " + rsp.holder);
                         }
                      }
                   }

Added: projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingClusterLockManager.java
===================================================================
--- projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingClusterLockManager.java	                        (rev 0)
+++ projects/cluster/ha-server-api/trunk/src/main/java/org/jboss/ha/framework/server/lock/YieldingClusterLockManager.java	2010-04-29 16:36:37 UTC (rev 104321)
@@ -0,0 +1,330 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.ha.framework.server.lock;
+
+import java.io.Serializable;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+
+import org.jboss.ha.framework.interfaces.ClusterNode;
+import org.jboss.ha.framework.interfaces.HAPartition;
+
+/**
+ * @author Brian Stansberry
+ * 
+ * @version $Revision:$
+ */
+public class YieldingClusterLockManager
+{
+   /** Result of a {@link YieldingClusterLockManager#lock(Serializable, long, boolean) lock call}*/
+   public static enum LockResult 
+   { 
+      /** Indicates the lock was acquired after requesting it from the cluster */
+      ACQUIRED_FROM_CLUSTER, 
+      /** Indicates this node already held the lock */
+      ALREADY_HELD, 
+      /** 
+       * Indicates the 'newLock' param passed to 
+       * {@link YieldingClusterLockManager#lock(Serializable, long, boolean)}
+       * was <code>true</code> and the local node in fact was unaware of the
+       * lock. If in fact the local node was already aware of the lock (which
+       * would generally indicate a flaw in the application using this class)
+       * NEW_LOCK will not be returned; rather one of the other enum values
+       * will be returned.
+       */
+      NEW_LOCK 
+   }
+   
+   private class LocalLock
+   {
+      private volatile ClusterNode holder;
+      private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
+      private final AtomicInteger localLockCount = new AtomicInteger();
+
+      private void lock(ClusterNode caller, long timeout) throws TimeoutException
+      {
+         if (YieldingClusterLockManager.this.localNode.equals(caller))
+         {
+            this.localLockCount.incrementAndGet();
+            this.holder = YieldingClusterLockManager.this.localNode;
+         }
+         else
+         {
+            long deadline = System.currentTimeMillis() + timeout;
+            boolean wasInterrupted = false;
+            Thread current = Thread.currentThread();
+            waiters.add(current);
+            
+            try
+            {
+               // Block while not first in queue or cannot acquire lock
+               while (waiters.peek() != current || 
+                      localLockCount.get() > 0) 
+               { 
+                  LockSupport.parkUntil(deadline);
+                  if (Thread.interrupted()) // ignore interrupts while waiting
+                     wasInterrupted = true;
+                  if (System.currentTimeMillis() >= deadline)
+                  {
+                     if (waiters.peek() != current || 
+                           localLockCount.get() > 0)
+                     {
+                        throw new TimeoutException(this.holder);
+                     }
+                     break;
+                  }
+               }
+               
+               if (localLockCount.get() == 0)
+               {
+                  holder = caller;
+               }
+               else
+               {
+                  throw new TimeoutException(this.holder);
+               }
+            }
+            finally
+            {
+               waiters.remove();
+               if (wasInterrupted)          // reassert interrupt status on exit
+                  current.interrupt();
+            }
+         }
+      }
+      
+      private void unlock(ClusterNode caller)
+      {
+         if (caller.equals(holder))              
+         {
+            if (YieldingClusterLockManager.this.localNode.equals(caller))
+            {
+               if (this.localLockCount.decrementAndGet() == 0)
+               {
+                  holder = null;
+               }
+            }
+            else
+            {
+               holder = null;
+            }
+            
+            if (holder == null)
+            {
+               LockSupport.unpark(waiters.peek());
+            }
+         }
+       } 
+      
+   }
+   
+   /** Handles callbacks from the cluster lock support object */
+   private class ClusterHandler implements LocalLockHandler
+   {      
+      // ----------------------------------------------------- LocalLockHandler
+      
+      public ClusterNode getLocalNode(ClusterNode localNode)
+      {
+         return YieldingClusterLockManager.this.localNode;
+      }
+
+      public void setLocalNode(ClusterNode localNode)
+      {
+         YieldingClusterLockManager.this.localNode = localNode;
+      }
+
+      public void lockFromCluster(Serializable lockName, ClusterNode caller, long timeout) throws TimeoutException,
+            InterruptedException
+      {
+         YieldingClusterLockManager.this.doLock(lockName, caller, timeout);
+      }
+
+      public ClusterNode getLockHolder(Serializable lockName)
+      {
+         LocalLock lock = YieldingClusterLockManager.this.getLocalLock(lockName, false);
+         return lock == null ? null : lock.holder;
+      }
+
+      public void unlockFromCluster(Serializable lockName, ClusterNode caller)
+      {
+         YieldingClusterLockManager.this.doUnlock(lockName, caller);
+      }
+      
+   }
+   
+   private ClusterNode localNode;
+   private ConcurrentMap<Serializable, LocalLock> localLocks = new ConcurrentHashMap<Serializable, LocalLock>();
+   private final YieldingGloballyExclusiveClusterLockSupport clusterSupport;
+   
+   public YieldingClusterLockManager(String serviceHAName, HAPartition partition)
+   {
+      ClusterHandler handler = new ClusterHandler();
+      clusterSupport = new YieldingGloballyExclusiveClusterLockSupport(serviceHAName, partition, handler);
+   }
+   
+   // ----------------------------------------------------------------- Public
+   
+   /**
+    * Acquire the given lock.
+    * 
+    * @param lockName the identifier of the lock that should be acquired
+    * @param timeout max time in ms to wait before throwing a TimeoutException
+    *                if the lock cannot be acquired
+    *                
+    * @return enum indicating how the lock was acquired
+    *                
+    * @throws TimeoutException if the lock cannot be acquired before the timeout
+    * @throws InterruptedException if the thread is interrupted while trying to
+    *                              acquire the lock
+    */
+   public LockResult lock (Serializable lockName, long timeout)
+      throws TimeoutException, InterruptedException
+   {
+      return lock(lockName, timeout, false);
+   }
+   
+   /**
+    * Acquire the given lock.
+    * 
+    * @param lockName the identifier of the lock that should be acquired
+    * @param timeout max time in ms to wait before throwing a TimeoutException
+    *                if the lock cannot be acquired
+    * @param newLock <code>true</code> if this object should assume this is the
+    *                first use cluster-wide of the lock identified by
+    *                <code>lockName</code>, and just acquire the lock locally 
+    *                without any cluster-wide call. See discussion of 
+    *                {@link LockResult#NEW_LOCK}.
+    *                
+    * @return enum indicating how the lock was acquired
+    *                
+    * @throws TimeoutException if the lock cannot be acquired before the timeout
+    * @throws InterruptedException if the thread is interrupted while trying to
+    *                              acquire the lock
+    */
+   public LockResult lock(Serializable lockName, long timeout, boolean newLock)
+      throws TimeoutException, InterruptedException
+   {   
+      if (this.localNode == null)
+      {
+         throw new IllegalStateException("Null localNode");
+      }
+      
+      LockResult result = null;
+      LocalLock localLock = getLocalLock(lockName, false);
+      if (localLock == null)
+      {
+         if (newLock)
+         {
+            doLock(lockName, this.localNode, timeout);
+            result = LockResult.NEW_LOCK;
+         }
+         else
+         {
+            this.clusterSupport.lock(lockName, timeout);
+            result = LockResult.ACQUIRED_FROM_CLUSTER;
+         }
+      }
+      else
+      {
+         localLock.localLockCount.incrementAndGet(); // Now no other node can become localLock.holder         
+         try
+         {
+            if (this.localNode.equals(localLock.holder))
+            {
+               result = LockResult.ALREADY_HELD;
+            }
+            else
+            {
+               this.clusterSupport.lock(lockName, timeout);
+               result = LockResult.ACQUIRED_FROM_CLUSTER;
+            }
+         }
+         finally
+         {
+            // If we called clusterSupport.lock() above, its callback into
+            // ClusterHandler.lockFromCluster() will increment localLock.localLockCount.
+            // So, decrement so we don't double count
+            // (If we threw an exception above we should also decrement)
+            if (result != LockResult.ALREADY_HELD)
+            {
+               localLock.localLockCount.decrementAndGet();
+            }
+         }         
+      }
+      
+      return result;
+   }
+   
+   public void unlock(Serializable lockName)
+   {
+      this.clusterSupport.unlock(lockName);
+   }
+   
+   public void start() throws Exception
+   {
+      this.clusterSupport.start();
+   }
+   
+   public void stop() throws Exception
+   {
+      this.clusterSupport.stop();
+   }
+   
+   // ----------------------------------------------------------------- Private
+   
+   private LocalLock getLocalLock(Serializable categoryName, boolean create)
+   {
+      LocalLock category = localLocks.get(categoryName);
+      if (category == null && create)
+      {
+         category = new LocalLock();
+         LocalLock existing = localLocks.putIfAbsent(categoryName, category);
+         if (existing != null)
+         {
+            category = existing;
+         }         
+      }
+      return category;
+   }
+   
+   private void doLock(Serializable lockName, ClusterNode caller, long timeout) throws TimeoutException,
+         InterruptedException
+   {
+      LocalLock lock = getLocalLock(lockName, true);
+      lock.lock(caller, timeout);
+   }
+   
+   private void doUnlock(Serializable lockName, ClusterNode caller)
+   {
+      LocalLock lock = getLocalLock(lockName, false);
+      if (lock != null)
+      {
+         lock.unlock(caller);
+      }
+   }
+
+}




More information about the jboss-cvs-commits mailing list