Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 11:50:48 -0500 (Fri, 20 Nov 2009)
New Revision: 8346
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
Log:
AIO order
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20
16:44:22 UTC (rev 8345)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20
16:50:48 UTC (rev 8346)
@@ -46,8 +46,15 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 25;
+ private static int EXPECTED_NATIVE_VERSION = 26;
+
+ /** Used to determine the next writing sequence */
+ private AtomicInteger nextWritingSequence = new AtomicInteger(0);
+ /** Used to determine the next reading sequence.
+ * This is accessed from a single thread (the Poller Thread) */
+ private int readSequence = 0;
+
public static void addMax(final int io)
{
totalMaxIO.addAndGet(io);
@@ -149,10 +156,6 @@
// AIO using a single thread.
private final Executor writeExecutor;
- // We can't use the same thread on the callbacks
- // as the callbacks may perform other IO operations back what could cause dead locks
- private final Executor callbackExecutor;
-
private final Executor pollerExecutor;
// AsynchronousFile implementation
---------------------------------------------------
@@ -161,11 +164,10 @@
* @param writeExecutor It needs to be a single Thread executor. If null it will use
the user thread to execute write operations
* @param pollerExecutor The thread pool that will initialize poller handlers
*/
- public AsynchronousFileImpl(final Executor writeExecutor, final Executor
pollerExecutor, final Executor callbackExecutor)
+ public AsynchronousFileImpl(final Executor writeExecutor, final Executor
pollerExecutor)
{
this.writeExecutor = writeExecutor;
this.pollerExecutor = pollerExecutor;
- this.callbackExecutor = callbackExecutor;
}
public void open(final String fileName, final int maxIO) throws HornetQException
@@ -207,6 +209,8 @@
}
opened = true;
addMax(this.maxIO);
+ nextWritingSequence.set(0);
+ readSequence = 0;
}
finally
{
@@ -278,18 +282,20 @@
public void run()
{
writeSemaphore.acquireUninterruptibly();
+
+ int sequence = nextWritingSequence.getAndIncrement();
try
{
- write(handler, position, size, directByteBuffer, aioCallback);
+ write(handler, sequence, position, size, directByteBuffer,
aioCallback);
}
catch (HornetQException e)
{
- callbackError(aioCallback, directByteBuffer, e.getCode(),
e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, e.getCode(),
e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, directByteBuffer,
HornetQException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer,
HornetQException.INTERNAL_ERROR, e.getMessage());
}
}
});
@@ -298,17 +304,19 @@
{
writeSemaphore.acquireUninterruptibly();
+ int sequence = nextWritingSequence.getAndIncrement();
+
try
{
- write(handler, position, size, directByteBuffer, aioCallback);
+ write(handler, sequence, position, size, directByteBuffer, aioCallback);
}
catch (HornetQException e)
{
- callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, e.getCode(),
e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, directByteBuffer, HornetQException.INTERNAL_ERROR,
e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer,
HornetQException.INTERNAL_ERROR, e.getMessage());
}
}
@@ -419,17 +427,11 @@
@SuppressWarnings("unused")
// Called by the JNI layer.. just ignore the
// warning
- private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
+ private void callbackDone(final AIOCallback callback, final int sequence, final
ByteBuffer buffer)
{
writeSemaphore.release();
pendingWrites.down();
- callbackExecutor.execute(new Runnable()
- {
- public void run()
- {
- callback.done();
- }
- });
+ callback.done();
// The buffer is not sent on callback for read operations
if (bufferCallback != null && buffer != null)
@@ -440,7 +442,7 @@
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final ByteBuffer buffer, final
int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback, final int sequence, final
ByteBuffer buffer, final int errorCode, final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
writeSemaphore.release();
@@ -527,7 +529,7 @@
private native long size0(long handle) throws HornetQException;
- private native void write(long handle, long position, long size, ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
+ private native void write(long handle, int sequence, long position, long size,
ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
private native void read(long handle, long position, long size, ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-20
16:44:22 UTC (rev 8345)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-20
16:50:48 UTC (rev 8346)
@@ -60,11 +60,10 @@
final String fileName,
final int maxIO,
final BufferCallback bufferCallback,
- final Executor callbackExecutor,
final Executor writerExecutor,
final Executor pollerExecutor)
{
- super(callbackExecutor, directory, new File(directory + "/" + fileName),
factory);
+ super(directory, new File(directory + "/" + fileName), factory);
this.maxIO = maxIO;
this.writerExecutor = writerExecutor;
this.bufferCallback = bufferCallback;
@@ -94,7 +93,7 @@
public SequentialFile copy()
{
- return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(),
maxIO, bufferCallback, callbackExecutor, writerExecutor, pollerExecutor);
+ return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(),
maxIO, bufferCallback, writerExecutor, pollerExecutor);
}
public synchronized void close() throws Exception
@@ -197,7 +196,7 @@
public synchronized void open(final int currentMaxIO) throws Exception
{
opened = true;
- aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor,
callbackExecutor);
+ aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
position.set(0);
aioFile.setBufferCallback(bufferCallback);
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-20
16:44:22 UTC (rev 8345)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-20
16:50:48 UTC (rev 8346)
@@ -84,7 +84,6 @@
fileName,
maxIO,
buffersControl.callback,
- callbacksExecutor,
writeExecutor,
pollerExecutor);
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-20
16:44:22 UTC (rev 8345)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-20
16:50:48 UTC (rev 8346)
@@ -44,11 +44,6 @@
private final String directory;
- /** We can't execute callbacks while inside the locks, as more IO operations could
be performed, what could cause serious dead locks. */
- protected final Executor callbackExecutor;
-
-
-
protected final SequentialFileFactory factory;
protected long fileSize = 0;
@@ -69,13 +64,12 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(final Executor executor, final String directory, final
File file, final SequentialFileFactory factory)
+ public AbstractSequentialFile(final String directory, final File file, final
SequentialFileFactory factory)
{
super();
this.file = file;
this.directory = directory;
this.factory = factory;
- this.callbackExecutor = executor;
}
// Public --------------------------------------------------------
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-20
16:44:22 UTC (rev 8345)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-20
16:50:48 UTC (rev 8346)
@@ -41,14 +41,14 @@
private RandomAccessFile rfile;
- public NIOSequentialFile(final SequentialFileFactory factory, final Executor executor,
final String directory, final String fileName)
+ public NIOSequentialFile(final SequentialFileFactory factory, final String directory,
final String fileName)
{
- super(executor, directory, new File(directory + "/" + fileName),
factory);
+ super(directory, new File(directory + "/" + fileName), factory);
}
public NIOSequentialFile(final SequentialFileFactory factory, final File file)
{
- super(null, file.getParent(), new File(file.getPath()), factory);
+ super(file.getParent(), new File(file.getPath()), factory);
}
public int getAlignment()
@@ -240,20 +240,7 @@
if (callback != null)
{
- if (callbackExecutor == null)
- {
- callback.done();
- }
- else
- {
- callbackExecutor.execute(new Runnable()
- {
- public void run()
- {
- callback.done();
- }
- });
- }
+ callback.done();
}
}
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-20
16:44:22 UTC (rev 8345)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-20
16:50:48 UTC (rev 8346)
@@ -66,7 +66,7 @@
// maxIO is ignored on NIO
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
- return new NIOSequentialFile(this, this.callbacksExecutor, journalDir, fileName);
+ return new NIOSequentialFile(this, journalDir, fileName);
}
public boolean isSupportsCallbacks()
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-20
16:44:22 UTC (rev 8345)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-20
16:50:48 UTC (rev 8346)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence.impl.journal;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -116,12 +117,15 @@
{
if (stored >= minimalStore && replicated >= minimalReplicated)
{
- for (TaskHolder holder : tasks)
+ Iterator<TaskHolder> iter = tasks.iterator();
+ while (iter.hasNext())
{
+ TaskHolder holder = iter.next();
if (!holder.executed && stored >= holder.storeLined &&
replicated >= holder.replicationLined)
{
holder.executed = true;
holder.task.done();
+ iter.remove();
}
}
}