[hornetq-commits] JBoss hornetq SVN: r8658 - in trunk: src/main/org/hornetq/core/journal/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 10 04:04:09 EST 2009


Author: timfox
Date: 2009-12-10 04:04:09 -0500 (Thu, 10 Dec 2009)
New Revision: 8658

Modified:
   trunk/examples/core/perf/server0/hornetq-configuration.xml
   trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
Log:
simplified timed buffer fix + fix extra hang in receiveimmediate code

Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml	2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml	2009-12-10 09:04:09 UTC (rev 8658)
@@ -17,7 +17,7 @@
    
    <persistence-enabled>true</persistence-enabled>
 
-   <journal-sync-non-transactional>false</journal-sync-non-transactional>
+   <journal-sync-non-transactional>true</journal-sync-non-transactional>
    <journal-sync-transactional>true</journal-sync-transactional>
    <journal-type>ASYNCIO</journal-type>
    <journal-min-files>20</journal-min-files>

Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-12-10 09:04:09 UTC (rev 8658)
@@ -20,7 +20,7 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.buffers.HornetQBuffer;
@@ -47,8 +47,13 @@
 
    private TimedBufferObserver bufferObserver;
 
-   private CheckTimer timer;
+   // If the TimedBuffer is idle - i.e. no records are being added - then it is pointless for the timer flush
+   // thread to keep spinning and checking the time, using up CPU in the process. This semaphore is used to
+   // prevent that.
+   private final Semaphore spinLimiter = new Semaphore(1);
 
+   private CheckTimer timerRunnable = new CheckTimer();
+
    private final int bufferSize;
 
    private final HornetQBuffer buffer;
@@ -85,6 +90,8 @@
 
    private final AtomicLong lastFlushTime = new AtomicLong(0);
 
+   private boolean spinning = false;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -121,9 +128,9 @@
          return;
       }
 
-      timer = new CheckTimer();
+      timerRunnable = new CheckTimer();
 
-      timerThread = new Thread(timer, "hornetq-buffer-timeout");
+      timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
 
       timerThread.start();
 
@@ -134,6 +141,15 @@
          logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
       }
 
+      // Need to start with the spin limiter acquired
+      try
+      {
+         spinLimiter.acquire();
+      }
+      catch (InterruptedException ignore)
+      {
+      }
+
       started = true;
    }
 
@@ -148,8 +164,10 @@
 
       bufferObserver = null;
 
-      timer.stop();
+      spinLimiter.release();
 
+      timerRunnable.close();
+
       if (logRates)
       {
          logRatesTimerTask.cancel();
@@ -232,7 +250,6 @@
 
    public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
    {
-
       delayFlush = false;
 
       bytes.encode(buffer);
@@ -252,69 +269,78 @@
          // flush();
          // }
 
-         timer.resumeSpin();
+         if (!spinning)
+         {
+            spinLimiter.release();
+
+            spinning = true;
+         }
       }
 
    }
 
-   
-   /** 
-    * This method will verify if it a flush is required.
-    * It is called directly by the CheckTimer.
-    * 
-    * @return true means you can pause spinning for a while
-    * */
-   private synchronized boolean checkFlush()
+   public void flush()
    {
-      // delayFlush and pendingSync are changed inside synchronized blocks
-      // They need to be done atomically
-      if (!delayFlush && pendingSync && bufferObserver != null)
-      {
-         flush();
-         return true;
-      }
-      else return !delayFlush;
+      flush(false);
    }
-   
+
    /** 
-    * Note: Flush could be called by either the checkFlush (and timer), or by the Journal directly before moving to a new file
+    * force means the Journal is moving to a new file. Any pending writes need to be done immediately
+    * or data could be lost
     * */
-   public synchronized void flush()
+   public void flush(final boolean force)
    {
-      if (buffer.writerIndex() > 0)
+      synchronized (this)
       {
-         int pos = buffer.writerIndex();
-
-         if (logRates)
+         if ((force || !delayFlush) && buffer.writerIndex() > 0)
          {
-            bytesFlushed.addAndGet(pos);
-         }
+            int pos = buffer.writerIndex();
 
-         ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+            if (logRates)
+            {
+               bytesFlushed.addAndGet(pos);
+            }
 
-         // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
-         // Using bufferToFlush.put(buffer) would make several append calls for each byte
+            ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
 
-         bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
+            // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+            // Using bufferToFlush.put(buffer) would make several append calls for each byte
 
-         if (bufferToFlush != null)
-         {
-            bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
-         }
+            bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
 
-         lastFlushTime.set(System.nanoTime());
+            if (bufferToFlush != null)
+            {
+               bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+            }
 
-         pendingSync = false;
+            if (spinning)
+            {
+               try
+               {
+                  // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
+                  // when the buffer is inactive
+                  spinLimiter.acquire();
+               }
+               catch (InterruptedException e)
+               {
+                  // Ignore
+               }
 
-         callbacks = new LinkedList<IOAsyncTask>();
+               spinning = false;
+            }
 
-         buffer.clear();
+            lastFlushTime.set(System.nanoTime());
 
-         bufferLimit = 0;
+            pendingSync = false;
 
-         flushesDone.incrementAndGet();
+            callbacks = new LinkedList<IOAsyncTask>();
 
-         timer.pauseSpin();
+            buffer.clear();
+
+            bufferLimit = 0;
+
+            flushesDone.incrementAndGet();
+         }
       }
    }
 
