[hornetq-commits] JBoss hornetq SVN: r8656 - trunk/src/main/org/hornetq/core/journal/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 9 16:07:16 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-09 16:07:15 -0500 (Wed, 09 Dec 2009)
New Revision: 8656

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
Tweak to TimedBuffer.

Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-12-09 20:53:57 UTC (rev 8655)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-12-09 21:07:15 UTC (rev 8656)
@@ -148,7 +148,7 @@
 
       bufferObserver = null;
 
-      timer.close();
+      timer.stop();
 
       if (logRates)
       {
@@ -258,8 +258,26 @@
 
    
    /** 
-    * Note: Flush could be called by either the CheckTime, or by the Journal directly when moving to a new file
+    * This method verifies whether a flush is required.
+    * It is called directly by the CheckTimer.
+    * 
+    * @return true means you can pause spinning for a while
     * */
+   private synchronized boolean checkFlush()
+   {
+      // delayFlush and pendingSync are changed inside synchronized blocks
+      // They need to be done atomically
+      if (!delayFlush && pendingSync && bufferObserver != null)
+      {
+         flush();
+         return true;
+      }
+      else return !delayFlush;
+   }
+   
+   /** 
+    * Note: flush() can be called either by checkFlush() (which is driven by the timer), or by the Journal directly before moving to a new file
+    * */
    public synchronized void flush()
    {
       if (buffer.writerIndex() > 0)
@@ -357,7 +375,7 @@
 
    private class CheckTimer implements Runnable
    {
-      private volatile boolean closed = false;
+      private volatile boolean stopped = false;
 
       private boolean spinning = false;
 
@@ -404,7 +422,7 @@
 
       public void run()
       {
-         while (!closed)
+         while (!stopped)
          {
             // We flush on the timer if there are pending syncs there and we've waited waited at least one
             // timeout since the time of the last flush
@@ -412,17 +430,11 @@
 
             if (System.nanoTime() > lastFlushTime.get() + timeout)
             {
-               // delayFlush and pendingSync are changed inside synchronized blocks
-               // They need to be done atomically
-               synchronized (TimedBuffer.this)
+               if (checkFlush())
                {
-                  if (!delayFlush && pendingSync && bufferObserver != null)
+                  if (!stopped)
                   {
-                     flush();
-                  }
-                  else if (!closed && !delayFlush)
-                  {
-                     // if delayFlush is set, it means we have to keep trying, we can't stop spinning on this case
+                     // can't pause spin if stopped, or we would hang the thread
                      pauseSpin();
                   }
                }
@@ -442,9 +454,9 @@
          }
       }
 
-      public void close()
+      public void stop()
       {
-         closed = true;
+         stopped = true;
          resumeSpin();
       }
    }



More information about the hornetq-commits mailing list