[jbosscache-commits] JBoss Cache SVN: r7694 - core/trunk/src/main/java/org/jboss/cache.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri Feb 13 14:07:31 EST 2009


Author: jason.greene at jboss.com
Date: 2009-02-13 14:07:31 -0500 (Fri, 13 Feb 2009)
New Revision: 7694

Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
Log:
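Use multiple implementations of FlushTracker: a standard one whose processing-lock methods are no-ops, and a non-blocking one backed by a read/write lock, selected according to whether non-blocking state transfer is configured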


Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-13 19:06:47 UTC (rev 7693)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-13 19:07:31 UTC (rev 7694)
@@ -131,7 +131,7 @@
    private volatile boolean isInLocalMode;
    private ComponentRegistry componentRegistry;
    private LockManager lockManager;
-   private FlushTracker flushTracker = new FlushTracker();
+   private FlushTracker flushTracker;
 
 
    @Inject
@@ -153,11 +153,11 @@
       this.lockManager = lockManager;
    }
 
-   public class FlushTracker
+   public abstract class FlushTracker
    {
       private final ReclosableLatch flushBlockGate = new ReclosableLatch();
       private final AtomicInteger flushCompletionCount = new AtomicInteger();
-      private final ReentrantReadWriteLock coordinationLock = new ReentrantReadWriteLock();
+
       private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
 
       public void block()
@@ -178,18 +178,21 @@
          return flushCompletionCount.get();
       }
 
-      public void lockProcessingLock()
+      public abstract void lockProcessingLock();
+      public abstract void unlockProcessingLock();
+      public abstract void lockSuspendProcessingLock();
+      public abstract void unlockSuspendProcessingLock();
+
+      public void waitForFlushCompletion(long timeout)
       {
-         if (! configuration.isNonBlockingStateTransfer())
-            return;
-
-         for (;;)
+         for (; ;)
          {
             try
             {
-               if (!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
-                  throw new TimeoutException("Could not obtain processing lock");
-
+               if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
+               {
+                  throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + timeout + " millis) ");
+               }
                return;
             }
             catch (InterruptedException e)
@@ -199,26 +202,16 @@
          }
       }
 
-      public void unlockProcessingLock()
+      public void waitForFlushStart(long timeout)
       {
-         if (! configuration.isNonBlockingStateTransfer())
-            return;
-
-         coordinationLock.readLock().unlock();
-      }
-
-      public void lockSuspendProcessingLock()
-      {
-         if (! configuration.isNonBlockingStateTransfer())
-            return;
-
-         for (;;)
+         for (; ;)
          {
             try
             {
-               if (!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
-                  throw new TimeoutException("Could not obtain processing lock");
-
+               if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
+               {
+                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout + " millis) ");
+               }
                return;
             }
             catch (InterruptedException e)
@@ -227,26 +220,41 @@
             }
          }
       }
+   }
 
-      public void unlockSuspendProcessingLock()
+   private final class StandardFlushTracker extends FlushTracker
+   {
+      // All locking methods are no-ops
+      public void lockProcessingLock()
       {
-         if (! configuration.isNonBlockingStateTransfer())
-            return;
+      }
 
-         if (coordinationLock.isWriteLockedByCurrentThread())
-            coordinationLock.writeLock().unlock();
+      public void lockSuspendProcessingLock()
+      {
       }
 
-      public void waitForFlushCompletion(long timeout)
+      public void unlockProcessingLock()
       {
-         for (; ;)
+      }
+
+      public void unlockSuspendProcessingLock()
+      {
+      }
+   }
+
+   private final class NonBlockingFlushTracker extends FlushTracker
+   {
+      private final ReentrantReadWriteLock coordinationLock = new ReentrantReadWriteLock();
+
+      public void lockProcessingLock()
+      {
+         for (;;)
          {
             try
             {
-               if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
-               {
-                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout + " millis) ");
-               }
+               if (!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+                  throw new TimeoutException("Could not obtain processing lock");
+
                return;
             }
             catch (InterruptedException e)
@@ -256,16 +264,20 @@
          }
       }
 
-      public void waitForFlushStart(long timeout)
+      public void unlockProcessingLock()
       {
-         for (; ;)
+         coordinationLock.readLock().unlock();
+      }
+
+      public void lockSuspendProcessingLock()
+      {
+         for (;;)
          {
             try
             {
-               if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
-               {
-                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout + " millis) ");
-               }
+               if (!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+                  throw new TimeoutException("Could not obtain processing lock");
+
                return;
             }
             catch (InterruptedException e)
@@ -274,6 +286,13 @@
             }
          }
       }
+
+      public void unlockSuspendProcessingLock()
+      {
+         if (coordinationLock.isWriteLockedByCurrentThread())
+            coordinationLock.writeLock().unlock();
+      }
+
    }
 
    // ------------ START: Lifecycle methods ------------
@@ -302,6 +321,7 @@
 
             boolean fetchState = shouldFetchStateOnStartup();
             boolean nonBlocking = configuration.isNonBlockingStateTransfer();
+            this.flushTracker = nonBlocking ? new NonBlockingFlushTracker() : new StandardFlushTracker();
             initialiseChannelAndRpcDispatcher(fetchState && !nonBlocking);
 
             if (!fetchState || nonBlocking)




More information about the jbosscache-commits mailing list