Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 16:21:28 -0500 (Fri, 20 Nov 2009)
New Revision: 8351
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
Log:
AIO order
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20
18:16:16 UTC (rev 8350)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20
21:21:28 UTC (rev 8351)
@@ -14,12 +14,14 @@
package org.hornetq.core.asyncio.impl;
import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.asyncio.AIOCallback;
@@ -48,14 +50,20 @@
private static boolean loaded = false;
private static int EXPECTED_NATIVE_VERSION = 26;
-
+
/** Used to determine the next writing sequence */
private AtomicLong nextWritingSequence = new AtomicLong(0);
/** Used to determine the next sequence expected on the callback side (the counterpart of nextWritingSequence).
* This is accessed from a single thread (the Poller Thread) */
- private long readSequence = 0;
+ private long nextReadSequence = 0;
+ /**
+ * AIO can't guarantee ordering over callbacks.
+ * We use this PriorityQueue to hold callbacks until they can be delivered in order.
+ */
+ private PriorityQueue<CallbackHolder> pendingCallbacks = new
PriorityQueue<CallbackHolder>();
+
public static void addMax(final int io)
{
totalMaxIO.addAndGet(io);
@@ -132,6 +140,10 @@
private String fileName;
+ /** Used while inside the callbackDone and callbackError
+ **/
+ private final Lock callbackLock = new ReentrantLock();
+
private final VariableLatch pollerLatch = new VariableLatch();
private volatile Runnable poller;
@@ -139,7 +151,7 @@
private int maxIO;
private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
-
+
private final VariableLatch pendingWrites = new VariableLatch();
private Semaphore writeSemaphore;
@@ -156,7 +168,7 @@
// serious performance problems. Because of that we make all the writes on
// AIO using a single thread.
private final Executor writeExecutor;
-
+
private final Executor pollerExecutor;
// AsynchronousFile implementation
---------------------------------------------------
@@ -197,10 +209,10 @@
if (e.getCode() == HornetQException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
{
ex = new HornetQException(e.getCode(),
- "Can't initialize AIO. Currently AIO
in use = " + totalMaxIO.get() +
- ", trying to allocate more
" +
- maxIO,
- e);
+ "Can't initialize AIO. Currently AIO in
use = " + totalMaxIO.get() +
+ ", trying to allocate more "
+
+ maxIO,
+ e);
}
else
{
@@ -211,7 +223,7 @@
opened = true;
addMax(this.maxIO);
nextWritingSequence.set(0);
- readSequence = 0;
+ nextReadSequence = 0;
}
finally
{
@@ -232,7 +244,7 @@
{
log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.fileName);
}
-
+
while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.fileName);
@@ -273,7 +285,7 @@
{
startPoller();
}
-
+
pendingWrites.up();
if (writeExecutor != null)
@@ -283,7 +295,7 @@
public void run()
{
writeSemaphore.acquireUninterruptibly();
-
+
long sequence = nextWritingSequence.getAndIncrement();
try
@@ -296,7 +308,11 @@
}
catch (RuntimeException e)
{
- callbackError(aioCallback, sequence, directByteBuffer,
HornetQException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback,
+ sequence,
+ directByteBuffer,
+ HornetQException.INTERNAL_ERROR,
+ e.getMessage());
}
}
});
@@ -411,9 +427,9 @@
resetBuffer(buffer, buffer.limit());
buffer.position(0);
}
-
+
// Protected
-------------------------------------------------------------------------
-
+
protected void finalize()
{
if (opened)
@@ -424,32 +440,109 @@
// Private
---------------------------------------------------------------------------
- /** The JNI layer will call this method, so we could use it to unlock readWriteLocks
held in the java layer */
+ /**
+ * Invoked by the JNI layer when an operation completes successfully.
+ * Releases one writeSemaphore permit and calls pendingWrites.down(), then, while holding
+ * callbackLock, delivers the callback according to its sequence:
+ * a sequence of -1 is delivered immediately (presumably an operation issued without a
+ * sequence, such as a read -- TODO confirm);
+ * a sequence equal to nextReadSequence is delivered at once, followed by any buffered successors;
+ * any other sequence is parked in pendingCallbacks until the ones before it have been delivered.
+ * Finally the buffer, when one was passed and a bufferCallback is set, is handed to bufferCallback.
+ */
@SuppressWarnings("unused")
- // Called by the JNI layer.. just ignore the
- // warning
private void callbackDone(final AIOCallback callback, final long sequence, final
ByteBuffer buffer)
{
writeSemaphore.release();
+
pendingWrites.down();
- callback.done();
-
- // The buffer is not sent on callback for read operations
- if (bufferCallback != null && buffer != null)
+
+ callbackLock.lock();
+
+ try
{
- bufferCallback.bufferDone(buffer);
+
+ if (sequence == -1)
+ {
+ callback.done();
+ }
+ else
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.done();
+ flushCallbacks();
+ }
+ else
+ {
+ // System.out.println("Buffering callback");
+ pendingCallbacks.add(new CallbackHolder(sequence, callback));
+ }
+ }
+
+ // The buffer is not sent on callback for read operations
+ if (bufferCallback != null && buffer != null)
+ {
+ bufferCallback.bufferDone(buffer);
+ }
}
+ finally
+ {
+ callbackLock.unlock();
+ }
}
+ /**
+ * Delivers every buffered callback whose sequence has become the next expected one,
+ * stopping at the first gap in the sequence.
+ * Called from callbackDone and callbackError while they hold callbackLock.
+ */
+ private void flushCallbacks()
+ {
+    while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence)
+    {
+       CallbackHolder holder = pendingCallbacks.poll();
+
+       // Advance before invoking the callback, exactly as callbackDone and callbackError do:
+       // the holder has already left the queue, so if the callback throws, the expected
+       // sequence must not stay stuck on an entry that no longer exists.
+       nextReadSequence++;
+
+       if (holder.isError())
+       {
+          ErrorCallback error = (ErrorCallback)holder;
+          holder.callback.onError(error.errorCode, error.message);
+       }
+       else
+       {
+          holder.callback.done();
+       }
+    }
+ }
+
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final long sequence, final
ByteBuffer buffer, final int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback,
+ final long sequence,
+ final ByteBuffer buffer,
+ final int errorCode,
+ final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
+
writeSemaphore.release();
+
pendingWrites.down();
- callback.onError(errorCode, errorMessage);
+ callbackLock.lock();
+
+ try
+ {
+ if (sequence == -1)
+ {
+ callback.onError(errorCode, errorMessage);
+ }
+ else
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.onError(errorCode, errorMessage);
+ flushCallbacks();
+ }
+ else
+ {
+ pendingCallbacks.add(new ErrorCallback(sequence, callback, errorCode,
errorMessage));
+ }
+ }
+ }
+ finally
+ {
+ callbackLock.unlock();
+ }
+
// The buffer is not sent on callback for read operations
if (bufferCallback != null && buffer != null)
{
@@ -518,10 +611,10 @@
private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
public static native void destroyBuffer(ByteBuffer buffer);
-
+
/** Instead of passing the nanoSeconds through the stack call every time, we set it
statically inside the native method */
public static native void setNanoSleepInterval(int nanoseconds);
-
+
public static native void nanoSleep();
private static native ByteBuffer newNativeBuffer(long size);
@@ -530,7 +623,12 @@
private native long size0(long handle) throws HornetQException;
- private native void write(long handle, long sequence, long position, long size,
ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+ private native void write(long handle,
+ long sequence,
+ long position,
+ long size,
+ ByteBuffer buffer,
+ AIOCallback aioPackage) throws HornetQException;
private native void read(long handle, long position, long size, ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
@@ -548,6 +646,53 @@
// Inner classes
---------------------------------------------------------------------
+ /**
+ * Pairs a callback with the sequence it was issued under, so that completions arriving
+ * out of order can be kept in a PriorityQueue and released by ascending sequence.
+ */
+ private static class CallbackHolder implements Comparable<CallbackHolder>
+ {
+    final long sequence;
+
+    final AIOCallback callback;
+
+    /** @return whether this holder carries an error; always false in this base class */
+    public boolean isError()
+    {
+       return false;
+    }
+
+    public CallbackHolder(final long sequence, final AIOCallback callback)
+    {
+       this.sequence = sequence;
+       this.callback = callback;
+    }
+
+    /**
+     * Orders holders by ascending sequence.
+     * Two holders are not expected to share a sequence, but equality still yields 0 so that
+     * the Comparable contract (sgn(a.compareTo(b)) == -sgn(b.compareTo(a))) is honoured.
+     */
+    public int compareTo(CallbackHolder o)
+    {
+       if (sequence < o.sequence)
+       {
+          return -1;
+       }
+       else if (sequence > o.sequence)
+       {
+          return 1;
+       }
+       else
+       {
+          return 0;
+       }
+    }
+ }
+
+ /**
+ * Holder for a callback that completed with an error and had to be buffered until its turn.
+ * Keeps the error code and message so they can be passed to AIOCallback.onError when flushed.
+ */
+ private static class ErrorCallback extends CallbackHolder
+ {
+    final int errorCode;
+
+    final String message;
+
+    // Without this override flushCallbacks() sees isError() == false for a buffered error
+    // and reports it through done() instead of onError().
+    @Override
+    public boolean isError()
+    {
+       return true;
+    }
+
+    public ErrorCallback(final long sequence, final AIOCallback callback, int errorCode, String message)
+    {
+       super(sequence, callback);
+
+       this.errorCode = errorCode;
+
+       this.message = message;
+    }
+ }
+
private class PollerRunnable implements Runnable
{
PollerRunnable()