[hornetq-commits] JBoss hornetq SVN: r8346 - in branches/ClebertTemporary/src/main/org/hornetq/core: journal/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 20 11:50:49 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-20 11:50:48 -0500 (Fri, 20 Nov 2009)
New Revision: 8346

Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
   branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
Log:
AIO order

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-20 16:50:48 UTC (rev 8346)
@@ -46,8 +46,15 @@
 
    private static boolean loaded = false;
 
-   private static int EXPECTED_NATIVE_VERSION = 25;
+   private static int EXPECTED_NATIVE_VERSION = 26;
+   
+   /** Used to determine the next writing sequence */
+   private AtomicInteger nextWritingSequence = new AtomicInteger(0);
 
+   /** Used to determine the next read sequence (the counterpart of nextWritingSequence).
+    *  This is accessed from a single thread (the Poller Thread) */
+   private int readSequence = 0;
+
    public static void addMax(final int io)
    {
       totalMaxIO.addAndGet(io);
@@ -149,10 +156,6 @@
    // AIO using a single thread.
    private final Executor writeExecutor;
    
-   // We can't use the same thread on the callbacks
-   // as the callbacks may perform other IO operations back what could cause dead locks
-   private final Executor callbackExecutor;
-
    private final Executor pollerExecutor;
 
    // AsynchronousFile implementation ---------------------------------------------------
@@ -161,11 +164,10 @@
     * @param writeExecutor It needs to be a single Thread executor. If null it will use the user thread to execute write operations
     * @param pollerExecutor The thread pool that will initialize poller handlers
     */
-   public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor, final Executor callbackExecutor)
+   public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
    {
       this.writeExecutor = writeExecutor;
       this.pollerExecutor = pollerExecutor;
-      this.callbackExecutor = callbackExecutor;
    }
 
    public void open(final String fileName, final int maxIO) throws HornetQException
@@ -207,6 +209,8 @@
          }
          opened = true;
          addMax(this.maxIO);