@@ -376,69 +402,19 @@
 
    private class CheckTimer implements Runnable
    {
-      private volatile boolean stopped = false;
+      private volatile boolean closed = false;
 
-      private boolean spinning = false;
-
-      // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
-      // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
-      // prevent that
-      private final Semaphore spinLimiter = new Semaphore(1);
-
-      public CheckTimer()
-      {
-         if (!spinLimiter.tryAcquire())
-         {
-            // JDK would be screwed up if this was happening
-            throw new IllegalStateException("InternalError: Semaphore not working properly!");
-         }
-         spinning = false;
-      }
-
-      // Needs to be called within synchronized blocks on TimedBuffer
-      public void resumeSpin()
-      {
-         spinning = true;
-         spinLimiter.release();
-      }
-
-      // Needs to be called within synchronized blocks on TimedBuffer
-      public void pauseSpin()
-      {
-         if (spinning)
-         {
-            spinning = false;
-            try
-            {
-               if (!spinLimiter.tryAcquire(60, TimeUnit.SECONDS))
-               {
-                  throw new IllegalStateException("Internal error on TimedBuffer. Can't stop spinning");
-               }
-            }
-            catch (InterruptedException ignored)
-            {
-            }
-         }
-      }
-
       public void run()
       {
-         while (!stopped)
+         while (!closed)
          {
             // We flush on the timer if there are pending syncs and we've waited at least one
             // timeout since the time of the last flush
             // Effectively flushing "resets" the timer
 
-            if (System.nanoTime() > lastFlushTime.get() + timeout)
+            if (pendingSync && bufferObserver != null && System.nanoTime() > lastFlushTime.get() + timeout)
             {
-               if (checkFlush())
-               {
-                  if (!stopped)
-                  {
-                     // can't pause spin if stopped, or we would hang the thread
-                     pauseSpin();
-                  }
-               }
+               flush();
             }
 
             try
@@ -455,10 +431,9 @@
          }
       }
 
-      public void stop()
+      public void close()
       {
-         stopped = true;
-         resumeSpin();
+         closed = true;
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-12-10 09:04:09 UTC (rev 8658)
@@ -48,6 +48,7 @@
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.Future;
 import org.hornetq.utils.TypedProperties;
 
 /**
@@ -120,10 +121,9 @@
    private final ManagementService managementService;
 
    private final Binding binding;
-   
+
    private boolean transferring = false;
 
-
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerConsumerImpl(final long id,
@@ -201,7 +201,7 @@
          // should go back into the
          // queue for delivery later.
          if (!started || transferring)
-         {            
+         {
             return HandleStatus.BUSY;
          }
 
@@ -416,25 +416,59 @@
          promptDelivery(true);
       }
    }
-   
+
    public void setTransferring(final boolean transferring)
    {
       lock.lock();
       try
       {
          this.transferring = transferring;
+
+         if (transferring)
+         {
+            // Now we must wait for any large message delivery to finish
+            while (largeMessageInDelivery)
+            {
+               try
+               {
+                  Thread.sleep(1);
+               }
+               catch (InterruptedException ignore)
+               {
+               }
+            }
+         }
       }
       finally
       {
          lock.unlock();
       }
-      
+
+      //Outside the lock
+      if (transferring)
+      {
+         // And we must wait for any force delivery to be executed - this is executed async so we add a future to the
+         // executor and
+         // wait for it to complete
+
+         Future future = new Future();
+
+         executor.execute(future);
+
+         boolean ok = future.await(10000);
+
+         if (!ok)
+         {
+            log.warn("Timed out waiting for executor to complete");
+         }
+      }
+
       if (!transferring)
       {
          promptDelivery(true);
       }
    }
-   
+
    public void receiveCredits(final int credits) throws Exception
    {
       if (credits == -1)

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java	2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java	2009-12-10 09:04:09 UTC (rev 8658)
@@ -742,7 +742,7 @@
 
       for (MyHandler handler : handlers)
       {
-         boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+         boolean ok = handler.latch.await(20000, TimeUnit.MILLISECONDS);
 
          Assert.assertTrue(ok);
       }



More information about the hornetq-commits mailing list