[hornetq-commits] JBoss hornetq SVN: r8377 - in branches/ClebertCallback: src/main/org/hornetq/core/asyncio/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 23 12:44:07 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-23 12:44:05 -0500 (Mon, 23 Nov 2009)
New Revision: 8377

Modified:
   branches/ClebertCallback/native/src/AIOException.h
   branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
Log:
NIO should be totally Asynchronous now

Modified: branches/ClebertCallback/native/src/AIOException.h
===================================================================
--- branches/ClebertCallback/native/src/AIOException.h	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/native/src/AIOException.h	2009-11-23 17:44:05 UTC (rev 8377)
@@ -29,7 +29,7 @@
 #define NATIVE_ERROR_CANT_ALLOCATE_QUEUE 206
 #define NATIVE_ERROR_PREALLOCATE_FILE 208
 #define NATIVE_ERROR_ALLOCATE_MEMORY 209
-#define NATIVE_ERROR_IO 210
+#define NATIVE_ERROR_IO 006
 #define NATIVE_ERROR_AIO_FULL 211
 
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -49,7 +49,7 @@
 
    private static boolean loaded = false;
 
-   private static int EXPECTED_NATIVE_VERSION = 26;
+   private static int EXPECTED_NATIVE_VERSION = 27;
 
    /** Used to determine the next writing sequence */
    private AtomicLong nextWritingSequence = new AtomicLong(0);
@@ -154,7 +154,7 @@
 
    private final VariableLatch pendingWrites = new VariableLatch();
 
-   private Semaphore writeSemaphore;
+   private Semaphore maxIOSemaphore;
 
    private BufferCallback bufferCallback;
 
@@ -195,7 +195,7 @@
          }
 
          this.maxIO = maxIO;
-         writeSemaphore = new Semaphore(this.maxIO);
+         maxIOSemaphore = new Semaphore(this.maxIO);
 
          this.fileName = fileName;
 
@@ -245,12 +245,12 @@
             log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
          }
 
-         while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+         while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
          {
             log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
          }
 
