[jboss-cvs] JBoss Messaging SVN: r7106 - branches/Branch_JBM2_Perf_Clebert.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu May 28 14:41:50 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-05-28 14:41:50 -0400 (Thu, 28 May 2009)
New Revision: 7106
Added:
branches/Branch_JBM2_Perf_Clebert/revert-aio.patch
Log:
Patch to revert AIO changes, that I'm using on tests
Added: branches/Branch_JBM2_Perf_Clebert/revert-aio.patch
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/revert-aio.patch (rev 0)
+++ branches/Branch_JBM2_Perf_Clebert/revert-aio.patch 2009-05-28 18:41:50 UTC (rev 7106)
@@ -0,0 +1,2662 @@
+Index: native/src/LibAIOController.cpp
+===================================================================
+--- native/src/LibAIOController.cpp (revision 7105)
++++ native/src/LibAIOController.cpp (working copy)
+@@ -35,6 +35,8 @@
+ #include "Version.h"
+
+
++
++
+ /*
+ * Class: org_jboss_jaio_libaioimpl_LibAIOController
+ * Method: init
+@@ -108,8 +110,6 @@
+ }
+ }
+
+-
+-// Fast memset on buffer
+ JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
+ (JNIEnv *env, jclass, jobject jbuffer, jint size)
+ {
+@@ -125,46 +125,7 @@
+
+ }
+
+-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
+- (JNIEnv * env, jclass, jobject jbuffer)
+-{
+- void * buffer = env->GetDirectBufferAddress(jbuffer);
+- free(buffer);
+-}
+
+-JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newNativeBuffer
+- (JNIEnv * env, jclass, jlong size)
+-{
+- try
+- {
+-
+- if (size % ALIGNMENT)
+- {
+- throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512");
+- return 0;
+- }
+-
+-
+- // This will allocate a buffer, aligned by 512.
+- // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
+- void * buffer = 0;
+- if (::posix_memalign(&buffer, 512, size))
+- {
+- throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign");
+- return 0;
+- }
+-
+- memset(buffer, 0, (size_t)size);
+-
+- jobject jbuffer = env->NewDirectByteBuffer(buffer, size);
+- return jbuffer;
+- }
+- catch (AIOException& e)
+- {
+- throwException(env, e.getErrorCode(), e.what());
+- return 0;
+- }
+-}
+
+ JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
+ (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
+@@ -173,14 +134,12 @@
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+-
+ if (buffer == 0)
+ {
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
+ return;
+ }
+
+-
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
+
+ controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
+Index: native/src/AsyncFile.cpp
+===================================================================
+--- native/src/AsyncFile.cpp (revision 7105)
++++ native/src/AsyncFile.cpp (working copy)
+@@ -13,7 +13,7 @@
+ USA
+
+ The GNU Lesser General Public License is available in the file COPYING.
+-
++
+ Software written by Clebert Suconic (csuconic at redhat dot com)
+ */
+
+@@ -48,12 +48,12 @@
+ std::string io_error(int rc)
+ {
+ std::stringstream buffer;
+-
++
+ if (rc == -ENOSYS)
+ buffer << "AIO not in this kernel";
+- else
++ else
+ buffer << "Error:= " << strerror(-rc);
+-
++
+ return buffer.str();
+ }
+
+@@ -62,27 +62,27 @@
+ {
+ ::pthread_mutex_init(&fileMutex,0);
+ ::pthread_mutex_init(&pollerMutex,0);
+-
++
+ maxIO = _maxIO;
+ fileName = _fileName;
+ if (io_queue_init(maxIO, &aioContext))
+ {
+- throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio, out of AIO Handlers");
++ throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio");
+ }
+
+ fileHandle = ::open(fileName.data(), O_RDWR | O_CREAT | O_DIRECT, 0666);
+ if (fileHandle < 0)
+ {
+ io_queue_release(aioContext);
+- throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file");
++ throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file");
+ }
+-
++
+ #ifdef DEBUG
+ fprintf (stderr,"File Handle %d", fileHandle);
+ #endif
+
+ events = (struct io_event *)malloc (maxIO * sizeof (struct io_event));
+-
++
+ if (events == 0)
+ {
+ throw AIOException (NATIVE_ERROR_CANT_ALLOCATE_QUEUE, "Can't allocate ioEvents");
+@@ -112,17 +112,17 @@
+
+ void AsyncFile::pollEvents(THREAD_CONTEXT threadContext)
+ {
+-
++
+ LockClass lock(&pollerMutex);
+ pollerRunning=1;
+-
++
+ // TODO: Maybe we don't need to wait for one second.... we just keep waiting forever, and use something to interrupt it
+ // maybe an invalid write to interrupt it.
+ struct timespec oneSecond;
+ oneSecond.tv_sec = 1;
+ oneSecond.tv_nsec = 0;
+-
+-
++
++
+ while (pollerRunning)
+ {
+ if (isException(threadContext))
+@@ -130,15 +130,15 @@
+ return;
+ }
+ int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
+-
+-
++
++
+ #ifdef DEBUG
+ fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
+ #endif
+-
++
+ if (result > 0)
+ {
+-
++
+ #ifdef DEBUG
+ fprintf (stdout, "Received %d events\n", result);
+ fflush(stdout);
+@@ -147,9 +147,9 @@
+
+ for (int i=0; i<result; i++)
+ {
+-
++
+ struct iocb * iocbp = events[i].obj;
+-
++
+ if (iocbp->data == (void *) -1)
+ {
+ pollerRunning = 0;
+@@ -160,7 +160,7 @@
+ else
+ {
+ CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
+-
++
+ long result = events[i].res;
+ if (result < 0)
+ {
+@@ -172,13 +172,13 @@
+ adapter->done(threadContext);
+ }
+ }
+-
++
+ delete iocbp;
+ }
+ }
+ #ifdef DEBUG
+ controller->log(threadContext, 2, "Poller finished execution");
+-#endif
++#endif
+ }
+
+
+@@ -189,18 +189,18 @@
+ {
+ throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "You can only pre allocate files in multiples of 512");
+ }
+-
++
+ void * preAllocBuffer = 0;
+ if (posix_memalign(&preAllocBuffer, 512, size))
+ {
+ throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign");
+ }
+-
++
+ memset(preAllocBuffer, fillChar, size);
+-
+-
++
++
+ if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+-
++
+ for (int i=0; i<blocks; i++)
+ {
+ if (::write(fileHandle, preAllocBuffer, size)<0)
+@@ -208,9 +208,9 @@
+ throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "Error pre allocating the file");
+ }
+ }
+-
++
+ if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (NATIVE_ERROR_IO, "Error positioning the file");
+-
++
+ free (preAllocBuffer);
+ }
+
+@@ -223,7 +223,7 @@
+
+ int tries = 0;
+ int result = 0;
+-
++
+ while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+ {
+ #ifdef DEBUG
+@@ -237,7 +237,7 @@
+ #endif
+ controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
+ }
+-
++
+ if (tries > TRIES_BEFORE_ERROR)
+ {
+ #ifdef DEBUG
+@@ -247,7 +247,7 @@
+ }
+ ::usleep(WAIT_FOR_SPOT);
+ }
+-
++
+ if (result<0)
+ {
+ std::stringstream str;
+@@ -265,7 +265,7 @@
+
+ int tries = 0;
+ int result = 0;
+-
++
+ while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+ {
+ #ifdef DEBUG
+@@ -279,7 +279,7 @@
+ #endif
+ controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
+ }
+-
++
+ if (tries > TRIES_BEFORE_ERROR)
+ {
+ #ifdef DEBUG
+@@ -289,7 +289,7 @@
+ }
+ ::usleep(WAIT_FOR_SPOT);
+ }
+-
++
+ if (result<0)
+ {
+ std::stringstream str;
+@@ -301,7 +301,7 @@
+ long AsyncFile::getSize()
+ {
+ struct stat64 statBuffer;
+-
++
+ if (fstat64(fileHandle, &statBuffer) < 0)
+ {
+ return -1l;
+@@ -313,21 +313,21 @@
+ void AsyncFile::stopPoller(THREAD_CONTEXT threadContext)
+ {
+ pollerRunning = 0;
+-
+-
++
++
+ struct iocb * iocb = new struct iocb();
+ ::io_prep_pwrite(iocb, fileHandle, 0, 0, 0);
+ iocb->data = (void *) -1;
+
+ int result = 0;
+-
++
+ while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+ {
+ fprintf(stderr, "Couldn't send request to stop poller, trying again");
+ controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again");
+ ::usleep(WAIT_FOR_SPOT);
+ }
+-
++
+ // Waiting the Poller to finish (by giving up the lock)
+ LockClass lock(&pollerMutex);
+ }
+Index: native/src/Version.h
+===================================================================
+--- native/src/Version.h (revision 7105)
++++ native/src/Version.h (working copy)
+@@ -1,5 +1,5 @@
+
+ #ifndef _VERSION_NATIVE_AIO
+-#define _VERSION_NATIVE_AIO 19
++#define _VERSION_NATIVE_AIO 17
+ #endif
+
+Index: native/bin/libJBMLibAIO64.so
+===================================================================
+Cannot display: file marked as a binary type.
+svn:mime-type = application/octet-stream
+Index: src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java
+===================================================================
+--- src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java (revision 7105)
++++ src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java (working copy)
+@@ -60,7 +60,7 @@
+ throw new NullPointerException("buffer");
+ }
+
+- this.buffer = buffer;
++ this.buffer = buffer.slice();
+ capacity = buffer.remaining();
+ }
+
+Index: src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
+===================================================================
+--- src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java (revision 7105)
++++ src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java (working copy)
+@@ -23,7 +23,6 @@
+ package org.jboss.messaging.core.asyncio.impl;
+
+ import java.nio.ByteBuffer;
+-import java.util.concurrent.Executor;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+@@ -35,7 +34,6 @@
+ import org.jboss.messaging.core.asyncio.BufferCallback;
+ import org.jboss.messaging.core.exception.MessagingException;
+ import org.jboss.messaging.core.logging.Logger;
+-import org.jboss.messaging.utils.VariableLatch;
+
+ /**
+ *
+@@ -47,7 +45,8 @@
+ */
+ public class AsynchronousFileImpl implements AsynchronousFile
+ {
+- // Static ----------------------------------------------------------------------------
++ // Static
++ // -------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(AsynchronousFileImpl.class);
+
+@@ -55,7 +54,7 @@
+
+ private static boolean loaded = false;
+
+- private static int EXPECTED_NATIVE_VERSION = 19;
++ private static int EXPECTED_NATIVE_VERSION = 17;
+
+ public static void addMax(final int io)
+ {
+@@ -125,21 +124,18 @@
+ return loaded;
+ }
+
+- // Attributes ------------------------------------------------------------------------
++ // Attributes
++ // ---------------------------------------------------------------------------------
+
+ private boolean opened = false;
+
+ private String fileName;
+
+- private final VariableLatch pollerLatch = new VariableLatch();
++ private volatile Thread poller;
+
+- private volatile Runnable poller;
+-
+ private int maxIO;
+
+ private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
+-
+- private final VariableLatch pendingWrites = new VariableLatch();
+
+ private Semaphore writeSemaphore;
+
+@@ -150,28 +146,11 @@
+ */
+ private long handler;
+
+- // A context switch on AIO would make it to synchronize the disk before
+- // switching to the new thread, what would cause
+- // serious performance problems. Because of that we make all the writes on
+- // AIO using a single thread.
+- private final Executor writeExecutor;
++ // AsynchronousFile implementation
++ // ------------------------------------------------------------------------------------
+
+- private final Executor pollerExecutor;
+-
+- // AsynchronousFile implementation ---------------------------------------------------
+-
+- /**
+- * @param writeExecutor It needs to be a single Thread executor. If null it will use the user thread to execute write operations
+- * @param pollerExecutor The thread pool that will initialize poller handlers
+- */
+- public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
++ public void open(final String fileName, final int maxIO)
+ {
+- this.writeExecutor = writeExecutor;
+- this.pollerExecutor = pollerExecutor;
+- }
+-
+- public void open(final String fileName, final int maxIO) throws MessagingException
+- {
+ writeLock.lock();
+
+ try
+@@ -186,27 +165,7 @@
+
+ this.fileName = fileName;
+
+- try
+- {
+- handler = init(fileName, this.maxIO, log);
+- }
+- catch (MessagingException e)
+- {
+- MessagingException ex = null;
+- if (e.getCode() == MessagingException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
+- {
+- ex = new MessagingException(e.getCode(),
+- "Can't initialize AIO. Currently AIO in use = " + totalMaxIO.get() +
+- ", trying to allocate more " +
+- maxIO,
+- e);
+- }
+- else
+- {
+- ex = e;
+- }
+- throw ex;
+- }
++ handler = init(fileName, this.maxIO, log);
+ opened = true;
+ addMax(this.maxIO);
+ }
+@@ -225,20 +184,19 @@
+ try
+ {
+
+- while (!pendingWrites.waitCompletion(60000))
+- {
+- log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
+- }
+-
+ while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+ {
+- log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
++ log.warn("Couldn't acquire lock after 60 seconds on AIO",
++ new Exception("Warning: Couldn't acquire lock after 60 seconds on AIO"));
+ }
+-
+ writeSemaphore = null;
+ if (poller != null)
+ {
+- stopPoller();
++ Thread currentPoller = poller;
++ stopPoller(handler);
++ // We need to make sure we won't call close until Poller is
++ // completely done, or we might get beautiful GPFs
++ currentPoller.join();
+ }
+
+ closeInternal(handler);
+@@ -258,61 +216,30 @@
+ public void write(final long position,
+ final long size,
+ final ByteBuffer directByteBuffer,
+- final AIOCallback aioCallback)
++ final AIOCallback aioPackage) throws MessagingException
+ {
+- if (aioCallback == null)
+- {
+- throw new NullPointerException("Null Callback");
+- }
+-
+ checkOpened();
+ if (poller == null)
+ {
+ startPoller();
+ }
+-
+- pendingWrites.up();
+-
+- if (writeExecutor != null)
++ writeSemaphore.acquireUninterruptibly();
++ try
+ {
+- writeExecutor.execute(new Runnable()
+- {
+- public void run()
+- {
+- writeSemaphore.acquireUninterruptibly();
+-
+- try
+- {
+- write(handler, position, size, directByteBuffer, aioCallback);
+- }
+- catch (MessagingException e)
+- {
+- callbackError(aioCallback, e.getCode(), e.getMessage());
+- }
+- catch (RuntimeException e)
+- {
+- callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+- }
+- }
+- });
++ write(handler, position, size, directByteBuffer, aioPackage);
+ }
+- else
++ catch (MessagingException e)
+ {
+- writeSemaphore.acquireUninterruptibly();
+-
+- try
+- {
+- write(handler, position, size, directByteBuffer, aioCallback);
+- }
+- catch (MessagingException e)
+- {
+- callbackError(aioCallback, e.getCode(), e.getMessage());
+- }
+- catch (RuntimeException e)
+- {
+- callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+- }
++ // Release only if an exception happened
++ writeSemaphore.release();
++ throw e;
+ }
++ catch (RuntimeException e)
++ {
++ // Release only if an exception happened
++ writeSemaphore.release();
++ throw e;
++ }
+
+ }
+
+@@ -326,7 +253,6 @@
+ {
+ startPoller();
+ }
+- pendingWrites.up();
+ writeSemaphore.acquireUninterruptibly();
+ try
+ {
+@@ -336,14 +262,12 @@
+ {
+ // Release only if an exception happened
+ writeSemaphore.release();
+- pendingWrites.down();
+ throw e;
+ }
+ catch (RuntimeException e)
+ {
+ // Release only if an exception happened
+ writeSemaphore.release();
+- pendingWrites.down();
+ throw e;
+ }
+ }
+@@ -370,22 +294,15 @@
+ return fileName;
+ }
+
+- /**
+- * This needs to be synchronized because of
+- * http://bugs.sun.com/view_bug.do?bug_id=6791815
+- * http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/000386.html
+- *
+- * @param size
+- * @return
+- */
+- public synchronized static ByteBuffer newBuffer(final int size)
++ // Should we make this method static?
++ public ByteBuffer newBuffer(final int size)
+ {
+- if (size % 512 != 0)
++ if (size % getBlockSize() != 0)
+ {
+ throw new RuntimeException("Buffer size needs to be aligned to 512");
+ }
+
+- return newNativeBuffer(size);
++ return ByteBuffer.allocateDirect(size);
+ }
+
+ public void setBufferCallback(final BufferCallback callback)
+@@ -393,20 +310,9 @@
+ bufferCallback = callback;
+ }
+
+- /** Return the JNI handler used on C++ */
+- public long getHandler()
+- {
+- return handler;
+- }
++ // Private
++ // ---------------------------------------------------------------------------------
+
+- public static void clearBuffer(final ByteBuffer buffer)
+- {
+- resetBuffer(buffer, buffer.limit());
+- buffer.position(0);
+- }
+-
+- // Private ---------------------------------------------------------------------------
+-
+ /** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
+ @SuppressWarnings("unused")
+ // Called by the JNI layer.. just ignore the
+@@ -414,7 +320,6 @@
+ private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
+ {
+ writeSemaphore.release();
+- pendingWrites.down();
+ callback.done();
+ if (bufferCallback != null)
+ {
+@@ -422,13 +327,13 @@
+ }
+ }
+
++ @SuppressWarnings("unused")
+ // Called by the JNI layer.. just ignore the
+ // warning
+ private void callbackError(final AIOCallback callback, final int errorCode, final String errorMessage)
+ {
+ log.warn("CallbackError: " + errorMessage);
+ writeSemaphore.release();
+- pendingWrites.down();
+ callback.onError(errorCode, errorMessage);
+ }
+
+@@ -450,11 +355,10 @@
+
+ if (poller == null)
+ {
+- pollerLatch.up();
+- poller = new PollerRunnable();
++ poller = new PollerThread();
+ try
+ {
+- pollerExecutor.execute(poller);
++ poller.start();
+ }
+ catch (Exception ex)
+ {
+@@ -476,28 +380,13 @@
+ }
+ }
+
+- /**
+- * @throws MessagingException
+- * @throws InterruptedException
+- */
+- private void stopPoller() throws MessagingException, InterruptedException
+- {
+- stopPoller(handler);
+- // We need to make sure we won't call close until Poller is
+- // completely done, or we might get beautiful GPFs
+- pollerLatch.waitCompletion();
+- }
++ // Native
++ // ------------------------------------------------------------------------------------------
+
+- // Native ----------------------------------------------------------------------------
++ public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
+
+- private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
++ private static native long init(String fileName, int maxIO, Logger logger);
+
+- public static native void destroyBuffer(ByteBuffer buffer);
+-
+- private static native ByteBuffer newNativeBuffer(long size);
+-
+- private static native long init(String fileName, int maxIO, Logger logger) throws MessagingException;
+-
+ private native long size0(long handle) throws MessagingException;
+
+ private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws MessagingException;
+@@ -516,14 +405,17 @@
+ /** Poll asynchrounous events from internal queues */
+ private static native void internalPollEvents(long handler);
+
+- // Inner classes ---------------------------------------------------------------------
++ // Inner classes
++ // -----------------------------------------------------------------------------------------
+
+- private class PollerRunnable implements Runnable
++ private class PollerThread extends Thread
+ {
+- PollerRunnable()
++ PollerThread()
+ {
++ super("NativePoller for " + fileName);
+ }
+
++ @Override
+ public void run()
+ {
+ try
+@@ -536,7 +428,6 @@
+ // Case the poller thread is interrupted, this will allow us to
+ // restart the thread when required
+ poller = null;
+- pollerLatch.down();
+ }
+ }
+ }
+Index: src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
+===================================================================
+--- src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java (revision 7105)
++++ src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java (working copy)
+@@ -40,9 +40,8 @@
+ * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error
+ * @param fileName
+ * @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
+- * @throws MessagingException
+ */
+- void open(String fileName, int maxIO) throws MessagingException;
++ void open(String fileName, int maxIO);
+
+ /**
+ * Warning: This function will perform a synchronous IO, probably translating to a fstat call
+@@ -50,13 +49,14 @@
+ * */
+ long size() throws MessagingException;
+
+- /** Any error will be reported on the callback interface */
+- void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
++ void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage) throws MessagingException;
+
+- void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws MessagingException;
++ void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage) throws MessagingException;
+
+ void fill(long position, int blocks, long size, byte fillChar) throws MessagingException;
+
++ ByteBuffer newBuffer(int size);
++
+ void setBufferCallback(BufferCallback callback);
+
+ int getBlockSize();
+Index: src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java (revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java (working copy)
+@@ -54,11 +54,10 @@
+
+ BufferCallback bufferCallback;
+
+- public NIOSequentialFile(final String directory, final String fileName, final BufferCallback bufferCallback)
++ public NIOSequentialFile(final String directory, final String fileName)
+ {
+ this.directory = directory;
+ file = new File(directory + "/" + fileName);
+- this.bufferCallback = bufferCallback;
+ }
+
+ public int getAlignment()
+@@ -93,6 +92,11 @@
+ open();
+ }
+
++ public void setBufferCallback(final BufferCallback callback)
++ {
++ bufferCallback = callback;
++ }
++
+ public void fill(final int position, final int size, final byte fillCharacter) throws Exception
+ {
+ ByteBuffer bb = ByteBuffer.allocateDirect(size);
+Index: src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java (revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java (working copy)
+@@ -25,7 +25,8 @@
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.concurrent.CountDownLatch;
+-import java.util.concurrent.Executor;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+
+@@ -59,26 +60,18 @@
+ private AsynchronousFile aioFile;
+
+ private final AtomicLong position = new AtomicLong(0);
+-
+- private BufferCallback bufferCallback;
+
+- /** A context switch on AIO would make it to synchronize the disk before
+- switching to the new thread, what would cause
+- serious performance problems. Because of that we make all the writes on
+- AIO using a single thread. */
+- private final Executor executor;
++ // A context switch on AIO would make it to synchronize the disk before
++ // switching to the new thread, what would cause
++ // serious performance problems. Because of that we make all the writes on
++ // AIO using a single thread.
++ private ExecutorService executor;
+
+- /** The pool for Thread pollers */
+- private final Executor pollerExecutor;
+-
+- public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final BufferCallback bufferCallback, final Executor executor, final Executor pollerExecutor)
++ public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO)
+ {
+ this.journalDir = journalDir;
+ this.fileName = fileName;
+ this.maxIO = maxIO;
+- this.bufferCallback = bufferCallback;
+- this.executor = executor;
+- this.pollerExecutor = pollerExecutor;
+ }
+
+ public boolean isOpen()
+@@ -106,20 +99,10 @@
+ {
+ checkOpened();
+ opened = false;
++ executor.shutdown();
+
+- final CountDownLatch donelatch = new CountDownLatch(1);
+-
+- executor.execute(new Runnable()
++ while (!executor.awaitTermination(60, TimeUnit.SECONDS))
+ {
+- public void run()
+- {
+- donelatch.countDown();
+- }
+- });
+-
+-
+- while (!donelatch.await(60, TimeUnit.SECONDS))
+- {
+ log.warn("Executor on file " + fileName + " couldn't complete its tasks in 60 seconds.",
+ new Exception("Warning: Executor on file " + fileName + " couldn't complete its tasks in 60 seconds."));
+ }
+@@ -207,10 +190,10 @@
+ public synchronized void open(final int currentMaxIO) throws Exception
+ {
+ opened = true;
++ executor = Executors.newSingleThreadExecutor();
+ aioFile = newFile();
+ aioFile.open(journalDir + "/" + fileName, currentMaxIO);
+ position.set(0);
+- aioFile.setBufferCallback(bufferCallback);
+
+ }
+
+@@ -306,7 +289,7 @@
+ */
+ protected AsynchronousFile newFile()
+ {
+- return new AsynchronousFileImpl(executor, pollerExecutor);
++ return new AsynchronousFileImpl();
+ }
+
+ // Private methods
+@@ -317,7 +300,24 @@
+ final int bytesToWrite,
+ final long positionToWrite)
+ {
+- aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
++ executor.execute(new Runnable()
++ {
++ public void run()
++ {
++ try
++ {
++ aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
++ }
++ catch (Exception e)
++ {
++ log.warn(e.getMessage(), e);
++ if (callback != null)
++ {
++ callback.onError(-1, e.getMessage());
++ }
++ }
++ }
++ });
+ }
+
+ private void checkOpened() throws Exception
+Index: src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java (revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java (working copy)
+@@ -28,7 +28,6 @@
+ import java.util.Arrays;
+ import java.util.List;
+
+-import org.jboss.messaging.core.journal.BufferCallback;
+ import org.jboss.messaging.core.journal.SequentialFileFactory;
+ import org.jboss.messaging.core.logging.Logger;
+
+@@ -45,8 +44,6 @@
+ private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
+
+ protected final String journalDir;
+-
+- protected BufferCallback bufferCallback;
+
+ public AbstractSequentialFactory(final String journalDir)
+ {
+@@ -88,22 +85,4 @@
+ return Arrays.asList(fileNames);
+ }
+
+- /**
+- * @return the bufferCallback
+- */
+- public BufferCallback getBufferCallback()
+- {
+- return bufferCallback;
+- }
+-
+- /**
+- * @param bufferCallback the bufferCallback to set
+- */
+- public void setBufferCallback(BufferCallback bufferCallback)
+- {
+- this.bufferCallback = bufferCallback;
+- }
+-
+-
+-
+ }
+Index: src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java (revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java (working copy)
+@@ -53,7 +53,7 @@
+ // maxIO is ignored on NIO
+ public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+ {
+- return new NIOSequentialFile(journalDir, fileName, bufferCallback);
++ return new NIOSequentialFile(journalDir, fileName);
+ }
+
+ public boolean isSupportsCallbacks()
+@@ -94,12 +94,4 @@
+ return bytes;
+ }
+
+- /* (non-Javadoc)
+- * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+- */
+- public void releaseBuffer(ByteBuffer buffer)
+- {
+- // nothing to be done here
+- }
+-
+ }
+Index: src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java (revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java (working copy)
+@@ -23,12 +23,9 @@
+ package org.jboss.messaging.core.journal.impl;
+
+ import java.nio.ByteBuffer;
+-import java.util.concurrent.Executor;
+-import java.util.concurrent.Executors;
+
+ import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+ import org.jboss.messaging.core.journal.SequentialFile;
+-import org.jboss.messaging.utils.JBMThreadFactory;
+
+ /**
+ *
+@@ -39,16 +36,6 @@
+ */
+ public class AIOSequentialFileFactory extends AbstractSequentialFactory
+ {
+-
+- /** A single AIO write executor for every AIO File.
+- * This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
+- * And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
+- private final Executor writeExecutor = Executors.newSingleThreadExecutor(new JBMThreadFactory("JBM-AIO-writer-pool" + System.identityHashCode(this), true));
+-
+-
+- private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), true));
+-
+-
+ public AIOSequentialFileFactory(final String journalDir)
+ {
+ super(journalDir);
+@@ -56,7 +43,7 @@
+
+ public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+ {
+- return new AIOSequentialFile(journalDir, fileName, maxIO, bufferCallback, writeExecutor, pollerExecutor);
++ return new AIOSequentialFile(journalDir, fileName, maxIO);
+ }
+
+ public boolean isSupportsCallbacks()
+@@ -75,12 +62,12 @@
+ {
+ size = (size / 512 + 1) * 512;
+ }
+- return AsynchronousFileImpl.newBuffer(size);
++ return ByteBuffer.allocateDirect(size);
+ }
+
+ public void clearBuffer(final ByteBuffer directByteBuffer)
+ {
+- AsynchronousFileImpl.clearBuffer(directByteBuffer);
++ AsynchronousFileImpl.resetBuffer(directByteBuffer, directByteBuffer.limit());
+ }
+
+ public int getAlignment()
+@@ -104,12 +91,4 @@
+
+ return pos;
+ }
+-
+- /* (non-Javadoc)
+- * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+- */
+- public void releaseBuffer(ByteBuffer buffer)
+- {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+- }
+ }
+Index: src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java (revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java (working copy)
+@@ -274,8 +274,6 @@
+ this.syncNonTransactional = syncNonTransactional;
+
+ this.fileFactory = fileFactory;
+-
+- this.fileFactory.setBufferCallback(this.buffersControl.callback);
+
+ this.filePrefix = filePrefix;
+
+@@ -872,10 +870,6 @@
+ throw new IllegalStateException("Journal must be in started state");
+ }
+
+- // Disabling life cycle control on buffers, as we are reading the buffer
+- buffersControl.disable();
+-
+-
+ Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+
+ List<JournalFile> orderedFiles = orderFiles();
+@@ -886,12 +880,12 @@
+
+ for (JournalFile file : orderedFiles)
+ {
+- ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
+-
+ file.getFile().open(1);
+
+- int bytesRead = file.getFile().read(wholeFileBuffer);
++ ByteBuffer bb = fileFactory.newBuffer(fileSize);
+
++ int bytesRead = file.getFile().read(bb);
++
+ if (bytesRead != fileSize)
+ {
+ // FIXME - We should extract everything we can from this file
+@@ -906,19 +900,16 @@
+ file.getFile().getFileName());
+ }
+
+-
+- wholeFileBuffer.position(0);
+-
+ // First long is the ordering timestamp, we just jump its position
+- wholeFileBuffer.position(SIZE_HEADER);
++ bb.position(SIZE_HEADER);
+
+ boolean hasData = false;
+
+- while (wholeFileBuffer.hasRemaining())
++ while (bb.hasRemaining())
+ {
+- final int pos = wholeFileBuffer.position();
++ final int pos = bb.position();
+
+- byte recordType = wholeFileBuffer.get();
++ byte recordType = bb.get();
+
+ if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+ {
+@@ -928,7 +919,7 @@
+ continue;
+ }
+
+- if (wholeFileBuffer.position() + SIZE_INT > fileSize)
++ if (bb.position() + SIZE_INT > fileSize)
+ {
+ // II - Ignore this record, lets keep looking
+ continue;
+@@ -936,7 +927,7 @@
+
+ // III - Every record has the file-id.
+ // This is what supports us from not re-filling the whole file
+- int readFileId = wholeFileBuffer.getInt();
++ int readFileId = bb.getInt();
+
+ // IV - This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+@@ -946,7 +937,7 @@
+ // next reclaiming will fix it
+ hasData = true;
+
+- wholeFileBuffer.position(pos + 1);
++ bb.position(pos + 1);
+
+ continue;
+ }
+@@ -955,24 +946,24 @@
+
+ if (isTransaction(recordType))
+ {
+- if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
++ if (bb.position() + SIZE_LONG > fileSize)
+ {
+ continue;
+ }
+
+- transactionID = wholeFileBuffer.getLong();
++ transactionID = bb.getLong();
+ }
+
+ long recordID = 0;
+
+ if (!isCompleteTransaction(recordType))
+ {
+- if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
++ if (bb.position() + SIZE_LONG > fileSize)
+ {
+ continue;
+ }
+
+- recordID = wholeFileBuffer.getLong();
++ recordID = bb.getLong();
+
+ maxID = Math.max(maxID, recordID);
+ }
+@@ -993,14 +984,14 @@
+
+ if (isContainsBody(recordType))
+ {
+- if (wholeFileBuffer.position() + SIZE_INT > fileSize)
++ if (bb.position() + SIZE_INT > fileSize)
+ {
+ continue;
+ }
+
+- variableSize = wholeFileBuffer.getInt();
++ variableSize = bb.getInt();
+
+- if (wholeFileBuffer.position() + variableSize > fileSize)
++ if (bb.position() + variableSize > fileSize)
+ {
+ log.warn("Record at position " + pos +
+ " file:" +
+@@ -1011,12 +1002,12 @@
+
+ if (recordType != DELETE_RECORD_TX)
+ {
+- userRecordType = wholeFileBuffer.get();
++ userRecordType = bb.get();
+ }
+
+ record = new byte[variableSize];
+
+- wholeFileBuffer.get(record);
++ bb.get(record);
+ }
+
+ if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+@@ -1024,11 +1015,11 @@
+ if (recordType == PREPARE_RECORD)
+ {
+ // Add the variable size required for preparedTransactions
+- preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
++ preparedTransactionExtraDataSize = bb.getInt();
+ }
+ // Both commit and record contain the recordSummary, and this is
+ // used to calculate the record-size on both record-types
+- variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
++ variableSize += bb.getInt() * SIZE_INT * 2;
+ }
+
+ int recordSize = getRecordSize(recordType);
+@@ -1051,11 +1042,11 @@
+ continue;
+ }
+
+- int oldPos = wholeFileBuffer.position();
++ int oldPos = bb.position();
+
+- wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
++ bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+
+- int checkSize = wholeFileBuffer.getInt();
++ int checkSize = bb.getInt();
+
+ // VII - The checkSize at the end has to match with the size
+ // informed at the beggining.
+@@ -1072,12 +1063,12 @@
+ // next reclaiming will fix it
+ hasData = true;
+
+- wholeFileBuffer.position(pos + SIZE_BYTE);
++ bb.position(pos + SIZE_BYTE);
+
+ continue;
+ }
+
+- wholeFileBuffer.position(oldPos);
++ bb.position(oldPos);
+
+ // At this point everything is checked. So we relax and just load
+ // the data now.
+@@ -1199,10 +1190,10 @@
+
+ byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+- wholeFileBuffer.get(extraData);
++ bb.get(extraData);
+
+ // Pair <FileID, NumberOfElements>
+- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
++ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+
+ tx.prepared = true;
+
+@@ -1240,7 +1231,7 @@
+ // We need to read it even if transaction was not found, or
+ // the reading checks would fail
+ // Pair <OrderId, NumberOfElements>
+- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
++ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+
+ // The commit could be alone on its own journal-file and the
+ // whole transaction body was reclaimed but not the
+@@ -1328,20 +1319,18 @@
+ }
+ }
+
+- checkSize = wholeFileBuffer.getInt();
++ checkSize = bb.getInt();
+
+ // This is a sanity check about the loading code itself.
+ // If this checkSize doesn't match, it means the reading method is
+ // not doing what it was supposed to do
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ {
+- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() + ", pos = " + pos);
++ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+ }
+
+- lastDataPos = wholeFileBuffer.position();
++ lastDataPos = bb.position();
+ }
+-
+- fileFactory.releaseBuffer(wholeFileBuffer);
+
+ file.getFile().close();
+
+@@ -1356,8 +1345,6 @@
+ }
+ }
+
+- buffersControl.enable();
+-
+ // Create any more files we need
+
+ // FIXME - size() involves a scan
+@@ -1390,6 +1377,11 @@
+ {
+ currentFile.getFile().open();
+
++ if (reuseBufferSize > 0)
++ {
++ currentFile.getFile().setBufferCallback(buffersControl.callback);
++ }
++
+ currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
+
+ currentFile.setOffset(currentFile.getFile().position());
+@@ -1688,8 +1680,6 @@
+ freeFiles.clear();
+
+ openedFiles.clear();
+-
+- buffersControl.clearPoll();
+
+ state = STATE_STOPPED;
+ }
+@@ -1942,19 +1932,8 @@
+ return recordSize;
+ }
+
+-
+- /**
+- * This method requires bufferControl disabled, or the reads are going to be invalid
+- * */
+ private List<JournalFile> orderFiles() throws Exception
+ {
+-
+- if (buffersControl.enabled)
+- {
+- // Sanity check, this shouldn't happen unless someone made an invalid change on the code
+- throw new IllegalStateException("Buffer life cycle control needs to be disabled at this point!!!");
+- }
+-
+ List<String> fileNames = fileFactory.listFiles(fileExtension);
+
+ List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
+@@ -1970,10 +1949,6 @@
+ file.read(bb);
+
+ int orderingID = bb.getInt();
+-
+- fileFactory.releaseBuffer(bb);
+-
+- bb = null;
+
+ if (nextOrderingId.get() < orderingID)
+ {
+@@ -2130,6 +2105,11 @@
+ file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
+
+ file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
++
++ if (reuseBufferSize > 0)
++ {
++ file.getFile().setBufferCallback(buffersControl.callback);
++ }
+ }
+
+ private int generateOrderingID()
+@@ -2407,22 +2387,9 @@
+ /** This queue is fed by {@link JournalImpl.ReuseBuffersController.LocalBufferCallback}} which is called directly by NIO or NIO.
+ * On the case of the AIO this is almost called by the native layer as soon as the buffer is not being used any more
+ * and ready to be reused or GCed */
+- private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
+-
+- /** During reload we may disable/enable buffer reuse */
+- private boolean enabled = true;
++ private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ final BufferCallback callback = new LocalBufferCallback();
+-
+- public void enable()
+- {
+- this.enabled = true;
+- }
+-
+- public void disable()
+- {
+- this.enabled = false;
+- }
+
+ public ByteBuffer newBuffer(final int size)
+ {
+@@ -2431,11 +2398,11 @@
+ // just to cleanup this
+ if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
+ {
+- trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
++ trace("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
+
+ bufferReuseLastTime = System.currentTimeMillis();
+
+- clearPoll();
++ reuseBuffers.clear();
+ }
+
+ // if a buffer is bigger than the configured-size, we just create a new
+@@ -2451,7 +2418,7 @@
+ int alignedSize = fileFactory.calculateBlockSize(size);
+
+ // Try getting a buffer from the queue...
+- ByteBuffer buffer = reuseBuffersQueue.poll();
++ ByteBuffer buffer = reuseBuffers.poll();
+
+ if (buffer == null)
+ {
+@@ -2467,41 +2434,24 @@
+
+ fileFactory.clearBuffer(buffer);
+ }
+-
++
+ buffer.rewind();
+
+ return buffer;
+ }
+ }
+
+- public void clearPoll()
+- {
+- ByteBuffer reusedBuffer;
+-
+- while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
+- {
+- fileFactory.releaseBuffer(reusedBuffer);
+- }
+- }
+-
+ private class LocalBufferCallback implements BufferCallback
+ {
+ public void bufferDone(final ByteBuffer buffer)
+ {
+- if (enabled)
++ bufferReuseLastTime = System.currentTimeMillis();
++
++ // If a buffer has any other than the configured size, the buffer
++ // will be just sent to GC
++ if (buffer.capacity() == reuseBufferSize)
+ {
+- bufferReuseLastTime = System.currentTimeMillis();
+-
+- // If a buffer has any other than the configured size, the buffer
+- // will be just sent to GC
+- if (buffer.capacity() == reuseBufferSize)
+- {
+- reuseBuffersQueue.offer(buffer);
+- }
+- else
+- {
+- fileFactory.releaseBuffer(buffer);
+- }
++ reuseBuffers.offer(buffer);
+ }
+ }
+ }
+Index: src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java (revision 7105)
++++ src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java (working copy)
+@@ -42,14 +42,8 @@
+ boolean isSupportsCallbacks();
+
+ ByteBuffer newBuffer(int size);
+-
+- void releaseBuffer(ByteBuffer buffer);
+-
+- void setBufferCallback(BufferCallback bufferCallback);
+-
+- BufferCallback getBufferCallback();
+
+- // To be used in tests only
++ // Avoid using this method in production as it creates an unecessary copy
+ ByteBuffer wrapBuffer(byte[] bytes);
+
+ int getAlignment();
+Index: src/main/org/jboss/messaging/core/journal/SequentialFile.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/SequentialFile.java (revision 7105)
++++ src/main/org/jboss/messaging/core/journal/SequentialFile.java (working copy)
+@@ -48,6 +48,8 @@
+ */
+ void open(int maxIO) throws Exception;
+
++ void setBufferCallback(BufferCallback callback);
++
+ int getAlignment() throws Exception;
+
+ int calculateBlockStart(int position) throws Exception;
+Index: src/config/stand-alone/non-clustered/jbm-configuration.xml
+===================================================================
+--- src/config/stand-alone/non-clustered/jbm-configuration.xml (revision 7105)
++++ src/config/stand-alone/non-clustered/jbm-configuration.xml (working copy)
+@@ -56,4 +56,37 @@
+ </address-settings>
+
+
++<<<<<<< .working
+ </configuration>
++=======
++ <journal-directory>data/journal</journal-directory>
++
++ <create-journal-dir>true</create-journal-dir>
++
++ <journal-type>NIO</journal-type>
++
++ <!-- The journal will reuse any buffers where the size < journal-buffer-reuse-size on write operations
++ Set this to -1 to disable this feature -->
++ <journal-buffer-reuse-size>1536</journal-buffer-reuse-size>
++
++ <!-- Does the journal sync to disk on each transaction commit, prepare or rollback? -->
++ <journal-sync-transactional>true</journal-sync-transactional>
++
++ <!-- Does the journal sync to disk for every non transactional persistent operation? -->
++ <journal-sync-non-transactional>false</journal-sync-non-transactional>
++
++ <!-- 10 MB journal file size -->
++ <journal-file-size>10485760</journal-file-size>
++
++ <journal-min-files>15</journal-min-files>
++
++ <!-- Maximum simultaneous asynchronous writes accepted by the native layer.
++ (parameter ignored on NIO)
++ You can verify the max AIO on the OS level at /proc/sys/fs/aio_max_nr. (aio-nr will give you the current max-aio being used)
++ -->
++ <journal-max-aio>10000</journal-max-aio>
++
++ </configuration>
++
++</deployment>
++>>>>>>> .merge-right.r6682
+Index: tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java (working copy)
+@@ -22,7 +22,12 @@
+
+ package org.jboss.messaging.tests.unit.core.asyncio;
+
+-import java.lang.ref.WeakReference;
++import org.jboss.messaging.core.asyncio.AIOCallback;
++import org.jboss.messaging.core.asyncio.BufferCallback;
++import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
++import org.jboss.messaging.core.exception.MessagingException;
++import org.jboss.messaging.core.logging.Logger;
++
+ import java.nio.ByteBuffer;
+ import java.nio.CharBuffer;
+ import java.nio.charset.Charset;
+@@ -30,17 +35,7 @@
+ import java.util.ArrayList;
+ import java.util.Iterator;
+ import java.util.concurrent.CountDownLatch;
+-import java.util.concurrent.ExecutorService;
+-import java.util.concurrent.Executors;
+-import java.util.concurrent.atomic.AtomicInteger;
+
+-import org.jboss.messaging.core.asyncio.AIOCallback;
+-import org.jboss.messaging.core.asyncio.BufferCallback;
+-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+-import org.jboss.messaging.core.exception.MessagingException;
+-import org.jboss.messaging.core.logging.Logger;
+-import org.jboss.messaging.utils.JBMThreadFactory;
+-
+ /**
+ *
+ * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+@@ -58,40 +53,19 @@
+ private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
+
+ byte commonBuffer[] = null;
+-
+- ExecutorService executor;
+-
+- ExecutorService pollerExecutor;
+
+-
+ private static void debug(final String msg)
+ {
+ log.debug(msg);
+ }
+
+-
+-
+- protected void setUp() throws Exception
+- {
+- super.setUp();
+- pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+- executor = Executors.newSingleThreadExecutor();
+- }
+-
+- protected void tearDown() throws Exception
+- {
+- executor.shutdown();
+- pollerExecutor.shutdown();
+- super.tearDown();
+- }
+-
+ /**
+ * Opening and closing a file immediately can lead to races on the native layer,
+ * creating crash conditions.
+ * */
+ public void testOpenClose() throws Exception
+ {
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ for (int i = 0; i < 1000; i++)
+ {
+ controller.open(FILE_NAME, 10000);
+@@ -99,16 +73,16 @@
+
+ }
+ }
+-
++
+ public void testFileNonExistent() throws Exception
+ {
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ for (int i = 0; i < 1000; i++)
+ {
+ try
+ {
+ controller.open("/non-existent/IDontExist.error", 10000);
+- fail("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
++ fail ("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
+ }
+ catch (Throwable ignored)
+ {
+@@ -132,22 +106,21 @@
+ */
+ public void testTwoFiles() throws Exception
+ {
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+- final AsynchronousFileImpl controller2 = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
++ final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
+ controller.open(FILE_NAME + ".1", 10000);
+ controller2.open(FILE_NAME + ".2", 10000);
+
+ int numberOfLines = 1000;
+ int size = 1024;
+
+- ByteBuffer buffer = null;
+ try
+ {
+ CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+ CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
+
+- buffer = AsynchronousFileImpl.newBuffer(size);
+- encodeBufer(buffer);
++ ByteBuffer block = controller.newBuffer(size);
++ encodeBufer(block);
+
+ preAlloc(controller, numberOfLines * size);
+ preAlloc(controller2, numberOfLines * size);
+@@ -171,8 +144,8 @@
+ {
+ CountDownCallback tmp2 = iter2.next();
+
+- controller.write(counter * size, size, buffer, tmp);
+- controller.write(counter * size, size, buffer, tmp2);
++ controller.write(counter * size, size, block, tmp);
++ controller.write(counter * size, size, block, tmp2);
+ if (++counter % 5000 == 0)
+ {
+ debug(5000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
+@@ -229,7 +202,6 @@
+ }
+ finally
+ {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+ try
+ {
+ controller.close();
+@@ -277,8 +249,7 @@
+ }
+ }
+
+- AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+- ByteBuffer buffer = null;
++ AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+@@ -287,13 +258,13 @@
+ controller.open(FILE_NAME, 10);
+ controller.close();
+
+- controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ controller = new AsynchronousFileImpl();
+
+ controller.open(FILE_NAME, 10);
+
+ controller.fill(0, 1, 512, (byte)'j');
+
+- buffer = AsynchronousFileImpl.newBuffer(SIZE);
++ ByteBuffer buffer = controller.newBuffer(SIZE);
+
+ buffer.clear();
+
+@@ -334,7 +305,8 @@
+ {
+ }
+
+- newBuffer = AsynchronousFileImpl.newBuffer(512);
++ // newBuffer = ByteBuffer.allocateDirect(512);
++ newBuffer = controller.newBuffer(512);
+ callbackLocal = new LocalCallback();
+ controller.read(0, 512, newBuffer, callbackLocal);
+ callbackLocal.latch.await();
+@@ -353,8 +325,6 @@
+ }
+ finally
+ {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+-
+ try
+ {
+ controller.close();
+@@ -370,7 +340,7 @@
+ public void testBufferCallbackUniqueBuffers() throws Exception
+ {
+ boolean closed = false;
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+ final int NUMBER_LINES = 1000;
+@@ -396,7 +366,7 @@
+ CountDownCallback aio = new CountDownCallback(latch);
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+- ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
++ ByteBuffer buffer = controller.newBuffer(SIZE);
+ buffer.rewind();
+ for (int j = 0; j < SIZE; j++)
+ {
+@@ -430,11 +400,6 @@
+ }
+ }
+
+- for (ByteBuffer bufferTmp : buffers)
+- {
+- AsynchronousFileImpl.destroyBuffer(bufferTmp);
+- }
+-
+ buffers.clear();
+
+ }
+@@ -450,8 +415,7 @@
+ public void testBufferCallbackAwaysSameBuffer() throws Exception
+ {
+ boolean closed = false;
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+- ByteBuffer buffer = null;
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+ final int NUMBER_LINES = 1000;
+@@ -476,7 +440,7 @@
+ CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
+ CountDownCallback aio = new CountDownCallback(latch);
+
+- buffer = AsynchronousFileImpl.newBuffer(SIZE);
++ ByteBuffer buffer = controller.newBuffer(SIZE);
+ buffer.rewind();
+ for (int j = 0; j < SIZE; j++)
+ {
+@@ -518,7 +482,6 @@
+ }
+ finally
+ {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+ if (!closed)
+ {
+ controller.close();
+@@ -528,22 +491,11 @@
+
+ public void testRead() throws Exception
+ {
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+- controller.setBufferCallback(new BufferCallback()
+- {
+-
+- public void bufferDone(ByteBuffer buffer)
+- {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+- }
+-
+- });
+-
+- ByteBuffer readBuffer = null;
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+- final int NUMBER_LINES = 1000;
++ final int NUMBER_LINES = 5000;
+ final int SIZE = 1024;
+
+ controller.open(FILE_NAME, 1000);
+@@ -556,15 +508,13 @@
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+- if (i % 100 == 0)
++ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
++ addString("Str value " + i + "\n", buffer);
++ for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
+ {
+- System.out.println("Wrote " + i + " lines");
++ buffer.put((byte)' ');
+ }
+- ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
+- for (int j = 0; j < SIZE; j++)
+- {
+- buffer.put(getSamplebyte(j));
+- }
++ buffer.put((byte)'\n');
+
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ }
+@@ -577,46 +527,48 @@
+ // If you call close you're supposed to wait events to finish before
+ // closing it
+ controller.close();
+- controller.setBufferCallback(null);
+-
+ controller.open(FILE_NAME, 10);
+
+- readBuffer = AsynchronousFileImpl.newBuffer(SIZE);
++ ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+- if (i % 100 == 0)
++ newBuffer.clear();
++ addString("Str value " + i + "\n", newBuffer);
++ for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
+ {
+- System.out.println("Read " + i + " lines");
++ newBuffer.put((byte)' ');
+ }
+- AsynchronousFileImpl.clearBuffer(readBuffer);
++ newBuffer.put((byte)'\n');
+
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownCallback aio = new CountDownCallback(latch);
+
+- controller.read(i * SIZE, SIZE, readBuffer, aio);
++ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
++ controller.read(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+
+ byte bytesRead[] = new byte[SIZE];
+- readBuffer.get(bytesRead);
++ byte bytesCompare[] = new byte[SIZE];
+
++ newBuffer.rewind();
++ newBuffer.get(bytesCompare);
++ buffer.rewind();
++ buffer.get(bytesRead);
++
+ for (int count = 0; count < SIZE; count++)
+ {
+- assertEquals("byte position " + count + " differs on line " + i + " position = " + count,
+- getSamplebyte(count),
+- bytesRead[count]);
++ assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+ }
++
++ assertTrue(buffer.equals(newBuffer));
+ }
+ }
+ finally
+ {
+- if (readBuffer != null)
+- {
+- AsynchronousFileImpl.destroyBuffer(readBuffer);
+- }
+ try
+ {
+ controller.close();
+@@ -635,7 +587,7 @@
+ * The file is also read after being written to validate its correctness */
+ public void testConcurrentClose() throws Exception
+ {
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+@@ -646,20 +598,10 @@
+ controller.open(FILE_NAME, 10000);
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+-
+- controller.setBufferCallback(new BufferCallback()
+- {
+
+- public void bufferDone(ByteBuffer buffer)
+- {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+- }
+-
+- });
+-
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+- ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
++ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ buffer.clear();
+ addString("Str value " + i + "\n", buffer);
+@@ -676,16 +618,14 @@
+ // If you call close you're supposed to wait events to finish before
+ // closing it
+ controller.close();
+-
+- controller.setBufferCallback(null);
+
+ assertEquals(0, readLatch.getCount());
+ readLatch.await();
+ controller.open(FILE_NAME, 10);
+
+- ByteBuffer newBuffer = AsynchronousFileImpl.newBuffer(SIZE);
++ ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+
+- ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
++ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+@@ -719,9 +659,6 @@
+
+ assertTrue(buffer.equals(newBuffer));
+ }
+-
+- AsynchronousFileImpl.destroyBuffer(newBuffer);
+- AsynchronousFileImpl.destroyBuffer(buffer);
+
+ }
+ finally
+@@ -739,17 +676,15 @@
+
+ private void asyncData(final int numberOfLines, final int size, final int aioLimit) throws Exception
+ {
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, aioLimit);
+-
+- ByteBuffer buffer = null;
+-
++
+ try
+ {
+ CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+
+- buffer = AsynchronousFileImpl.newBuffer(size);
+- encodeBufer(buffer);
++ ByteBuffer block = controller.newBuffer(size);
++ encodeBufer(block);
+
+ preAlloc(controller, numberOfLines * size);
+
+@@ -766,7 +701,7 @@
+ int counter = 0;
+ for (CountDownCallback tmp : list)
+ {
+- controller.write(counter * size, size, buffer, tmp);
++ controller.write(counter * size, size, block, tmp);
+ if (++counter % 20000 == 0)
+ {
+ debug(20000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
+@@ -801,7 +736,6 @@
+ }
+ finally
+ {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+ try
+ {
+ controller.close();
+@@ -815,17 +749,16 @@
+
+ public void testDirectSynchronous() throws Exception
+ {
+- ByteBuffer buffer = null;
+ try
+ {
+ final int NUMBER_LINES = 3000;
+ final int SIZE = 1024;
+
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, 2000);
+
+- buffer = AsynchronousFileImpl.newBuffer(SIZE);
+- encodeBufer(buffer);
++ ByteBuffer block = ByteBuffer.allocateDirect(SIZE);
++ encodeBufer(block);
+
+ preAlloc(controller, NUMBER_LINES * SIZE);
+
+@@ -835,7 +768,7 @@
+ {
+ CountDownLatch latchDone = new CountDownLatch(1);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone);
+- controller.write(i * 512, 512, buffer, aioBlock);
++ controller.write(i * 512, 512, block, aioBlock);
+ latchDone.await();
+ assertTrue(aioBlock.doneCalled);
+ assertFalse(aioBlock.errorCalled);
+@@ -860,33 +793,27 @@
+ {
+ throw e;
+ }
+- finally
+- {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+- }
+
+ }
+
+ public void testInvalidWrite() throws Exception
+ {
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, 2000);
+-
+- ByteBuffer buffer = null;
+-
++
+ try
+ {
+ final int SIZE = 512;
+
+- buffer = AsynchronousFileImpl.newBuffer(SIZE);
+- encodeBufer(buffer);
++ ByteBuffer block = controller.newBuffer(SIZE);
++ encodeBufer(block);
+
+ preAlloc(controller, 10 * 512);
+
+ CountDownLatch latchDone = new CountDownLatch(1);
+
+ CountDownCallback aioBlock = new CountDownCallback(latchDone);
+- controller.write(11, 512, buffer, aioBlock);
++ controller.write(11, 512, block, aioBlock);
+
+ latchDone.await();
+
+@@ -900,7 +827,6 @@
+ }
+ finally
+ {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+ controller.close();
+ }
+
+@@ -908,10 +834,10 @@
+
+ public void testInvalidAlloc() throws Exception
+ {
++ AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+- @SuppressWarnings("unused")
+- ByteBuffer buffer = AsynchronousFileImpl.newBuffer(300);
++ ByteBuffer buffer = controller.newBuffer(300);
+ fail("Exception expected");
+ }
+ catch (Exception ignored)
+@@ -919,58 +845,10 @@
+ }
+
+ }
+-
+- // This is in particular testing for http://bugs.sun.com/view_bug.do?bug_id=6791815
+- public void testAllocations() throws Exception
+- {
+- final AtomicInteger errors = new AtomicInteger(0);
+-
+- Thread ts[] = new Thread[100];
+
+- final CountDownLatch align = new CountDownLatch(ts.length);
+- final CountDownLatch start = new CountDownLatch(1);
+-
+- for (int i = 0; i < ts.length; i++)
+- {
+- ts[i] = new Thread()
+- {
+- public void run()
+- {
+- try
+- {
+- align.countDown();
+- start.await();
+- for (int i = 0; i < 1000; i++)
+- {
+- ByteBuffer buffer = AsynchronousFileImpl.newBuffer(512);
+- AsynchronousFileImpl.destroyBuffer(buffer);
+- }
+- }
+- catch (Throwable e)
+- {
+- e.printStackTrace();
+- errors.incrementAndGet();
+- }
+- }
+- };
+- ts[i].start();
+- }
+-
+- align.await();
+- start.countDown();
+-
+- for (Thread t: ts)
+- {
+- t.join();
+- }
+-
+- assertEquals(0, errors.get());
+- }
+-
+-
+ public void testSize() throws Exception
+ {
+- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+
+ final int NUMBER_LINES = 10;
+ final int SIZE = 1024;
+Index: tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java (working copy)
+@@ -22,19 +22,18 @@
+
+ package org.jboss.messaging.tests.unit.core.asyncio;
+
++import org.jboss.messaging.core.asyncio.AIOCallback;
++import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
++import org.jboss.messaging.core.logging.Logger;
++
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.LinkedList;
+ import java.util.concurrent.CountDownLatch;
+-import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executor;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.atomic.AtomicInteger;
+
+-import org.jboss.messaging.core.asyncio.AIOCallback;
+-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+-import org.jboss.messaging.core.logging.Logger;
+-import org.jboss.messaging.utils.JBMThreadFactory;
+-
+ /**
+ *
+ * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+@@ -60,35 +59,18 @@
+
+ // Executor exec
+
+- ExecutorService executor;
+-
+- ExecutorService pollerExecutor;
++ Executor executor = Executors.newSingleThreadExecutor();
+
+-
+- private static void debug(final String msg)
+- {
+- log.debug(msg);
+- }
+-
+-
+-
++ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+- pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+- executor = Executors.newSingleThreadExecutor();
++ position.set(0);
+ }
+-
+- protected void tearDown() throws Exception
+- {
+- executor.shutdown();
+- pollerExecutor.shutdown();
+- super.tearDown();
+- }
+-
++
+ public void testMultipleASynchronousWrites() throws Throwable
+ {
+- executeTest(false);
++ executeTest(false);
+ }
+
+ public void testMultipleSynchronousWrites() throws Throwable
+@@ -98,15 +80,15 @@
+
+ private void executeTest(final boolean sync) throws Throwable
+ {
+- debug(sync ? "Sync test:" : "Async test");
+- AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl(executor, pollerExecutor);
++ log.debug(sync ? "Sync test:" : "Async test");
++ AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
+ jlibAIO.open(FILE_NAME, 21000);
+ try
+ {
+- debug("Preallocating file");
++ log.debug("Preallocating file");
+
+ jlibAIO.fill(0l, NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES, (byte)0);
+- debug("Done Preallocating file");
++ log.debug("Done Preallocating file");
+
+ CountDownLatch latchStart = new CountDownLatch(NUMBER_OF_THREADS + 1);
+
+@@ -133,7 +115,7 @@
+ }
+ long endTime = System.currentTimeMillis();
+
+- debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
++ log.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES *
+ 1000 /
+@@ -182,17 +164,11 @@
+ {
+ super.run();
+
+-
+- ByteBuffer buffer = null;
+-
+- synchronized (MultiThreadAsynchronousFileTest.class)
+- {
+- buffer = AsynchronousFileImpl.newBuffer(SIZE);
+- }
+-
+ try
+ {
+
++ ByteBuffer buffer = libaio.newBuffer(SIZE);
++
+ // I'm aways reusing the same buffer, as I don't want any noise from
+ // malloc on the measurement
+ // Encoding buffer
+@@ -249,7 +225,7 @@
+
+ long endtime = System.currentTimeMillis();
+
+- debug(Thread.currentThread().getName() + " Rec/Sec= " +
++ log.debug(Thread.currentThread().getName() + " Rec/Sec= " +
+ NUMBER_OF_LINES *
+ 1000 /
+ (endtime - startTime) +
+@@ -270,13 +246,6 @@
+ e.printStackTrace();
+ failed = e;
+ }
+- finally
+- {
+- synchronized (MultiThreadAsynchronousFileTest.class)
+- {
+- AsynchronousFileImpl.destroyBuffer(buffer);
+- }
+- }
+
+ }
+ }
+Index: tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java (working copy)
+@@ -22,7 +22,6 @@
+
+ package org.jboss.messaging.tests.unit.core.journal.impl;
+
+-import java.io.File;
+ import java.util.ArrayList;
+ import java.util.Iterator;
+ import java.util.LinkedHashMap;
+@@ -475,7 +474,7 @@
+ * @param expected
+ * @param actual
+ */
+- protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
++ private void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
+ {
+ System.out.println("***********************************************");
+ System.out.println("Expected list:");
+@@ -483,14 +482,10 @@
+ {
+ System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+ }
+- if (actual != null)
++ System.out.println("Actual list:");
++ for (RecordInfo info : actual)
+ {
+- System.out.println("***********************************************");
+- System.out.println("Actual list:");
+- for (RecordInfo info : actual)
+- {
+- System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+- }
++ System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+ }
+ System.out.println("***********************************************");
+ }
+Index: tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java (working copy)
+@@ -574,29 +574,4 @@
+ // nothing to be done on the fake Sequential file
+ }
+
+- /* (non-Javadoc)
+- * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+- */
+- public void releaseBuffer(ByteBuffer buffer)
+- {
+- }
+-
+- /* (non-Javadoc)
+- * @see org.jboss.messaging.core.journal.SequentialFileFactory#getBufferCallback()
+- */
+- public BufferCallback getBufferCallback()
+- {
+- // TODO Auto-generated method stub
+- return null;
+- }
+-
+- /* (non-Javadoc)
+- * @see org.jboss.messaging.core.journal.SequentialFileFactory#setBufferCallback(org.jboss.messaging.core.journal.BufferCallback)
+- */
+- public void setBufferCallback(BufferCallback bufferCallback)
+- {
+- // TODO Auto-generated method stub
+-
+- }
+-
+ }
+Index: tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java (working copy)
+@@ -3018,6 +3018,7 @@
+ assertEquals(0, journal.getDataFilesCount());
+ }
+
++
+ protected abstract int getAlignment();
+
+ }
+Index: tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java (working copy)
+@@ -202,7 +202,7 @@
+ {
+ Journal journal =
+ new JournalImpl(10 * 1024 * 1024, 10, true, true, getFileFactory(),
+- "jbm-data", "jbm", 5000, 10 * 1024);
++ "jbm-data", "jbm", 5000, 0);
+
+ journal.start();
+
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java (working copy)
+@@ -882,11 +882,9 @@
+ configuration.setSecurityEnabled(false);
+ configuration.setBindingsDirectory(getBindingsDir(node, backup));
+ configuration.setJournalMinFiles(2);
+- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalDirectory(getJournalDir(node, backup));
+ configuration.setJournalFileSize(100 * 1024);
+- configuration.setJournalType(JournalType.ASYNCIO);
+- configuration.setJournalMaxAIO(1000);
++ configuration.setJournalType(JournalType.NIO);
+ configuration.setPagingDirectory(getPageDir(node, backup));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
+ configuration.setClustered(true);
+@@ -980,8 +978,7 @@
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(node, false));
+ configuration.setJournalFileSize(100 * 1024);
+- configuration.setJournalType(JournalType.ASYNCIO);
+- configuration.setJournalMaxAIO(1000);
++ configuration.setJournalType(JournalType.NIO);
+ configuration.setPagingDirectory(getPageDir(node, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+ configuration.setClustered(true);
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java (working copy)
+@@ -103,7 +103,7 @@
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+- backupConf.setJournalType(JournalType.ASYNCIO);
++ backupConf.setJournalType(JournalType.NIO);
+
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+@@ -124,7 +124,7 @@
+
+ liveConf.setJournalFileSize(100 * 1024);
+
+- liveConf.setJournalType(JournalType.ASYNCIO);
++ liveConf.setJournalType(JournalType.NIO);
+
+
+ liveConf.setSecurityEnabled(false);
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java (working copy)
+@@ -386,7 +386,7 @@
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+- backupConf.setJournalType(JournalType.ASYNCIO);
++ backupConf.setJournalType(JournalType.NIO);
+
+ backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+ backupConf.setGlobalPagingSize(pageSize);
+@@ -426,7 +426,7 @@
+ liveConf.setGlobalPagingSize(pageSize);
+ liveConf.setJournalFileSize(100 * 1024);
+
+- liveConf.setJournalType(JournalType.ASYNCIO);
++ liveConf.setJournalType(JournalType.NIO);
+ }
+
+ if (fileBased)
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java (working copy)
+@@ -124,7 +124,7 @@
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+- backupConf.setJournalType(JournalType.ASYNCIO);
++ backupConf.setJournalType(JournalType.NIO);
+
+ backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+ backupConf.setGlobalPagingSize(pageSize);
+@@ -164,7 +164,7 @@
+ liveConf.setGlobalPagingSize(pageSize);
+ liveConf.setJournalFileSize(100 * 1024);
+
+- liveConf.setJournalType(JournalType.ASYNCIO);
++ liveConf.setJournalType(JournalType.NIO);
+ }
+
+ if (fileBased)
+@@ -208,7 +208,7 @@
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+- backupConf.setJournalType(JournalType.ASYNCIO);
++ backupConf.setJournalType(JournalType.NIO);
+
+ backupConf.setPagingMaxGlobalSizeBytes(-1);
+ backupConf.setGlobalPagingSize(-1);
+@@ -263,7 +263,7 @@
+ liveConf.setGlobalPagingSize(-1);
+ liveConf.setJournalFileSize(100 * 1024);
+
+- liveConf.setJournalType(JournalType.ASYNCIO);
++ liveConf.setJournalType(JournalType.NIO);
+ liveServer = Messaging.newMessagingServer(liveConf);
+ }
+ else
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java (working copy)
+@@ -85,7 +85,7 @@
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+- backupConf.setJournalType(JournalType.ASYNCIO);
++ backupConf.setJournalType(JournalType.NIO);
+
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+@@ -106,7 +106,7 @@
+
+ liveConf.setJournalFileSize(100 * 1024);
+
+- liveConf.setJournalType(JournalType.ASYNCIO);
++ liveConf.setJournalType(JournalType.NIO);
+
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+Index: tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java (working copy)
+@@ -60,7 +60,7 @@
+
+ configuration.start();
+
+- configuration.setJournalType(JournalType.ASYNCIO);
++ configuration.setJournalType(JournalType.NIO);
+
+ final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+ journal.start();
+Index: tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java (working copy)
+@@ -23,6 +23,7 @@
+ package org.jboss.messaging.tests.integration.journal;
+
+ import java.io.File;
++import java.util.concurrent.Executors;
+
+ import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+ import org.jboss.messaging.core.journal.SequentialFileFactory;
+Index: tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java (working copy)
+@@ -25,6 +25,7 @@
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.Executors;
+ import java.util.concurrent.atomic.AtomicInteger;
+
+ import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+@@ -77,7 +78,6 @@
+ ByteBuffer buff = factory.newBuffer(10);
+ assertEquals(512, buff.limit());
+ file.close();
+- factory.releaseBuffer(buff);
+ }
+
+ public void testBlockCallback() throws Exception
+@@ -131,7 +131,7 @@
+
+ BlockCallback callback = new BlockCallback();
+
+- final int NUMBER_OF_RECORDS = 500;
++ final int NUMBER_OF_RECORDS = 10000;
+
+ SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000);
+ file.open();
+Index: tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java (working copy)
+@@ -321,7 +321,7 @@
+ config.setJournalFileSize(10 * 1024 * 1024);
+ config.setJournalMinFiles(5);
+
+- config.setJournalType(JournalType.ASYNCIO);
++ config.setJournalType(JournalType.NIO);
+
+ return config;
+ }
+Index: tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java (revision 7105)
++++ tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java (working copy)
+@@ -126,7 +126,7 @@
+ Configuration config = new ConfigurationImpl();
+ config.setJournalDirectory(getJournalDir());
+ config.setBindingsDirectory(getBindingsDir());
+- config.setJournalType(JournalType.ASYNCIO);
++ config.setJournalType(JournalType.NIO);
+ config.setLargeMessagesDirectory(getLargeMessagesDir());
+ return config;
+ }
+@@ -272,7 +272,7 @@
+ configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir(index, false));
+ configuration.setJournalFileSize(100 * 1024);
+- configuration.setJournalType(JournalType.ASYNCIO);
++ configuration.setJournalType(JournalType.NIO);
+ configuration.setPagingDirectory(getPageDir(index, false));
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir(index, false));
+
+@@ -299,7 +299,7 @@
+ configuration.setPagingDirectory(getPageDir());
+ configuration.setLargeMessagesDirectory(getLargeMessagesDir());
+
+- configuration.setJournalType(JournalType.ASYNCIO);
++ configuration.setJournalType(JournalType.NIO);
+
+ configuration.getAcceptorConfigurations().clear();
+
More information about the jboss-cvs-commits
mailing list