[jbosscache-commits] JBoss Cache SVN: r6750 - in core/trunk/src/main/java/org/jboss/cache: util/concurrent and 1 other directory.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Thu Sep 18 09:14:33 EDT 2008
Author: manik.surtani at jboss.com
Date: 2008-09-18 09:14:33 -0400 (Thu, 18 Sep 2008)
New Revision: 6750
Added:
core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
Log:
Better threadgate impl for flush block gate
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-09-18 11:02:45 UTC (rev 6749)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-09-18 13:14:33 UTC (rev 6750)
@@ -32,7 +32,7 @@
import org.jboss.cache.statetransfer.DefaultStateTransferManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionTable;
-import org.jboss.cache.util.ThreadGate;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
import org.jboss.cache.util.reflect.ReflectionUtil;
import org.jgroups.Address;
import org.jgroups.Channel;
@@ -58,6 +58,7 @@
import java.util.List;
import java.util.Set;
import java.util.Vector;
+import java.util.concurrent.TimeUnit;
/**
* Manager that handles all RPC calls between JBoss Cache instances
@@ -82,7 +83,7 @@
/**
* Thread gate used to block Dispatcher during JGroups FLUSH protocol
*/
- private final ThreadGate flushBlockGate = new ThreadGate();
+ private final ReclosableLatch flushBlockGate = new ReclosableLatch();
/**
* JGroups RpcDispatcher in use.
*/
@@ -429,7 +430,7 @@
log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage);
if (channel.flushSupported())
{
- if (!flushBlockGate.await(configuration.getStateRetrievalTimeout()))
+ if (!flushBlockGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
}
useOutOfBandMessage = false;
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java 2008-09-18 13:14:33 UTC (rev 6750)
@@ -0,0 +1,66 @@
+package org.jboss.cache.util.concurrent;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * A better impl of {@link org.jboss.cache.util.ThreadGate}, that uses an {@link java.util.concurrent.locks.AbstractQueuedSynchronizer}.
+ * <p/>
+ * This implementation allows you to create a latch with a default state (open or closed), and repeatedly open or close
+ * the latch.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class ReclosableLatch extends AbstractQueuedSynchronizer
+{
+ // the following states are used in the AQS.
+ private static final int OPEN_STATE = 0, CLOSED_STATE = 1;
+
+ public ReclosableLatch()
+ {
+ setState(CLOSED_STATE);
+ }
+
+ public ReclosableLatch(boolean defaultOpen)
+ {
+ setState(defaultOpen ? OPEN_STATE : CLOSED_STATE);
+ }
+
+ @Override
+ public final int tryAcquireShared(int ignored)
+ {
+ // return 1 if we allow the requestor to proceed, -1 if we want the requestor to block.
+ return getState() == OPEN_STATE ? 1 : -1;
+ }
+
+ @Override
+ public final boolean tryReleaseShared(int state)
+ {
+ // used as a mechanism to set the state of the Sync.
+ setState(state);
+ return true;
+ }
+
+ public final void open()
+ {
+ // do not use setState() directly since this won't notify parked threads.
+ releaseShared(OPEN_STATE);
+ }
+
+ public final void close()
+ {
+ // do not use setState() directly since this won't notify parked threads.
+ releaseShared(CLOSED_STATE);
+ }
+
+ public final void await() throws InterruptedException
+ {
+ acquireSharedInterruptibly(1); // the 1 is a dummy value that is not used.
+ }
+
+ public final boolean await(long time, TimeUnit unit) throws InterruptedException
+ {
+ return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy value that is not used.
+ }
+}
More information about the jbosscache-commits
mailing list