+         nextWritingSequence.set(0);
+         readSequence = 0;
       }
       finally
       {
@@ -278,18 +282,20 @@
             public void run()
             {
                writeSemaphore.acquireUninterruptibly();
+               
+               int sequence = nextWritingSequence.getAndIncrement();
 
                try
                {
-                  write(handler, position, size, directByteBuffer, aioCallback);
+                  write(handler, sequence, position, size, directByteBuffer, aioCallback);
                }
                catch (HornetQException e)
                {
-                  callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
+                  callbackError(aioCallback, sequence, directByteBuffer, e.getCode(), e.getMessage());
                }
                catch (RuntimeException e)
                {
-                  callbackError(aioCallback, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
+                  callbackError(aioCallback, sequence, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
                }
             }
          });
@@ -298,17 +304,19 @@
       {
          writeSemaphore.acquireUninterruptibly();
 
+         int sequence = nextWritingSequence.getAndIncrement();
+
          try
          {
-            write(handler, position, size, directByteBuffer, aioCallback);
+            write(handler, sequence, position, size, directByteBuffer, aioCallback);
          }
          catch (HornetQException e)
          {
-            callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
+            callbackError(aioCallback, sequence, directByteBuffer, e.getCode(), e.getMessage());
          }
          catch (RuntimeException e)
          {
-            callbackError(aioCallback, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
+            callbackError(aioCallback, sequence, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
          }
       }
 
@@ -419,17 +427,11 @@
    @SuppressWarnings("unused")
    // Called by the JNI layer.. just ignore the
    // warning
-   private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
+   private void callbackDone(final AIOCallback callback, final int sequence, final ByteBuffer buffer)
    {
       writeSemaphore.release();
       pendingWrites.down();
-      callbackExecutor.execute(new Runnable()
-      {
-         public void run()
-         {
-            callback.done();
-         }
-      });
+      callback.done();
       
       // The buffer is not sent on callback for read operations
       if (bufferCallback != null && buffer != null)
@@ -440,7 +442,7 @@
 
    // Called by the JNI layer.. just ignore the
    // warning
-   private void callbackError(final AIOCallback callback, final ByteBuffer buffer, final int errorCode, final String errorMessage)
+   private void callbackError(final AIOCallback callback, final int sequence, final ByteBuffer buffer, final int errorCode, final String errorMessage)
    {
       log.warn("CallbackError: " + errorMessage);
       writeSemaphore.release();
@@ -527,7 +529,7 @@
 
    private native long size0(long handle) throws HornetQException;
 
-   private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+   private native void write(long handle, int sequence, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
 
    private native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
 

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-20 16:50:48 UTC (rev 8346)
@@ -60,11 +60,10 @@
                             final String fileName,
                             final int maxIO,
                             final BufferCallback bufferCallback,
-                            final Executor callbackExecutor,
                             final Executor writerExecutor,
                             final Executor pollerExecutor)
    {
-      super(callbackExecutor, directory, new File(directory + "/" + fileName), factory);
+      super(directory, new File(directory + "/" + fileName), factory);
       this.maxIO = maxIO;
       this.writerExecutor = writerExecutor;
       this.bufferCallback = bufferCallback;
@@ -94,7 +93,7 @@
 
    public SequentialFile copy()
    {
-      return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, callbackExecutor, writerExecutor, pollerExecutor);
+      return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, writerExecutor, pollerExecutor);
    }
 
    public synchronized void close() throws Exception
@@ -197,7 +196,7 @@
    public synchronized void open(final int currentMaxIO) throws Exception
    {
       opened = true;
-      aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor, callbackExecutor);
+      aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
       aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
       position.set(0);
       aioFile.setBufferCallback(bufferCallback);

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-20 16:50:48 UTC (rev 8346)
@@ -84,7 +84,6 @@
                                    fileName,
                                    maxIO,
                                    buffersControl.callback,
-                                   callbacksExecutor,
                                    writeExecutor,
                                    pollerExecutor);
    }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-20 16:50:48 UTC (rev 8346)
@@ -44,11 +44,6 @@
 
    private final String directory;
    
-   /** We can't execute callbacks while inside the locks, as more IO operations could be performed, what could cause serious dead locks. */
-   protected final Executor callbackExecutor;
-
-
-
    protected final SequentialFileFactory factory;
 
    protected long fileSize = 0;
@@ -69,13 +64,12 @@
     * @param file
     * @param directory
     */
-   public AbstractSequentialFile(final Executor executor, final String directory, final File file, final SequentialFileFactory factory)
+   public AbstractSequentialFile(final String directory, final File file, final SequentialFileFactory factory)
    {
       super();
       this.file = file;
       this.directory = directory;
       this.factory = factory;
-      this.callbackExecutor = executor;
    }
 
    // Public --------------------------------------------------------

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-20 16:50:48 UTC (rev 8346)
@@ -41,14 +41,14 @@
 
    private RandomAccessFile rfile;
 
-   public NIOSequentialFile(final SequentialFileFactory factory, final Executor executor, final String directory, final String fileName)
+   public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName)
    {
-      super(executor, directory, new File(directory + "/" + fileName), factory);
+      super(directory, new File(directory + "/" + fileName), factory);
    }
 
    public NIOSequentialFile(final SequentialFileFactory factory, final File file)
    {
-      super(null, file.getParent(), new File(file.getPath()), factory);
+      super(file.getParent(), new File(file.getPath()), factory);
    }
 
    public int getAlignment()
@@ -240,20 +240,7 @@
 
       if (callback != null)
       {
-         if (callbackExecutor == null)
-         {
-            callback.done();
-         }
-         else
-         {
-            callbackExecutor.execute(new Runnable()
-            {
-               public void run()
-               {
-                  callback.done();
-               }
-            });
-         }
+         callback.done();
       }
    }
 }

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-20 16:50:48 UTC (rev 8346)
@@ -66,7 +66,7 @@
    // maxIO is ignored on NIO
    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
-      return new NIOSequentialFile(this, this.callbacksExecutor, journalDir, fileName);
+      return new NIOSequentialFile(this, journalDir, fileName);
    }
 
    public boolean isSupportsCallbacks()

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-20 16:50:48 UTC (rev 8346)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.persistence.impl.journal;
 
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -116,12 +117,15 @@
    {
       if (stored >= minimalStore && replicated >= minimalReplicated)
       {
-         for (TaskHolder holder : tasks)
+         Iterator<TaskHolder> iter = tasks.iterator();
+         while (iter.hasNext())
          {
+            TaskHolder holder = iter.next();
             if (!holder.executed && stored >= holder.storeLined && replicated >= holder.replicationLined)
             {
                holder.executed = true;
                holder.task.done();
+               iter.remove();
             }
          }
       }



More information about the hornetq-commits mailing list