Author: jason.greene(a)jboss.com
Date: 2009-02-13 14:07:31 -0500 (Fri, 13 Feb 2009)
New Revision: 7694
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
Log:
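Use multiple implementations of FlushTracker: StandardFlushTracker (no-op locking) and NonBlockingFlushTracker (read/write coordination lock), selected by configuration.isNonBlockingStateTransfer()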
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-13 19:06:47 UTC (rev 7693)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-13 19:07:31 UTC (rev 7694)
@@ -131,7 +131,7 @@
private volatile boolean isInLocalMode;
private ComponentRegistry componentRegistry;
private LockManager lockManager;
- private FlushTracker flushTracker = new FlushTracker();
+ private FlushTracker flushTracker;
@Inject
@@ -153,11 +153,11 @@
this.lockManager = lockManager;
}
- public class FlushTracker
+ public abstract class FlushTracker
{
private final ReclosableLatch flushBlockGate = new ReclosableLatch();
private final AtomicInteger flushCompletionCount = new AtomicInteger();
- private final ReentrantReadWriteLock coordinationLock = new
ReentrantReadWriteLock();
+
private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
public void block()
@@ -178,18 +178,21 @@
return flushCompletionCount.get();
}
- public void lockProcessingLock()
+ public abstract void lockProcessingLock();
+ public abstract void unlockProcessingLock();
+ public abstract void lockSuspendProcessingLock();
+ public abstract void unlockSuspendProcessingLock();
+
+ public void waitForFlushCompletion(long timeout)
{
- if (! configuration.isNonBlockingStateTransfer())
- return;
-
- for (;;)
+ for (; ;)
{
try
{
- if
(!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
- throw new TimeoutException("Could not obtain processing
lock");
-
+ if (channel.flushSupported() && !flushBlockGate.await(timeout,
TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for
flush to unblock. (timeout = " + timeout + " millis) ");
+ }
return;
}
catch (InterruptedException e)
@@ -199,26 +202,16 @@
}
}
- public void unlockProcessingLock()
+ public void waitForFlushStart(long timeout)
{
- if (! configuration.isNonBlockingStateTransfer())
- return;
-
- coordinationLock.readLock().unlock();
- }
-
- public void lockSuspendProcessingLock()
- {
- if (! configuration.isNonBlockingStateTransfer())
- return;
-
- for (;;)
+ for (; ;)
{
try
{
- if
(!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
- throw new TimeoutException("Could not obtain processing
lock");
-
+ if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
+ }
return;
}
catch (InterruptedException e)
@@ -227,26 +220,41 @@
}
}
}
+ }
- public void unlockSuspendProcessingLock()
+ private final class StandardFlushTracker extends FlushTracker
+ {
+ // All locking methods are no-ops
+ public void lockProcessingLock()
{
- if (! configuration.isNonBlockingStateTransfer())
- return;
+ }
- if (coordinationLock.isWriteLockedByCurrentThread())
- coordinationLock.writeLock().unlock();
+ public void lockSuspendProcessingLock()
+ {
}
- public void waitForFlushCompletion(long timeout)
+ public void unlockProcessingLock()
{
- for (; ;)
+ }
+
+ public void unlockSuspendProcessingLock()
+ {
+ }
+ }
+
+ private final class NonBlockingFlushTracker extends FlushTracker
+ {
+ private final ReentrantReadWriteLock coordinationLock = new
ReentrantReadWriteLock();
+
+ public void lockProcessingLock()
+ {
+ for (;;)
{
try
{
- if (channel.flushSupported() && !flushBlockGate.await(timeout,
TimeUnit.MILLISECONDS))
- {
- throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
- }
+ if
(!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Could not obtain processing
lock");
+
return;
}
catch (InterruptedException e)
@@ -256,16 +264,20 @@
}
}
- public void waitForFlushStart(long timeout)
+ public void unlockProcessingLock()
{
- for (; ;)
+ coordinationLock.readLock().unlock();
+ }
+
+ public void lockSuspendProcessingLock()
+ {
+ for (;;)
{
try
{
- if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS))
- {
- throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
- }
+ if
(!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Could not obtain processing
lock");
+
return;
}
catch (InterruptedException e)
@@ -274,6 +286,13 @@
}
}
}
+
+ public void unlockSuspendProcessingLock()
+ {
+ if (coordinationLock.isWriteLockedByCurrentThread())
+ coordinationLock.writeLock().unlock();
+ }
+
}
// ------------ START: Lifecycle methods ------------
@@ -302,6 +321,7 @@
boolean fetchState = shouldFetchStateOnStartup();
boolean nonBlocking = configuration.isNonBlockingStateTransfer();
+ this.flushTracker = nonBlocking ? new NonBlockingFlushTracker() : new
StandardFlushTracker();
initialiseChannelAndRpcDispatcher(fetchState && !nonBlocking);
if (!fetchState || nonBlocking)