-         writeSemaphore = null;
+         maxIOSemaphore = null;
          if (poller != null)
          {
             stopPoller();
@@ -294,7 +294,7 @@
          {
             public void run()
             {
-               writeSemaphore.acquireUninterruptibly();
+               maxIOSemaphore.acquireUninterruptibly();
 
                long sequence = nextWritingSequence.getAndIncrement();
 
@@ -319,7 +319,7 @@
       }
       else
       {
-         writeSemaphore.acquireUninterruptibly();
+         maxIOSemaphore.acquireUninterruptibly();
 
          long sequence = nextWritingSequence.getAndIncrement();
 
@@ -350,7 +350,7 @@
          startPoller();
       }
       pendingWrites.up();
-      writeSemaphore.acquireUninterruptibly();
+      maxIOSemaphore.acquireUninterruptibly();
       try
       {
          read(handler, position, size, directByteBuffer, aioPackage);
@@ -358,14 +358,14 @@
       catch (HornetQException e)
       {
          // Release only if an exception happened
-         writeSemaphore.release();
+         maxIOSemaphore.release();
          pendingWrites.down();
          throw e;
       }
       catch (RuntimeException e)
       {
          // Release only if an exception happened
-         writeSemaphore.release();
+         maxIOSemaphore.release();
          pendingWrites.down();
          throw e;
       }
@@ -444,7 +444,7 @@
    @SuppressWarnings("unused")
    private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
    {
-      writeSemaphore.release();
+      maxIOSemaphore.release();
 
       pendingWrites.down();
 
@@ -511,7 +511,7 @@
    {
       log.warn("CallbackError: " + errorMessage);
 
-      writeSemaphore.release();
+      maxIOSemaphore.release();
 
       pendingWrites.down();
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -38,6 +38,9 @@
    
    public static final int UNBLOCKED = 005;
 
+   public static final int IO_ERROR = 006;
+
+
    public static final int QUEUE_DOES_NOT_EXIST = 100;
 
    public static final int QUEUE_EXISTS = 101;
@@ -85,8 +88,6 @@
 
    public static final int NATIVE_ERROR_ALLOCATE_MEMORY = 209;
 
-   public static final int NATIVE_ERROR_IO = 210;
-
    public static final int NATIVE_ERROR_AIO_FULL = 211;
    
    

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -38,7 +38,7 @@
    boolean exists();
 
    /**
-    * For certain operations (like loading) we don't need open the file with full maxIO
+    * The maximum number of simultaneous writes accepted
     * @param maxIO
     * @throws Exception
     */

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -15,9 +15,7 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 import org.hornetq.core.asyncio.AsynchronousFile;
 import org.hornetq.core.asyncio.BufferCallback;
@@ -49,9 +47,6 @@
    /** The pool for Thread pollers */
    private final Executor pollerExecutor;
    
-   /** Context switch on AIO could fire unnecessary flushes, so we use a single thread for write */
-   private final Executor writerExecutor;
-   
 
    public AIOSequentialFile(final SequentialFileFactory factory,
                             final int bufferSize,
@@ -63,9 +58,8 @@
                             final Executor writerExecutor,
                             final Executor pollerExecutor)
    {
-      super(directory, new File(directory + "/" + fileName), factory);
+      super(directory, new File(directory + "/" + fileName), factory, writerExecutor);
       this.maxIO = maxIO;
-      this.writerExecutor = writerExecutor;
       this.bufferCallback = bufferCallback;
       this.pollerExecutor = pollerExecutor;
    }
@@ -102,27 +96,13 @@
       {
          return;
       }
+      
+      super.close();
+      
       opened = false;
 
       timedBuffer = null;
 
-      final CountDownLatch donelatch = new CountDownLatch(1);
-
-      writerExecutor.execute(new Runnable()
-      {
-         public void run()
-         {
-            donelatch.countDown();
-         }
-      });
-
-      while (!donelatch.await(60, TimeUnit.SECONDS))
-      {
-         log.warn("Executor on file " + getFile().getName() + " couldn't complete its tasks in 60 seconds.",
-                  new Exception("Warning: Executor on file " + getFile().getName() +
-                                " couldn't complete its tasks in 60 seconds."));
-      }
-
       aioFile.close();
       aioFile = null;
 
@@ -193,13 +173,18 @@
       open(maxIO);
    }
 
-   public synchronized void open(final int currentMaxIO) throws Exception
+   public synchronized void open(final int maxIO) throws Exception
    {
       opened = true;
+      
       aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
-      aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
+      
+      aioFile.open(getFile().getAbsolutePath(), maxIO);
+      
       position.set(0);
+      
       aioFile.setBufferCallback(bufferCallback);
+      
       this.fileSize = aioFile.size();
    }
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -42,11 +42,6 @@
 
    private final ReuseBuffersController buffersControl = new ReuseBuffersController();
 
-   /** A single AIO write executor for every AIO File.
-    *  This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
-    *  And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
-   private ExecutorService writeExecutor;
-
    private ExecutorService pollerExecutor;
 
    // This method exists just to make debug easier.
@@ -149,9 +144,6 @@
    {
       super.start();
 
-      writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
-                                                                                 true));
-
       pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
                                                                               true));
 
@@ -162,19 +154,6 @@
    {
       buffersControl.stop();
 
-      writeExecutor.shutdown();
-
-      try
-      {
-         if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
-         {
-            log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
-         }
-      }
-      catch (InterruptedException e)
-      {
-      }
-
       pollerExecutor.shutdown();
 
       try

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -44,13 +44,6 @@
 
    private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
 
-   /** 
-    * 
-    * We can't execute callbacks directly from any of the IO module. We need to do it through another thread,
-    * So, we will use an executor for this. 
-    *   */
-   protected ExecutorService callbacksExecutor;
-
    protected final String journalDir;
 
    protected final TimedBuffer timedBuffer;
@@ -58,7 +51,16 @@
    protected final int bufferSize;
 
    protected final long bufferTimeout;
+   
+   /** 
+    * Asynchronous writes need to be done at another executor.
+    * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
+    * At AIO this is necessary as context switches on writes would fire flushes at the kernel.
+    *  */
+   protected ExecutorService writeExecutor;
 
+   
+
    public AbstractSequentialFactory(final String journalDir,
                                     final boolean buffered,
                                     final int bufferSize,
@@ -86,13 +88,13 @@
          timedBuffer.stop();
       }
 
-      if (callbacksExecutor != null)
+      if (isSupportsCallbacks())
       {
-         callbacksExecutor.shutdown();
-
+         writeExecutor.shutdown();
+   
          try
          {
-            if (!callbacksExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+            if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
             {
                log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
             }
@@ -101,6 +103,8 @@
          {
          }
       }
+
+      
    }
 
    public void start()
@@ -109,17 +113,14 @@
       {
          timedBuffer.start();
       }
-
+      
       if (isSupportsCallbacks())
       {
-         callbacksExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-callbacks" + System.identityHashCode(this),
+         writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
                                                                                     true));
       }
-      else
-      {
-         callbacksExecutor = null;
-      }
 
+
    }
 
    /* (non-Javadoc)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -16,7 +16,9 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.journal.IOAsyncTask;
@@ -43,7 +45,7 @@
    private File file;
 
    private final String directory;
-   
+
    protected final SequentialFileFactory factory;
 
    protected long fileSize = 0;
@@ -56,6 +58,9 @@
     *  This is the class returned to the factory when the file is being activated. */
    protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
 
+   /** Used for asynchronous writes */
+   protected final Executor writerExecutor;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -64,12 +69,16 @@
     * @param file
     * @param directory
     */
-   public AbstractSequentialFile(final String directory, final File file, final SequentialFileFactory factory)
+   public AbstractSequentialFile(final String directory,
+                                 final File file,
+                                 final SequentialFileFactory factory,
+                                 final Executor writerExecutor)
    {
       super();
       this.file = file;
       this.directory = directory;
       this.factory = factory;
+      this.writerExecutor = writerExecutor;
    }
 
    // Public --------------------------------------------------------
@@ -116,6 +125,29 @@
       }
    }
 
