[hornetq-commits] JBoss hornetq SVN: r8642 - in trunk: tests/src/org/hornetq/tests/unit/core/journal/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 9 11:59:54 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-09 11:59:54 -0500 (Wed, 09 Dec 2009)
New Revision: 8642

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-241 - Changing TimedBuffer to stop spinning when there are no pending syncs

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2009-12-09 16:09:45 UTC (rev 8641)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java	2009-12-09 16:59:54 UTC (rev 8642)
@@ -143,7 +143,7 @@
       if (timedBuffer != null)
       {
          // When moving to a new file, we need to make sure any pending buffer will be transfered to the buffer
-         timedBuffer.flush(true);
+         timedBuffer.flush();
          timedBuffer.setObserver(null);
       }
    }

Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-12-09 16:09:45 UTC (rev 8641)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-12-09 16:59:54 UTC (rev 8642)
@@ -20,6 +20,7 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.buffers.HornetQBuffer;
@@ -46,13 +47,8 @@
 
    private TimedBufferObserver bufferObserver;
 
-   // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
-   // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
-   // prevent that
-   private final Semaphore spinLimiter = new Semaphore(1);
+   private CheckTimer timer;
 
-   private CheckTimer timerRunnable = new CheckTimer();
-
    private final int bufferSize;
 
    private final HornetQBuffer buffer;
@@ -125,9 +121,9 @@
          return;
       }
 
-      timerRunnable = new CheckTimer();
+      timer = new CheckTimer();
 
-      timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
+      timerThread = new Thread(timer, "hornetq-buffer-timeout");
 
       timerThread.start();
 
@@ -152,10 +148,8 @@
 
       bufferObserver = null;
 
-      spinLimiter.release();
+      timer.close();
 
-      timerRunnable.close();
-
       if (logRates)
       {
          logRatesTimerTask.cancel();
@@ -238,15 +232,9 @@
 
    public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
    {
+
       delayFlush = false;
 
-      if (buffer.writerIndex() == 0)
-      {
-         // More bytes have been added so the timer flush thread can resume
-
-         spinLimiter.release();
-      }
-
       bytes.encode(buffer);
 
       callbacks.add(callback);
@@ -265,22 +253,18 @@
          // }
       }
 
+      timer.resumeSpin();
    }
 
-   public void flush()
-   {
-      flush(false);
-   }
-
    /** 
     * force means the Journal is moving to a new file. Any pending write need to be done immediately
     * or data could be lost
     * */
-   public void flush(final boolean force)
+   public synchronized void flush()
    {
       synchronized (this)
       {
-         if ((force || !delayFlush) && buffer.writerIndex() > 0)
+         if (buffer.writerIndex() > 0)
          {
             int pos = buffer.writerIndex();
 
@@ -301,17 +285,6 @@
                bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
             }
 
-            try
-            {
-               // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
-               // when the buffer is inactive
-               spinLimiter.acquire();
-            }
-            catch (InterruptedException e)
-            {
-               // Ignore
-            }
-
             lastFlushTime.set(System.nanoTime());
 
             pendingSync = false;
@@ -323,6 +296,8 @@
             bufferLimit = 0;
 
             flushesDone.incrementAndGet();
+
+            timer.pauseSpin();
          }
       }
    }
@@ -387,6 +362,58 @@
    {
       private volatile boolean closed = false;
 
+      private boolean spinning = false;
+
+      // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
+      // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
+      // prevent that
+      private final Semaphore spinLimiter = new Semaphore(1);
+
+      public CheckTimer()
+      {
+         if (!spinLimiter.tryAcquire())
+         {
+            // JDK would be screwed up if this was happening
+            throw new IllegalStateException("InternalError: Semaphore not working properly!");
+         }
+         spinning = false;
+      }
+
+      // Needs to be called within synchronized blocks on TimedBuffer
+      public void resumeSpin()
+      {
+         synchronized (TimedBuffer.this)
+         {
+            if (!spinning)
+            {
+               spinning = true;
+               spinLimiter.release();
+            }
+         }
+      }
+
+      // Needs to be called within synchronized blocks on TimedBuffer
+      public void pauseSpin()
+      {
+         synchronized (TimedBuffer.this)
+         {
+            if (spinning)
+            {
+               spinning = false;
+               try
+               {
+                  if (!spinLimiter.tryAcquire(60, TimeUnit.SECONDS))
+                  {
+                     throw new IllegalStateException("Internal error on TimedBuffer. Can't stop spinning");
+                  }
+               }
+               catch (InterruptedException ignored)
+               {
+               }
+            }
+         }
+      }
+
       public void run()
       {
          while (!closed)
@@ -395,9 +422,22 @@
             // timeout since the time of the last flush
             // Effectively flushing "resets" the timer
 
-            if (pendingSync && bufferObserver != null && System.nanoTime() > lastFlushTime.get() + timeout)
+            if (System.nanoTime() > lastFlushTime.get() + timeout)
             {
-               flush();
+               // delayFlush and pendingSync are changed inside synchronized blocks
+               // They need to be done atomically
+               synchronized (TimedBuffer.this)
+               {
+                  if (!delayFlush && pendingSync && bufferObserver != null)
+                  {
+                     flush();
+                  }
+                  else if (!closed && !delayFlush)
+                  {
+                     // if delayFlush is set, it means we have to keep trying, we can't stop spinning on this case
+                     pauseSpin();
+                  }
+               }
             }
 
             try
@@ -417,6 +457,7 @@
       public void close()
       {
          closed = true;
+         resumeSpin();
       }
    }
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java	2009-12-09 16:09:45 UTC (rev 8641)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java	2009-12-09 16:59:54 UTC (rev 8642)
@@ -41,6 +41,8 @@
 
    // Attributes ----------------------------------------------------
 
+   private static final int ONE_SECOND = 1000000000; // in nanoseconds
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -85,13 +87,91 @@
          }
       }
 
