Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 12:44:05 -0500 (Mon, 23 Nov 2009)
New Revision: 8377
Modified:
branches/ClebertCallback/native/src/AIOException.h
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
Log:
NIO should be totally Asynchronous now
Modified: branches/ClebertCallback/native/src/AIOException.h
===================================================================
--- branches/ClebertCallback/native/src/AIOException.h 2009-11-23 14:51:41 UTC (rev 8376)
+++ branches/ClebertCallback/native/src/AIOException.h 2009-11-23 17:44:05 UTC (rev 8377)
@@ -29,7 +29,7 @@
#define NATIVE_ERROR_CANT_ALLOCATE_QUEUE 206
#define NATIVE_ERROR_PREALLOCATE_FILE 208
#define NATIVE_ERROR_ALLOCATE_MEMORY 209
-#define NATIVE_ERROR_IO 210
+#define NATIVE_ERROR_IO 006
#define NATIVE_ERROR_AIO_FULL 211
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -49,7 +49,7 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 26;
+ private static int EXPECTED_NATIVE_VERSION = 27;
/** Used to determine the next writing sequence */
private AtomicLong nextWritingSequence = new AtomicLong(0);
@@ -154,7 +154,7 @@
private final VariableLatch pendingWrites = new VariableLatch();
- private Semaphore writeSemaphore;
+ private Semaphore maxIOSemaphore;
private BufferCallback bufferCallback;
@@ -195,7 +195,7 @@
}
this.maxIO = maxIO;
- writeSemaphore = new Semaphore(this.maxIO);
+ maxIOSemaphore = new Semaphore(this.maxIO);
this.fileName = fileName;
@@ -245,12 +245,12 @@
log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.fileName);
}
- while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+ while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.fileName);
}
- writeSemaphore = null;
+ maxIOSemaphore = null;
if (poller != null)
{
stopPoller();
@@ -294,7 +294,7 @@
{
public void run()
{
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
long sequence = nextWritingSequence.getAndIncrement();
@@ -319,7 +319,7 @@
}
else
{
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
long sequence = nextWritingSequence.getAndIncrement();
@@ -350,7 +350,7 @@
startPoller();
}
pendingWrites.up();
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
try
{
read(handler, position, size, directByteBuffer, aioPackage);
@@ -358,14 +358,14 @@
catch (HornetQException e)
{
// Release only if an exception happened
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
throw e;
}
catch (RuntimeException e)
{
// Release only if an exception happened
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
throw e;
}
@@ -444,7 +444,7 @@
@SuppressWarnings("unused")
private void callbackDone(final AIOCallback callback, final long sequence, final
ByteBuffer buffer)
{
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
@@ -511,7 +511,7 @@
{
log.warn("CallbackError: " + errorMessage);
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/exception/HornetQException.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -38,6 +38,9 @@
public static final int UNBLOCKED = 005;
+ public static final int IO_ERROR = 006;
+
+
public static final int QUEUE_DOES_NOT_EXIST = 100;
public static final int QUEUE_EXISTS = 101;
@@ -85,8 +88,6 @@
public static final int NATIVE_ERROR_ALLOCATE_MEMORY = 209;
- public static final int NATIVE_ERROR_IO = 210;
-
public static final int NATIVE_ERROR_AIO_FULL = 211;
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -38,7 +38,7 @@
boolean exists();
/**
- * For certain operations (like loading) we don't need open the file with full
maxIO
+ * The maximum number of simultaneous writes accepted
* @param maxIO
* @throws Exception
*/
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -15,9 +15,7 @@
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
@@ -49,9 +47,6 @@
/** The pool for Thread pollers */
private final Executor pollerExecutor;
- /** Context switch on AIO could fire unnecessary flushes, so we use a single thread
for write */
- private final Executor writerExecutor;
-
public AIOSequentialFile(final SequentialFileFactory factory,
final int bufferSize,
@@ -63,9 +58,8 @@
final Executor writerExecutor,
final Executor pollerExecutor)
{
- super(directory, new File(directory + "/" + fileName), factory);
+ super(directory, new File(directory + "/" + fileName), factory,
writerExecutor);
this.maxIO = maxIO;
- this.writerExecutor = writerExecutor;
this.bufferCallback = bufferCallback;
this.pollerExecutor = pollerExecutor;
}
@@ -102,27 +96,13 @@
{
return;
}
+
+ super.close();
+
opened = false;
timedBuffer = null;
- final CountDownLatch donelatch = new CountDownLatch(1);
-
- writerExecutor.execute(new Runnable()
- {
- public void run()
- {
- donelatch.countDown();
- }
- });
-
- while (!donelatch.await(60, TimeUnit.SECONDS))
- {
- log.warn("Executor on file " + getFile().getName() + "
couldn't complete its tasks in 60 seconds.",
- new Exception("Warning: Executor on file " +
getFile().getName() +
- " couldn't complete its tasks in 60
seconds."));
- }
-
aioFile.close();
aioFile = null;
@@ -193,13 +173,18 @@
open(maxIO);
}
- public synchronized void open(final int currentMaxIO) throws Exception
+ public synchronized void open(final int maxIO) throws Exception
{
opened = true;
+
aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
- aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
+
+ aioFile.open(getFile().getAbsolutePath(), maxIO);
+
position.set(0);
+
aioFile.setBufferCallback(bufferCallback);
+
this.fileSize = aioFile.size();
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -42,11 +42,6 @@
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
- /** A single AIO write executor for every AIO File.
- * This is used only for AIO & instant operations. We only need one
executor-thread for the entire journal as we always have only one active file.
- * And even if we had multiple files at a given moment, this should still be ok, as
we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
- private ExecutorService writeExecutor;
-
private ExecutorService pollerExecutor;
// This method exists just to make debug easier.
@@ -149,9 +144,6 @@
{
super.start();
- writeExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
- true));
-
pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
true));
@@ -162,19 +154,6 @@
{
buffersControl.stop();
- writeExecutor.shutdown();
-
- try
- {
- if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
- {
- log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
- }
- }
- catch (InterruptedException e)
- {
- }
-
pollerExecutor.shutdown();
try
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -44,13 +44,6 @@
private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
- /**
- *
- * We can't execute callbacks directly from any of the IO module. We need to do it
through another thread,
- * So, we will use an executor for this.
- * */
- protected ExecutorService callbacksExecutor;
-
protected final String journalDir;
protected final TimedBuffer timedBuffer;
@@ -58,7 +51,16 @@
protected final int bufferSize;
protected final long bufferTimeout;
+
+ /**
+ * Asynchronous writes need to be done at another executor.
+ * This needs to be done at NIO, or else we would have the callers thread blocking for
the return.
+ * At AIO this is necessary as context switches on writes would fire flushes at the
kernel.
+ * */
+ protected ExecutorService writeExecutor;
+
+
public AbstractSequentialFactory(final String journalDir,
final boolean buffered,
final int bufferSize,
@@ -86,13 +88,13 @@
timedBuffer.stop();
}
- if (callbacksExecutor != null)
+ if (isSupportsCallbacks())
{
- callbacksExecutor.shutdown();
-
+ writeExecutor.shutdown();
+
try
{
- if (!callbacksExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
{
log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
}
@@ -101,6 +103,8 @@
{
}
}
+
+
}
public void start()
@@ -109,17 +113,14 @@
{
timedBuffer.start();
}
-
+
if (isSupportsCallbacks())
{
- callbacksExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-callbacks" + System.identityHashCode(this),
+ writeExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" +
System.identityHashCode(this),
true));
}
- else
- {
- callbacksExecutor = null;
- }
+
}
/* (non-Javadoc)
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -16,7 +16,9 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.journal.IOAsyncTask;
@@ -43,7 +45,7 @@
private File file;
private final String directory;
-
+
protected final SequentialFileFactory factory;
protected long fileSize = 0;
@@ -56,6 +58,9 @@
* This is the class returned to the factory when the file is being activated. */
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+ /** Used for asynchronous writes */
+ protected final Executor writerExecutor;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -64,12 +69,16 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(final String directory, final File file, final
SequentialFileFactory factory)
+ public AbstractSequentialFile(final String directory,
+ final File file,
+ final SequentialFileFactory factory,
+ final Executor writerExecutor)
{
super();
this.file = file;
this.directory = directory;
this.factory = factory;
+ this.writerExecutor = writerExecutor;
}
// Public --------------------------------------------------------
@@ -116,6 +125,29 @@
}
}
+ public synchronized void close() throws Exception
+ {
+ final CountDownLatch donelatch = new CountDownLatch(1);
+
+ if (writerExecutor != null)
+ {
+ writerExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ donelatch.countDown();
+ }
+ });
+
+ while (!donelatch.await(60, TimeUnit.SECONDS))
+ {
+ log.warn("Executor on file " + getFile().getName() + "
couldn't complete its tasks in 60 seconds.",
+ new Exception("Warning: Executor on file " +
getFile().getName() +
+ " couldn't complete its tasks in 60
seconds."));
+ }
+ }
+ }
+
public final boolean fits(final int size)
{
if (timedBuffer == null)
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -19,7 +19,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -41,14 +44,23 @@
private RandomAccessFile rfile;
- public NIOSequentialFile(final SequentialFileFactory factory, final String directory,
final String fileName)
+ /** The write semaphore here is only used when writing asynchronously */
+ private Semaphore maxIOSemaphore;
+
+ private final int defaultMaxIO;
+
+ private int maxIO;
+
+ public NIOSequentialFile(final SequentialFileFactory factory, final String directory,
final String fileName, final int maxIO, final Executor writerExecutor)
{
- super(directory, new File(directory + "/" + fileName), factory);
+ super(directory, new File(directory + "/" + fileName), factory,
writerExecutor);
+ this.defaultMaxIO = maxIO;
}
- public NIOSequentialFile(final SequentialFileFactory factory, final File file)
+ public NIOSequentialFile(final SequentialFileFactory factory, final File file, final
int maxIO, final Executor writerExecutor)
{
- super(file.getParent(), new File(file.getPath()), factory);
+ super(file.getParent(), new File(file.getPath()), factory, writerExecutor);
+ this.defaultMaxIO = maxIO;
}
public int getAlignment()
@@ -66,20 +78,28 @@
return channel != null;
}
+ /** this.maxIO represents the default maxIO.
+ * Some operations while initializing files on the journal may require a different
maxIO */
public synchronized void open() throws Exception
{
+ open(this.defaultMaxIO);
+ }
+
+ public void open(final int maxIO) throws Exception
+ {
rfile = new RandomAccessFile(getFile(), "rw");
channel = rfile.getChannel();
fileSize = channel.size();
+
+ if (writerExecutor != null)
+ {
+ this.maxIOSemaphore = new Semaphore(maxIO);
+ this.maxIO = maxIO;
+ }
}
- public void open(final int currentMaxIO) throws Exception
- {
- open();
- }
-
public void fill(final int position, final int size, final byte fillCharacter) throws
Exception
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -112,6 +132,8 @@
public synchronized void close() throws Exception
{
+ super.close();
+
if (channel != null)
{
channel.close();
@@ -126,6 +148,14 @@
rfile = null;
+ if (maxIOSemaphore != null)
+ {
+ while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+ {
+ log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.getFileName());
+ }
+ }
+
notifyAll();
}
@@ -153,7 +183,7 @@
{
if (callback != null)
{
- callback.onError(-1, e.getLocalizedMessage());
+ callback.onError(HornetQException.IO_ERROR, e.getLocalizedMessage());
}
throw e;
@@ -195,7 +225,7 @@
public SequentialFile copy()
{
- return new NIOSequentialFile(factory, getFile());
+ return new NIOSequentialFile(factory, getFile(), maxIO, writerExecutor);
}
public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask
callback)
@@ -220,6 +250,42 @@
internalWrite(bytes, sync, null);
}
+ private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOAsyncTask callback) throws Exception
+ {
+ if (writerExecutor == null)
+ {
+ doInternalWrite(bytes, sync, callback);
+ }
+ else
+ {
+ // This is a flow control on writing, just like maxAIO on libaio
+ maxIOSemaphore.acquire();
+
+ writerExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ try
+ {
+ doInternalWrite(bytes, sync, callback);
+ }
+ catch (Exception e)
+ {
+ log.warn("Exception on submitting write", e);
+ callback.onError(HornetQException.IO_ERROR, e.getMessage());
+ }
+ }
+ finally
+ {
+ maxIOSemaphore.release();
+ }
+ }
+ });
+ }
+ }
+
/**
* @param bytes
* @param sync
@@ -227,8 +293,9 @@
* @throws IOException
* @throws Exception
*/
- private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOAsyncTask callback) throws Exception
+ private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final
IOAsyncTask callback) throws Exception
{
+
position.addAndGet(bytes.limit());
channel.write(bytes);
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-23
14:51:41 UTC (rev 8376)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-23
17:44:05 UTC (rev 8377)
@@ -64,9 +64,15 @@
}
// maxIO is ignored on NIO
- public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+ public SequentialFile createSequentialFile(final String fileName, int maxIO)
{
- return new NIOSequentialFile(this, journalDir, fileName);
+ if (maxIO < 0)
+ {
+ // A single threaded IO
+ maxIO = 1;
+ }
+
+ return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
public boolean isSupportsCallbacks()