+   public synchronized void close() throws Exception
+   {
+      final CountDownLatch donelatch = new CountDownLatch(1);
+
+      if (writerExecutor != null)
+      {
+         writerExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               donelatch.countDown();
+            }
+         });
+
+         while (!donelatch.await(60, TimeUnit.SECONDS))
+         {
+            log.warn("Executor on file " + getFile().getName() + " couldn't complete its tasks in 60 seconds.",
+                     new Exception("Warning: Executor on file " + getFile().getName() +
+                                   " couldn't complete its tasks in 60 seconds."));
+         }
+      }
+   }
+
    public final boolean fits(final int size)
    {
       if (timedBuffer == null)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -19,7 +19,10 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
@@ -41,14 +44,23 @@
 
    private RandomAccessFile rfile;
 
-   public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName)
+   /** The write semaphore here is only used when writing asynchronously */
+   private Semaphore maxIOSemaphore;
+   
+   private final int defaultMaxIO;
+   
+   private int maxIO;
+
+   public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName, final int maxIO, final Executor writerExecutor)
    {
-      super(directory, new File(directory + "/" + fileName), factory);
+      super(directory, new File(directory + "/" + fileName), factory, writerExecutor);
+      this.defaultMaxIO = maxIO;
    }
 
-   public NIOSequentialFile(final SequentialFileFactory factory, final File file)
+   public NIOSequentialFile(final SequentialFileFactory factory, final File file, final int maxIO, final Executor writerExecutor)
    {
-      super(file.getParent(), new File(file.getPath()), factory);
+      super(file.getParent(), new File(file.getPath()), factory, writerExecutor);
+      this.defaultMaxIO = maxIO;
    }
 
    public int getAlignment()
@@ -66,20 +78,28 @@
       return channel != null;
    }
 
+   /** this.defaultMaxIO holds the default maxIO, used when open() is called without an explicit value.
+    *  Some operations while initializing files on the journal may require a different maxIO */
    public synchronized void open() throws Exception
    {
+      open(this.defaultMaxIO);
+   }
+
+   public void open(final int maxIO) throws Exception
+   {
       rfile = new RandomAccessFile(getFile(), "rw");
 
       channel = rfile.getChannel();
 
       fileSize = channel.size();
+      
+      if (writerExecutor != null)
+      {
+         this.maxIOSemaphore = new Semaphore(maxIO);
+         this.maxIO = maxIO;
+      }
    }
 
-   public void open(final int currentMaxIO) throws Exception
-   {
-      open();
-   }
-
    public void fill(final int position, final int size, final byte fillCharacter) throws Exception
    {
       ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -112,6 +132,8 @@
 
    public synchronized void close() throws Exception
    {
+      super.close();
+      
       if (channel != null)
       {
          channel.close();
@@ -126,6 +148,14 @@
 
       rfile = null;
 
+      if (maxIOSemaphore != null)
+      {
+         while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+         {
+            log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.getFileName());
+         }
+      }
+
       notifyAll();
    }
 
@@ -153,7 +183,7 @@
       {
          if (callback != null)
          {
-            callback.onError(-1, e.getLocalizedMessage());
+            callback.onError(HornetQException.IO_ERROR, e.getLocalizedMessage());
          }
 
          throw e;
@@ -195,7 +225,7 @@
 
    public SequentialFile copy()
    {
-      return new NIOSequentialFile(factory, getFile());
+      return new NIOSequentialFile(factory, getFile(), maxIO, writerExecutor);
    }
 
    public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
@@ -220,6 +250,42 @@
       internalWrite(bytes, sync, null);
    }
 
+   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+   {
+      if (writerExecutor == null)
+      {
+         doInternalWrite(bytes, sync, callback);
+      }
+      else
+      {
+         // This is a flow control on writing, just like maxAIO on libaio
+         maxIOSemaphore.acquire();
+         
+         writerExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  try
+                  {
+                     doInternalWrite(bytes, sync, callback);
+                  }
+                  catch (Exception e)
+                  {
+                     log.warn("Exception on submitting write", e);
+                     callback.onError(HornetQException.IO_ERROR, e.getMessage());
+                  }
+               }
+               finally
+               {
+                  maxIOSemaphore.release();
+               }
+            }
+         });
+      }
+   }
+
    /**
     * @param bytes
     * @param sync
@@ -227,8 +293,9 @@
     * @throws IOException
     * @throws Exception
     */
-   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+   private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
    {
+      
       position.addAndGet(bytes.limit());
 
       channel.write(bytes);

Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-23 17:44:05 UTC (rev 8377)
@@ -64,9 +64,15 @@
    }
 
    // maxIO is ignored on NIO
-   public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+   public SequentialFile createSequentialFile(final String fileName, int maxIO)
    {
-      return new NIOSequentialFile(this, journalDir, fileName);
+      if (maxIO < 0)
+      {
+         // A single threaded IO
+         maxIO = 1;
+      }
+      
+      return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
    }
 
    public boolean isSupportsCallbacks()



More information about the hornetq-commits mailing list