[hornetq-commits] JBoss hornetq SVN: r8351 - branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 20 16:21:29 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-20 16:21:28 -0500 (Fri, 20 Nov 2009)
New Revision: 8351

Modified:
   branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
Log:
AIO order

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-20 18:16:16 UTC (rev 8350)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-20 21:21:28 UTC (rev 8351)
@@ -14,12 +14,14 @@
 package org.hornetq.core.asyncio.impl;
 
 import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.hornetq.core.asyncio.AIOCallback;
@@ -48,14 +50,20 @@
    private static boolean loaded = false;
 
    private static int EXPECTED_NATIVE_VERSION = 26;
-   
+
    /** Used to determine the next writing sequence */
    private AtomicLong nextWritingSequence = new AtomicLong(0);
 
    /** Used to determine the next writing sequence.
     *  This is accessed from a single thread (the Poller Thread) */
-   private long readSequence = 0;
+   private long nextReadSequence = 0;
 
+   /** 
+    * AIO can't guarantee ordering over callbacks.
+    * We use thie PriorityQueue to hold values until they are in order
+    */
+   private PriorityQueue<CallbackHolder> pendingCallbacks = new PriorityQueue<CallbackHolder>();
+
    public static void addMax(final int io)
    {
       totalMaxIO.addAndGet(io);
@@ -132,6 +140,10 @@
 
    private String fileName;
 
+   /** Used while inside the callbackDone and callbackError
+    **/
+   private final Lock callbackLock = new ReentrantLock();
+
    private final VariableLatch pollerLatch = new VariableLatch();
 
    private volatile Runnable poller;
@@ -139,7 +151,7 @@
    private int maxIO;
 
    private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
-   
+
    private final VariableLatch pendingWrites = new VariableLatch();
 
    private Semaphore writeSemaphore;
@@ -156,7 +168,7 @@
    // serious performance problems. Because of that we make all the writes on
    // AIO using a single thread.
    private final Executor writeExecutor;
-   
+
    private final Executor pollerExecutor;
 
    // AsynchronousFile implementation ---------------------------------------------------
@@ -197,10 +209,10 @@
             if (e.getCode() == HornetQException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
             {
                ex = new HornetQException(e.getCode(),
-                                           "Can't initialize AIO. Currently AIO in use = " + totalMaxIO.get() +
-                                                    ", trying to allocate more " +
-                                                    maxIO,
-                                           e);
+                                         "Can't initialize AIO. Currently AIO in use = " + totalMaxIO.get() +
+                                                  ", trying to allocate more " +
+                                                  maxIO,
+                                         e);
             }
             else
             {
@@ -211,7 +223,7 @@
          opened = true;
          addMax(this.maxIO);
          nextWritingSequence.set(0);
-         readSequence = 0;
+         nextReadSequence = 0;
       }
       finally
       {
@@ -232,7 +244,7 @@
          {
             log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
          }
-         
+
          while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
          {
             log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
@@ -273,7 +285,7 @@
       {
          startPoller();
       }
-      
+
       pendingWrites.up();
 
       if (writeExecutor != null)
@@ -283,7 +295,7 @@
             public void run()
             {
                writeSemaphore.acquireUninterruptibly();
-               
+
                long sequence = nextWritingSequence.getAndIncrement();
 
                try
@@ -296,7 +308,11 @@
                }
                catch (RuntimeException e)
                {
-                  callbackError(aioCallback, sequence, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
+                  callbackError(aioCallback,
+                                sequence,
+                                directByteBuffer,
+                                HornetQException.INTERNAL_ERROR,
+                                e.getMessage());
                }
             }
          });
@@ -411,9 +427,9 @@
       resetBuffer(buffer, buffer.limit());
       buffer.position(0);
    }
-   
+
    // Protected -------------------------------------------------------------------------
