[jbosscache-commits] JBoss Cache SVN: r6750 - in core/trunk/src/main/java/org/jboss/cache: util/concurrent and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Sep 18 09:14:33 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-09-18 09:14:33 -0400 (Thu, 18 Sep 2008)
New Revision: 6750

Added:
   core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
Log:
Better threadgate impl for flush block gate

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-09-18 11:02:45 UTC (rev 6749)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-09-18 13:14:33 UTC (rev 6750)
@@ -32,7 +32,7 @@
 import org.jboss.cache.statetransfer.DefaultStateTransferManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionTable;
-import org.jboss.cache.util.ThreadGate;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
 import org.jboss.cache.util.reflect.ReflectionUtil;
 import org.jgroups.Address;
 import org.jgroups.Channel;
@@ -58,6 +58,7 @@
 import java.util.List;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Manager that handles all RPC calls between JBoss Cache instances
@@ -82,7 +83,7 @@
    /**
     * Thread gate used to block Dispatcher during JGroups FLUSH protocol
     */
-   private final ThreadGate flushBlockGate = new ThreadGate();
+   private final ReclosableLatch flushBlockGate = new ReclosableLatch();
    /**
     * JGroups RpcDispatcher in use.
     */
@@ -429,7 +430,7 @@
             log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage);
          if (channel.flushSupported())
          {
-            if (!flushBlockGate.await(configuration.getStateRetrievalTimeout()))
+            if (!flushBlockGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
                throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
          }
          useOutOfBandMessage = false;

Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java	2008-09-18 13:14:33 UTC (rev 6750)
@@ -0,0 +1,66 @@
+package org.jboss.cache.util.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * A better impl of {@link org.jboss.cache.util.ThreadGate}, that uses an {@link java.util.concurrent.locks.AbstractQueuedSynchronizer}.
+ * <p/>
+ * This implementation allows you to create a latch with a default state (open or closed), and repeatedly open or close
+ * the latch.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class ReclosableLatch extends AbstractQueuedSynchronizer
+{
+   // the following states are used in the AQS.
+   private static final int OPEN_STATE = 0, CLOSED_STATE = 1;
+
+   public ReclosableLatch()
+   {
+      setState(CLOSED_STATE);
+   }
+
+   public ReclosableLatch(boolean defaultOpen)
+   {
+      setState(defaultOpen ? OPEN_STATE : CLOSED_STATE);
+   }
+
+   @Override
+   public final int tryAcquireShared(int ignored)
+   {
+      // return 1 if we allow the requestor to proceed, -1 if we want the requestor to block.
+      return getState() == OPEN_STATE ? 1 : -1;
+   }
+
+   @Override
+   public final boolean tryReleaseShared(int state)
+   {
+      // used as a mechanism to set the state of the Sync.
+      setState(state);
+      return true;
+   }
+
+   public final void open()
+   {
+      // do not use setState() directly since this won't notify parked threads.
+      releaseShared(OPEN_STATE);
+   }
+
+   public final void close()
+   {
+      // do not use setState() directly since this won't notify parked threads.
+      releaseShared(CLOSED_STATE);
+   }
+
+   public final void await() throws InterruptedException
+   {
+      acquireSharedInterruptibly(1); // the 1 is a dummy value that is not used.
+   }
+
+   public final boolean await(long time, TimeUnit unit) throws InterruptedException
+   {
+      return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy value that is not used.
+   }
+}




More information about the jbosscache-commits mailing list