[hornetq-commits] JBoss hornetq SVN: r8319 - branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 18 20:12:11 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-18 20:12:10 -0500 (Wed, 18 Nov 2009)
New Revision: 8319

Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
Log:
Tweaks

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-18 21:33:55 UTC (rev 8318)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-19 01:12:10 UTC (rev 8319)
@@ -15,21 +15,17 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.hornetq.core.asyncio.AIOCallback;
 import org.hornetq.core.asyncio.AsynchronousFile;
 import org.hornetq.core.asyncio.BufferCallback;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
 
 /**
  * 
@@ -50,12 +46,6 @@
 
    private final BufferCallback bufferCallback;
 
-   /** A context switch on AIO would make it to synchronize the disk before
-       switching to the new thread, what would cause
-       serious performance problems. Because of that we make all the writes on
-       AIO using a single thread. */
-   private final Executor executor;
-
    /** The pool for Thread pollers */
    private final Executor pollerExecutor;
 
@@ -69,10 +59,9 @@
                             final Executor executor,
                             final Executor pollerExecutor)
    {
-      super(directory, new File(directory + "/" + fileName), factory);
+      super(executor, directory, new File(directory + "/" + fileName), factory);
       this.maxIO = maxIO;
       this.bufferCallback = bufferCallback;
-      this.executor = executor;
       this.pollerExecutor = pollerExecutor;
    }
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-18 21:33:55 UTC (rev 8318)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-19 01:12:10 UTC (rev 8319)
@@ -36,15 +36,14 @@
 public class AIOSequentialFileFactory extends AbstractSequentialFactory
 {
 
-   // Timeout used to wait executors to shutdown
-   private static final int EXECUTOR_TIMEOUT = 60;
-
    private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
 
    private static final boolean trace = log.isTraceEnabled();
 
    private final ReuseBuffersController buffersControl = new ReuseBuffersController();
 
+   protected ExecutorService pollerExecutor;
+
    // This method exists just to make debug easier.
    // I could replace log.trace by log.info temporarily while I was debugging
    // Journal
@@ -53,13 +52,6 @@
       log.trace(message);
    }
 
-   /** A single AIO write executor for every AIO File.
-    *  This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
-    *  And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
-   private ExecutorService writeExecutor;
-
-   private ExecutorService pollerExecutor;
-
    public AIOSequentialFileFactory(final String journalDir)
    {
       this(journalDir,
@@ -152,9 +144,6 @@
    {
       super.start();
 
-      writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
-                                                                                 true));
-
       pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
                                                                               true));
 
@@ -163,23 +152,8 @@
    @Override
    public void stop()
    {
-      super.stop();
-      
       buffersControl.stop();
 
-      writeExecutor.shutdown();
-
-      try
-      {
-         if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
-         {
-            log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
-         }
-      }
-      catch (InterruptedException e)
-      {
-      }
-
       pollerExecutor.shutdown();
 
       try
@@ -192,6 +166,8 @@
       catch (InterruptedException e)
       {
       }
+
+      super.stop();
    }
 
    @Override

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-18 21:33:55 UTC (rev 8318)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-19 01:12:10 UTC (rev 8319)
@@ -19,10 +19,14 @@
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
 
 /**
  * 
@@ -34,17 +38,29 @@
  */
 public abstract class AbstractSequentialFactory implements SequentialFileFactory
 {
+
+   // Timeout used to wait executors to shutdown
+   protected static final int EXECUTOR_TIMEOUT = 60;
+
    private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
 
+   /** For AIO: A single AIO write executor for every AIO File.
+    *  This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
+    *  And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls.
+    *  
+    *  For NIO: this is used to execute the callbacks.
+    *           We can't call the executor holding a lock.
+    *   */
+   protected ExecutorService writeExecutor;
+
    protected final String journalDir;
 
    protected final TimedBuffer timedBuffer;
-   
+
    protected final int bufferSize;
 
    protected final long bufferTimeout;
 
-
    public AbstractSequentialFactory(final String journalDir,
                                     final boolean buffered,
                                     final int bufferSize,
@@ -71,6 +87,22 @@
       {
          timedBuffer.stop();
       }
+
+      if (writeExecutor != null)
+      {
+         writeExecutor.shutdown();
+
+         try
+         {
+            if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+            {
+               log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
+            }
+         }
+         catch (InterruptedException e)
+         {
+         }
+      }
    }
 
    public void start()
@@ -79,6 +111,17 @@
       {
          timedBuffer.start();
       }
+
+      if (isSupportsCallbacks())
+      {
+         writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-writer-pool" + System.identityHashCode(this),
+                                                                                    true));
+      }
+      else
+      {
+         writeExecutor = null;
+      }
+
    }
 
    /* (non-Javadoc)
@@ -99,7 +142,7 @@
          }
       }
    }
-   
+
    public void flush()
    {
       if (timedBuffer != null)
@@ -117,7 +160,6 @@
       }
    }
 
-
    public void releaseBuffer(ByteBuffer buffer)
    {
    }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-18 21:33:55 UTC (rev 8318)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-19 01:12:10 UTC (rev 8319)
@@ -16,6 +16,7 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.journal.IOAsyncTask;
@@ -42,7 +43,17 @@
    private File file;
 
    private final String directory;
+   
+   /** on AIO: A context switch on AIO would make it to synchronize the disk before
+   switching to the new thread, what would cause
+   serious performance problems. Because of that we make all the writes on
+   AIO using a single thread. 
+       on NIO: We can't execute callbacks while inside the locks, as more IO operations could be
+               performed later */
+   protected final Executor executor;
 
+
+
    protected final SequentialFileFactory factory;
 
    protected long fileSize = 0;
@@ -63,12 +74,13 @@
     * @param file
     * @param directory
     */
-   public AbstractSequentialFile(final String directory, final File file, final SequentialFileFactory factory)
+   public AbstractSequentialFile(final Executor executor, final String directory, final File file, final SequentialFileFactory factory)
    {
       super();
       this.file = file;
       this.directory = directory;
       this.factory = factory;
+      this.executor = executor;
    }
 
    // Public --------------------------------------------------------

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-18 21:33:55 UTC (rev 8318)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-19 01:12:10 UTC (rev 8319)
@@ -18,6 +18,7 @@
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.Executor;
 
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
@@ -40,14 +41,14 @@
 
    private RandomAccessFile rfile;
 
-   public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName)
+   public NIOSequentialFile(final SequentialFileFactory factory, final Executor executor, final String directory, final String fileName)
    {
-      super(directory, new File(directory + "/" + fileName), factory);
+      super(executor, directory, new File(directory + "/" + fileName), factory);
    }
 
    public NIOSequentialFile(final SequentialFileFactory factory, final File file)
    {
-      super(file.getParent(), new File(file.getPath()), factory);
+      super(null, file.getParent(), new File(file.getPath()), factory);
    }
 
    public int getAlignment()
@@ -239,7 +240,20 @@
 
       if (callback != null)
       {
-         callback.done();
+         if (executor == null)
+         {
+            callback.done();
+         }
+         else
+         {
+            executor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  callback.done();
+               }
+            });
+         }
       }
    }
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-18 21:33:55 UTC (rev 8318)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-19 01:12:10 UTC (rev 8319)
@@ -66,7 +66,7 @@
    // maxIO is ignored on NIO
    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
-      return new NIOSequentialFile(this, journalDir, fileName);
+      return new NIOSequentialFile(this, this.writeExecutor, journalDir, fileName);
    }
 
    public boolean isSupportsCallbacks()



More information about the hornetq-commits mailing list