-   
+
    protected void finalize()
    {
       if (opened)
@@ -424,32 +440,109 @@
 
    // Private ---------------------------------------------------------------------------
 
-   /** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
+   /** */
    @SuppressWarnings("unused")
-   // Called by the JNI layer.. just ignore the
-   // warning
    private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
    {
       writeSemaphore.release();
+
       pendingWrites.down();
-      callback.done();
-      
-      // The buffer is not sent on callback for read operations
-      if (bufferCallback != null && buffer != null)
+
+      callbackLock.lock();
+
+      try
       {
-         bufferCallback.bufferDone(buffer);
+
+         if (sequence == -1)
+         {
+            callback.done();
+         }
+         else
+         {
+            if (sequence == nextReadSequence)
+            {
+               nextReadSequence++;
+               callback.done();
+               flushCallbacks();
+            }
+            else
+            {
+               // System.out.println("Buffering callback");
+               pendingCallbacks.add(new CallbackHolder(sequence, callback));
+            }
+         }
+
+         // The buffer is not sent on callback for read operations
+         if (bufferCallback != null && buffer != null)
+         {
+            bufferCallback.bufferDone(buffer);
+         }
       }
+      finally
+      {
+         callbackLock.unlock();
+      }
    }
 
+   private void flushCallbacks()
+   {
+      while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence)
+      {
+         CallbackHolder holder = pendingCallbacks.poll();
+         if (holder.isError())
+         {
+            ErrorCallback error = (ErrorCallback) holder;
+            holder.callback.onError(error.errorCode, error.message);
+         }
+         else
+         {
+            holder.callback.done();
+         }
+         nextReadSequence++;
+      }
+   }
+
    // Called by the JNI layer.. just ignore the
    // warning
-   private void callbackError(final AIOCallback callback, final long sequence, final ByteBuffer buffer, final int errorCode, final String errorMessage)
+   private void callbackError(final AIOCallback callback,
+                                           final long sequence,
+                                           final ByteBuffer buffer,
+                                           final int errorCode,
+                                           final String errorMessage)
    {
       log.warn("CallbackError: " + errorMessage);
+
       writeSemaphore.release();
+
       pendingWrites.down();
-      callback.onError(errorCode, errorMessage);
 
+      callbackLock.lock();
+
+      try
+      {
+         if (sequence == -1)
+         {
+            callback.onError(errorCode, errorMessage);
+         }
+         else
+         {
+            if (sequence == nextReadSequence)
+            {
+               nextReadSequence++;
+               callback.onError(errorCode, errorMessage);
+               flushCallbacks();
+            }
+            else
+            {
+               pendingCallbacks.add(new ErrorCallback(sequence, callback, errorCode, errorMessage));
+            }
+         }
+      }
+      finally
+      {
+         callbackLock.unlock();
+      }
+
       // The buffer is not sent on callback for read operations
       if (bufferCallback != null && buffer != null)
       {
@@ -518,10 +611,10 @@
    private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
 
    public static native void destroyBuffer(ByteBuffer buffer);
-   
+
    /** Instead of passing the nanoSeconds through the stack call every time, we set it statically inside the native method */
    public static native void setNanoSleepInterval(int nanoseconds);
-   
+
    public static native void nanoSleep();
 
    private static native ByteBuffer newNativeBuffer(long size);
@@ -530,7 +623,12 @@
 
    private native long size0(long handle) throws HornetQException;
 
-   private native void write(long handle, long sequence, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+   private native void write(long handle,
+                             long sequence,
+                             long position,
+                             long size,
+                             ByteBuffer buffer,
+                             AIOCallback aioPackage) throws HornetQException;
 
    private native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
 
@@ -548,6 +646,53 @@
 
    // Inner classes ---------------------------------------------------------------------
 
+   private static class CallbackHolder implements Comparable<CallbackHolder>
+   {
+      final long sequence;
+
+      final AIOCallback callback;
+
+      public boolean isError()
+      {
+         return false;
+      }
+
+      public CallbackHolder(final long sequence, final AIOCallback callback)
+      {
+         this.sequence = sequence;
+         this.callback = callback;
+      }
+
+      public int compareTo(CallbackHolder o)
+      {
+         // It shouldn't be equals in any case
+         if (sequence <= o.sequence)
+         {
+            return -1;
+         }
+         else
+         {
+            return 1;
+         }
+      }
+   }
+
+   private static class ErrorCallback extends CallbackHolder
+   {
+      final int errorCode;
+      
+      final String message;
+
+      public ErrorCallback(final long sequence, final AIOCallback callback, int errorCode, String message)
+      {
+         super(sequence, callback);
+         
+         this.errorCode = errorCode;
+         
+         this.message = message;
+      }
+   }
+
    private class PollerRunnable implements Runnable
    {
       PollerRunnable()



More information about the hornetq-commits mailing list