-      TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false); // Any big timeout
+      TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND, false);
 
-      timedBuffer.setObserver(new TestObserver());
+      timedBuffer.start();
 
-      int x = 0;
-      for (int i = 0; i < 10; i++)
+      try
       {
+
+         timedBuffer.setObserver(new TestObserver());
+
+         int x = 0;
+         for (int i = 0; i < 10; i++)
+         {
+            byte[] bytes = new byte[10];
+            for (int j = 0; j < 10; j++)
+            {
+               bytes[j] = UnitTestCase.getSamplebyte(x++);
+            }
+
+            HornetQBuffer buff = HornetQBuffers.wrappedBuffer(bytes);
+
+            timedBuffer.checkSize(10);
+            timedBuffer.addBytes(buff, false, dummyCallback);
+         }
+
+         timedBuffer.checkSize(1);
+
+         Assert.assertEquals(1, flushTimes.get());
+
+         ByteBuffer flushedBuffer = buffers.get(0);
+
+         Assert.assertEquals(100, flushedBuffer.limit());
+
+         Assert.assertEquals(100, flushedBuffer.capacity());
+
+         flushedBuffer.rewind();
+
+         for (int i = 0; i < 100; i++)
+         {
+            Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+         }
+      }
+      finally
+      {
+         timedBuffer.stop();
+      }
+
+   }
+
+   public void testTimingAndFlush() throws Exception
+   {
+      final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+      final AtomicInteger flushTimes = new AtomicInteger(0);
+      class TestObserver implements TimedBufferObserver
+      {
+         public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOAsyncTask> callbacks)
+         {
+            buffers.add(buffer);
+            flushTimes.incrementAndGet();
+         }
+
+         /* (non-Javadoc)
+          * @see org.hornetq.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+          */
+         public ByteBuffer newBuffer(final int minSize, final int maxSize)
+         {
+            return ByteBuffer.allocate(maxSize);
+         }
+
+         public int getRemainingBytes()
+         {
+            return 1024 * 1024;
+         }
+      }
+
+      TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND / 10, false);
+
+      timedBuffer.start();
+
+      try
+      {
+
+         timedBuffer.setObserver(new TestObserver());
+
+         int x = 0;
+
          byte[] bytes = new byte[10];
          for (int j = 0; j < 10; j++)
          {
@@ -102,23 +182,42 @@
 
          timedBuffer.checkSize(10);
          timedBuffer.addBytes(buff, false, dummyCallback);
-      }
 
-      timedBuffer.checkSize(1);
+         Thread.sleep(200);
 
-      Assert.assertEquals(1, flushTimes.get());
+         Assert.assertEquals(0, flushTimes.get());
 
-      ByteBuffer flushedBuffer = buffers.get(0);
+         bytes = new byte[10];
+         for (int j = 0; j < 10; j++)
+         {
+            bytes[j] = UnitTestCase.getSamplebyte(x++);
+         }
 
-      Assert.assertEquals(100, flushedBuffer.limit());
+         buff = HornetQBuffers.wrappedBuffer(bytes);
 
-      Assert.assertEquals(100, flushedBuffer.capacity());
+         timedBuffer.checkSize(10);
+         timedBuffer.addBytes(buff, true, dummyCallback);
 
-      flushedBuffer.rewind();
+         Thread.sleep(500);
 
-      for (int i = 0; i < 100; i++)
+         Assert.assertEquals(1, flushTimes.get());
+
+         ByteBuffer flushedBuffer = buffers.get(0);
+
+         Assert.assertEquals(20, flushedBuffer.limit());
+
+         Assert.assertEquals(20, flushedBuffer.capacity());
+
+         flushedBuffer.rewind();
+
+         for (int i = 0; i < 20; i++)
+         {
+            Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+         }
+      }
+      finally
       {
-         Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+         timedBuffer.stop();
       }
 
    }



More information about the hornetq-commits mailing list