[jboss-cvs] JBoss Messaging SVN: r7106 - branches/Branch_JBM2_Perf_Clebert.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu May 28 14:41:50 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-05-28 14:41:50 -0400 (Thu, 28 May 2009)
New Revision: 7106

Added:
   branches/Branch_JBM2_Perf_Clebert/revert-aio.patch
Log:
Patch to revert the AIO changes, which I'm using in tests

Added: branches/Branch_JBM2_Perf_Clebert/revert-aio.patch
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/revert-aio.patch	                        (rev 0)
+++ branches/Branch_JBM2_Perf_Clebert/revert-aio.patch	2009-05-28 18:41:50 UTC (rev 7106)
@@ -0,0 +1,2662 @@
+Index: native/src/LibAIOController.cpp
+===================================================================
+--- native/src/LibAIOController.cpp	(revision 7105)
++++ native/src/LibAIOController.cpp	(working copy)
+@@ -35,6 +35,8 @@
+ #include "Version.h"
+ 
+ 
++
++
+ /*
+  * Class:     org_jboss_jaio_libaioimpl_LibAIOController
+  * Method:    init
+@@ -108,8 +110,6 @@
+ 	}
+ }
+ 
+-
+-// Fast memset on buffer
+ JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
+   (JNIEnv *env, jclass, jobject jbuffer, jint size)
+ {
+@@ -125,46 +125,7 @@
+ 	
+ }
+ 
+-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
+-  (JNIEnv * env, jclass, jobject jbuffer)
+-{
+-	void *  buffer = env->GetDirectBufferAddress(jbuffer);
+-	free(buffer);
+-}
+ 
+-JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newNativeBuffer
+-  (JNIEnv * env, jclass, jlong size)
+-{
+-	try
+-	{
+-		
+-		if (size % ALIGNMENT)
+-		{
+-			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512");
+-			return 0;
+-		}
+-		
+-		
+-		// This will allocate a buffer, aligned by 512.
+-		// Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
+-		void * buffer = 0;
+-		if (::posix_memalign(&buffer, 512, size))
+-		{
+-			throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign");
+-			return 0;
+-		}
+-		
+-		memset(buffer, 0, (size_t)size);
+-		
+-		jobject jbuffer = env->NewDirectByteBuffer(buffer, size);
+-		return jbuffer;
+-	}
+-	catch (AIOException& e)
+-	{
+-		throwException(env, e.getErrorCode(), e.what());
+-		return 0;
+-	}
+-}
+ 
+ JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
+   (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
+@@ -173,14 +134,12 @@
+ 	{
+ 		AIOController * controller = (AIOController *) controllerAddress;
+ 		void * buffer = env->GetDirectBufferAddress(jbuffer);
+-
+ 		if (buffer == 0)
+ 		{
+ 			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
+ 			return;
+ 		}
+ 		
+-		
+ 		CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
+ 		
+ 		controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
+Index: native/src/AsyncFile.cpp
+===================================================================
+--- native/src/AsyncFile.cpp	(revision 7105)
++++ native/src/AsyncFile.cpp	(working copy)
+@@ -13,7 +13,7 @@
+     USA
+ 
+     The GNU Lesser General Public License is available in the file COPYING.
+-
++    
+     Software written by Clebert Suconic (csuconic at redhat dot com)
+ */
+ 
+@@ -48,12 +48,12 @@
+ std::string io_error(int rc)
+ {
+ 	std::stringstream buffer;
+-
++	
+ 	if (rc == -ENOSYS)
+ 		buffer << "AIO not in this kernel";
+-	else
++	else 
+ 		buffer << "Error:= " << strerror(-rc);
+-
++	
+ 	return buffer.str();
+ }
+ 
+@@ -62,27 +62,27 @@
+ {
+ 	::pthread_mutex_init(&fileMutex,0);
+ 	::pthread_mutex_init(&pollerMutex,0);
+-
++	
+ 	maxIO = _maxIO;
+ 	fileName = _fileName;
+ 	if (io_queue_init(maxIO, &aioContext))
+ 	{
+-		throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio, out of AIO Handlers");
++		throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio"); 
+ 	}
+ 
+ 	fileHandle = ::open(fileName.data(),  O_RDWR | O_CREAT | O_DIRECT, 0666);
+ 	if (fileHandle < 0)
+ 	{
+ 		io_queue_release(aioContext);
+-		throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file");
++		throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file"); 
+ 	}
+-
++	
+ #ifdef DEBUG
+ 	fprintf (stderr,"File Handle %d", fileHandle);
+ #endif
+ 
+ 	events = (struct io_event *)malloc (maxIO * sizeof (struct io_event));
+-
++	
+ 	if (events == 0)
+ 	{
+ 		throw AIOException (NATIVE_ERROR_CANT_ALLOCATE_QUEUE, "Can't allocate ioEvents");
+@@ -112,17 +112,17 @@
+ 
+ void AsyncFile::pollEvents(THREAD_CONTEXT threadContext)
+ {
+-
++	
+ 	LockClass lock(&pollerMutex);
+ 	pollerRunning=1;
+-
++	
+ 	// TODO: Maybe we don't need to wait for one second.... we just keep waiting forever, and use something to interrupt it
+ 	// maybe an invalid write to interrupt it.
+ 	struct timespec oneSecond;
+ 	oneSecond.tv_sec = 1;
+ 	oneSecond.tv_nsec = 0;
+-
+-
++	
++	
+ 	while (pollerRunning)
+ 	{
+ 		if (isException(threadContext))
+@@ -130,15 +130,15 @@
+ 			return;
+ 		}
+ 		int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
+-
+-
++		
++		
+ #ifdef DEBUG
+ 		fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
+ #endif
+-
++		
+ 		if (result > 0)
+ 		{
+-
++			
+ #ifdef DEBUG
+ 			fprintf (stdout, "Received %d events\n", result);
+ 			fflush(stdout);
+@@ -147,9 +147,9 @@
+ 
+ 		for (int i=0; i<result; i++)
+ 		{
+-
++			
+ 			struct iocb * iocbp = events[i].obj;
+-
++	
+ 			if (iocbp->data == (void *) -1)
+ 			{
+ 				pollerRunning = 0;
+@@ -160,7 +160,7 @@
+ 			else
+ 			{
+ 				CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
+-
++				
+ 				long result = events[i].res;
+ 				if (result < 0)
+ 				{
+@@ -172,13 +172,13 @@
+ 					adapter->done(threadContext);
+ 				}
+ 			}
+-
++			
+ 			delete iocbp;
+ 		}
+ 	}
+ #ifdef DEBUG
+ 	controller->log(threadContext, 2, "Poller finished execution");
+-#endif
++#endif	
+ }
+ 
+ 
+@@ -189,18 +189,18 @@
+ 	{
+ 		throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "You can only pre allocate files in multiples of 512");
+ 	}
+-
++	
+ 	void * preAllocBuffer = 0;
+ 	if (posix_memalign(&preAllocBuffer, 512, size))
+ 	{
+ 		throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign");
+ 	}
+-
++	
+ 	memset(preAllocBuffer, fillChar, size);
+-
+-
++	
++	
+ 	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+-
++	
+ 	for (int i=0; i<blocks; i++)
+ 	{
+ 		if (::write(fileHandle, preAllocBuffer, size)<0)
+@@ -208,9 +208,9 @@
+ 			throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "Error pre allocating the file");
+ 		}
+ 	}
+-
++	
+ 	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (NATIVE_ERROR_IO, "Error positioning the file");
+-
++	
+ 	free (preAllocBuffer);
+ }
+ 
+@@ -223,7 +223,7 @@
+ 
+ 	int tries = 0;
+ 	int result = 0;
+-
++	
+ 	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+ 	{
+ #ifdef DEBUG
+@@ -237,7 +237,7 @@
+ #endif
+ 			controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
+ 		}
+-
++		
+ 		if (tries > TRIES_BEFORE_ERROR)
+ 		{
+ #ifdef DEBUG
+@@ -247,7 +247,7 @@
+ 		}
+ 		::usleep(WAIT_FOR_SPOT);
+ 	}
+-
++	
+ 	if (result<0)
+ 	{
+ 		std::stringstream str;
+@@ -265,7 +265,7 @@
+ 
+ 	int tries = 0;
+ 	int result = 0;
+-
++	
+ 	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+ 	{
+ #ifdef DEBUG
+@@ -279,7 +279,7 @@
+ #endif
+ 			controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
+ 		}
+-
++		
+ 		if (tries > TRIES_BEFORE_ERROR)
+ 		{
+ #ifdef DEBUG
+@@ -289,7 +289,7 @@
+ 		}
+ 		::usleep(WAIT_FOR_SPOT);
+ 	}
+-
++	
+ 	if (result<0)
+ 	{
+ 		std::stringstream str;
+@@ -301,7 +301,7 @@
+ long AsyncFile::getSize()
+ {
+ 	struct stat64 statBuffer;
+-
++	
+ 	if (fstat64(fileHandle, &statBuffer) < 0)
+ 	{
+ 		return -1l;
+@@ -313,21 +313,21 @@
+ void AsyncFile::stopPoller(THREAD_CONTEXT threadContext)
+ {
+ 	pollerRunning = 0;
+-
+-
++	
++	
+ 	struct iocb * iocb = new struct iocb();
+ 	::io_prep_pwrite(iocb, fileHandle, 0, 0, 0);
+ 	iocb->data = (void *) -1;
+ 
+ 	int result = 0;
+-
++	
+ 	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+ 	{
+ 		fprintf(stderr, "Couldn't send request to stop poller, trying again");
+ 		controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again");
+ 		::usleep(WAIT_FOR_SPOT);
+ 	}
+-
++	
+ 	// Waiting the Poller to finish (by giving up the lock)
+ 	LockClass lock(&pollerMutex);
+ }
+Index: native/src/Version.h
+===================================================================
+--- native/src/Version.h	(revision 7105)
++++ native/src/Version.h	(working copy)
+@@ -1,5 +1,5 @@
+ 
+ #ifndef _VERSION_NATIVE_AIO
+-#define _VERSION_NATIVE_AIO 19
++#define _VERSION_NATIVE_AIO 17
+ #endif
+ 
+Index: native/bin/libJBMLibAIO64.so
+===================================================================
+Cannot display: file marked as a binary type.
+svn:mime-type = application/octet-stream
+Index: src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java
+===================================================================
+--- src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java	(working copy)
+@@ -60,7 +60,7 @@
+          throw new NullPointerException("buffer");
+       }
+ 
+-      this.buffer = buffer;
++      this.buffer = buffer.slice();
+       capacity = buffer.remaining();
+    }
+ 
+Index: src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
+===================================================================
+--- src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	(working copy)
+@@ -23,7 +23,6 @@
+ package org.jboss.messaging.core.asyncio.impl;
+ 
+ import java.nio.ByteBuffer;
+-import java.util.concurrent.Executor;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+@@ -35,7 +34,6 @@
+ import org.jboss.messaging.core.asyncio.BufferCallback;
+ import org.jboss.messaging.core.exception.MessagingException;
+ import org.jboss.messaging.core.logging.Logger;
+-import org.jboss.messaging.utils.VariableLatch;
+ 
+ /**
+  * 
+@@ -47,7 +45,8 @@
+  */
+ public class AsynchronousFileImpl implements AsynchronousFile
+ {
+-   // Static ----------------------------------------------------------------------------
++   // Static
++   // -------------------------------------------------------------------------------
+ 
+    private static final Logger log = Logger.getLogger(AsynchronousFileImpl.class);
+ 
+@@ -55,7 +54,7 @@
+ 
+    private static boolean loaded = false;
+ 
+-   private static int EXPECTED_NATIVE_VERSION = 19;
++   private static int EXPECTED_NATIVE_VERSION = 17;
+ 
+    public static void addMax(final int io)
+    {
+@@ -125,21 +124,18 @@
+       return loaded;
+    }
+ 
+-   // Attributes ------------------------------------------------------------------------
++   // Attributes
++   // ---------------------------------------------------------------------------------
+ 
+    private boolean opened = false;
+ 
+    private String fileName;
+ 
+-   private final VariableLatch pollerLatch = new VariableLatch();
++   private volatile Thread poller;
+ 
+-   private volatile Runnable poller;
+-
+    private int maxIO;
+ 
+    private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
+-   
+-   private final VariableLatch pendingWrites = new VariableLatch();
+ 
+    private Semaphore writeSemaphore;
+ 
+@@ -150,28 +146,11 @@
+     */
+    private long handler;
+ 
+-   // A context switch on AIO would make it to synchronize the disk before
+-   // switching to the new thread, what would cause
+-   // serious performance problems. Because of that we make all the writes on
+-   // AIO using a single thread.
+-   private final Executor writeExecutor;
++   // AsynchronousFile implementation
++   // ------------------------------------------------------------------------------------
+ 
+-   private final Executor pollerExecutor;
+-
+-   // AsynchronousFile implementation ---------------------------------------------------
+-
+-   /**
+-    * @param writeExecutor It needs to be a single Thread executor. If null it will use the user thread to execute write operations
+-    * @param pollerExecutor The thread pool that will initialize poller handlers
+-    */
+-   public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
++   public void open(final String fileName, final int maxIO)
+    {
+-      this.writeExecutor = writeExecutor;
+-      this.pollerExecutor = pollerExecutor;
+-   }
+-
+-   public void open(final String fileName, final int maxIO) throws MessagingException
+-   {
+       writeLock.lock();
+ 
+       try
+@@ -186,27 +165,7 @@
+ 
+          this.fileName = fileName;
+ 
+-         try
+-         {
+-            handler = init(fileName, this.maxIO, log);
+-         }
+-         catch (MessagingException e)
+-         {
+-            MessagingException ex = null;
+-            if (e.getCode() == MessagingException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
+-            {
+-               ex = new MessagingException(e.getCode(),
+-                                           "Can't initialize AIO. Currently AIO in use = " + totalMaxIO.get() +
+-                                                    ", trying to allocate more " +
+-                                                    maxIO,
+-                                           e);
+-            }
+-            else
+-            {
+-               ex = e;
+-            }
+-            throw ex;
+-         }
++         handler = init(fileName, this.maxIO, log);
+          opened = true;
+          addMax(this.maxIO);
+       }
+@@ -225,20 +184,19 @@
+       try
+       {
+ 
+-         while (!pendingWrites.waitCompletion(60000))
+-         {
+-            log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
+-         }
+-         
+          while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+          {
+-            log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
++            log.warn("Couldn't acquire lock after 60 seconds on AIO",
++                     new Exception("Warning: Couldn't acquire lock after 60 seconds on AIO"));
+          }
+-
+          writeSemaphore = null;
+          if (poller != null)
+          {
+-            stopPoller();
++            Thread currentPoller = poller;
++            stopPoller(handler);
++            // We need to make sure we won't call close until Poller is
++            // completely done, or we might get beautiful GPFs
++            currentPoller.join();
+          }
+ 
+          closeInternal(handler);
+@@ -258,61 +216,30 @@
+    public void write(final long position,
+                      final long size,
+                      final ByteBuffer directByteBuffer,
+-                     final AIOCallback aioCallback)
++                     final AIOCallback aioPackage) throws MessagingException
+    {
+-      if (aioCallback == null)
+-      {
+-         throw new NullPointerException("Null Callback");
+-      }
+-
+       checkOpened();
+       if (poller == null)
+       {
+          startPoller();
+       }
+-      
+-      pendingWrites.up();
+-
+-      if (writeExecutor != null)
++      writeSemaphore.acquireUninterruptibly();
++      try
+       {
+-         writeExecutor.execute(new Runnable()
+-         {
+-            public void run()
+-            {
+-               writeSemaphore.acquireUninterruptibly();
+-
+-               try
+-               {
+-                  write(handler, position, size, directByteBuffer, aioCallback);
+-               }
+-               catch (MessagingException e)
+-               {
+-                  callbackError(aioCallback, e.getCode(), e.getMessage());
+-               }
+-               catch (RuntimeException e)
+-               {
+-                  callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+-               }
+-            }
+-         });
++         write(handler, position, size, directByteBuffer, aioPackage);
+       }
+-      else
++      catch (MessagingException e)
+       {
+-         writeSemaphore.acquireUninterruptibly();
+-
+-         try
+-         {
+-            write(handler, position, size, directByteBuffer, aioCallback);
+-         }
+-         catch (MessagingException e)
+-         {
+-            callbackError(aioCallback, e.getCode(), e.getMessage());
+-         }
+-         catch (RuntimeException e)
+-         {
+-            callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+-         }
++         // Release only if an exception happened
++         writeSemaphore.release();
++         throw e;
+       }
++      catch (RuntimeException e)
++      {
++         // Release only if an exception happened
++         writeSemaphore.release();
++         throw e;
++      }
+ 
+    }
+ 
+@@ -326,7 +253,6 @@
+       {
+          startPoller();
+       }
+-      pendingWrites.up();
+       writeSemaphore.acquireUninterruptibly();
+       try
+       {
+@@ -336,14 +262,12 @@
+       {
+          // Release only if an exception happened
+          writeSemaphore.release();
+-         pendingWrites.down();
+          throw e;
+       }
+       catch (RuntimeException e)
+       {
+          // Release only if an exception happened
+          writeSemaphore.release();
+-         pendingWrites.down();
+          throw e;
+       }
+    }
+@@ -370,22 +294,15 @@
+       return fileName;
+    }
+ 
+-   /**
+-    * This needs to be synchronized because of 
+-    * http://bugs.sun.com/view_bug.do?bug_id=6791815
+-    * http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/000386.html
+-    * 
+-    * @param size
+-    * @return
+-    */
+-   public synchronized static ByteBuffer newBuffer(final int size)
++   // Should we make this method static?
++   public ByteBuffer newBuffer(final int size)
+    {
+-      if (size % 512 != 0)
++      if (size % getBlockSize() != 0)
+       {
+          throw new RuntimeException("Buffer size needs to be aligned to 512");
+       }
+ 
+-      return newNativeBuffer(size);
++      return ByteBuffer.allocateDirect(size);
+    }
+ 
+    public void setBufferCallback(final BufferCallback callback)
+@@ -393,20 +310,9 @@
+       bufferCallback = callback;
+    }
+ 
+-   /** Return the JNI handler used on C++ */
+-   public long getHandler()
+-   {
+-      return handler;
+-   }
++   // Private
++   // ---------------------------------------------------------------------------------
+ 
+-   public static void clearBuffer(final ByteBuffer buffer)
+-   {
+-      resetBuffer(buffer, buffer.limit());
+-      buffer.position(0);
+-   }
+-
+-   // Private ---------------------------------------------------------------------------
+-
+    /** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
+    @SuppressWarnings("unused")
+    // Called by the JNI layer.. just ignore the
+@@ -414,7 +320,6 @@
+    private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
+    {
+       writeSemaphore.release();
+-      pendingWrites.down();
+       callback.done();
+       if (bufferCallback != null)
+       {
+@@ -422,13 +327,13 @@
+       }
+    }
+ 
++   @SuppressWarnings("unused")
+    // Called by the JNI layer.. just ignore the
+    // warning
+    private void callbackError(final AIOCallback callback, final int errorCode, final String errorMessage)
+    {
+       log.warn("CallbackError: " + errorMessage);
+       writeSemaphore.release();
+-      pendingWrites.down();
+       callback.onError(errorCode, errorMessage);
+    }
+ 
+@@ -450,11 +355,10 @@
+ 
+          if (poller == null)
+          {
+-            pollerLatch.up();
+-            poller = new PollerRunnable();
++            poller = new PollerThread();
+             try
+             {
+-               pollerExecutor.execute(poller);
++               poller.start();
+             }
+             catch (Exception ex)
+             {
+@@ -476,28 +380,13 @@
+       }
+    }
+ 
+-   /**
+-    * @throws MessagingException
+-    * @throws InterruptedException
+-    */
+-   private void stopPoller() throws MessagingException, InterruptedException
+-   {
+-      stopPoller(handler);
+-      // We need to make sure we won't call close until Poller is
+-      // completely done, or we might get beautiful GPFs
+-      pollerLatch.waitCompletion();
+-   }
++   // Native
++   // ------------------------------------------------------------------------------------------
+ 
+-   // Native ----------------------------------------------------------------------------
++   public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
+ 
+-   private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
++   private static native long init(String fileName, int maxIO, Logger logger);
+ 
+-   public static native void destroyBuffer(ByteBuffer buffer);
+-
+-   private static native ByteBuffer newNativeBuffer(long size);
+-
+-   private static native long init(String fileName, int maxIO, Logger logger) throws MessagingException;
+-
+    private native long size0(long handle) throws MessagingException;
+ 
+    private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws MessagingException;
+@@ -516,14 +405,17 @@
+    /** Poll asynchrounous events from internal queues */
+    private static native void internalPollEvents(long handler);
+ 
+-   // Inner classes ---------------------------------------------------------------------
++   // Inner classes
++   // -----------------------------------------------------------------------------------------
+ 
+-   private class PollerRunnable implements Runnable
++   private class PollerThread extends Thread
+    {
+-      PollerRunnable()
++      PollerThread()
+       {
++         super("NativePoller for " + fileName);
+       }
+ 
++      @Override
+       public void run()
+       {
+          try
+@@ -536,7 +428,6 @@
+             // Case the poller thread is interrupted, this will allow us to
+             // restart the thread when required
+             poller = null;
+-            pollerLatch.down();
+          }
+       }
+    }
+Index: src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
+===================================================================
+--- src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	(working copy)
+@@ -40,9 +40,8 @@
+     * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error 
+     * @param fileName
+     * @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
+-    * @throws MessagingException 
+     */
+-   void open(String fileName, int maxIO) throws MessagingException;
++   void open(String fileName, int maxIO);
+ 
+    /** 
+     * Warning: This function will perform a synchronous IO, probably translating to a fstat call
+@@ -50,13 +49,14 @@
+     * */
+    long size() throws MessagingException;
+ 
+-   /** Any error will be reported on the callback interface */ 
+-   void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
++   void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage) throws MessagingException;
+ 
+-   void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws MessagingException;
++   void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage) throws MessagingException;
+ 
+    void fill(long position, int blocks, long size, byte fillChar) throws MessagingException;
+ 
++   ByteBuffer newBuffer(int size);
++
+    void setBufferCallback(BufferCallback callback);
+ 
+    int getBlockSize();
+Index: src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	(working copy)
+@@ -54,11 +54,10 @@
+ 
+    BufferCallback bufferCallback;
+ 
+-   public NIOSequentialFile(final String directory, final String fileName, final BufferCallback bufferCallback)
++   public NIOSequentialFile(final String directory, final String fileName)
+    {
+       this.directory = directory;
+       file = new File(directory + "/" + fileName);
+-      this.bufferCallback = bufferCallback;
+    }
+ 
+    public int getAlignment()
+@@ -93,6 +92,11 @@
+       open();
+    }
+ 
++   public void setBufferCallback(final BufferCallback callback)
++   {
++      bufferCallback = callback;
++   }
++
+    public void fill(final int position, final int size, final byte fillCharacter) throws Exception
+    {
+       ByteBuffer bb = ByteBuffer.allocateDirect(size);
+Index: src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	(working copy)
+@@ -25,7 +25,8 @@
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.concurrent.CountDownLatch;
+-import java.util.concurrent.Executor;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+@@ -59,26 +60,18 @@
+    private AsynchronousFile aioFile;
+ 
+    private final AtomicLong position = new AtomicLong(0);
+-   
+-   private BufferCallback bufferCallback;
+ 
+-   /** A context switch on AIO would make it to synchronize the disk before
+-       switching to the new thread, what would cause
+-       serious performance problems. Because of that we make all the writes on
+-       AIO using a single thread. */
+-   private final Executor executor;
++   // A context switch on AIO would make it to synchronize the disk before
++   // switching to the new thread, what would cause
++   // serious performance problems. Because of that we make all the writes on
++   // AIO using a single thread.
++   private ExecutorService executor;
+ 
+-   /** The pool for Thread pollers */
+-   private final Executor pollerExecutor;
+-
+-   public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final BufferCallback bufferCallback, final Executor executor, final Executor pollerExecutor)
++   public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO)
+    {
+       this.journalDir = journalDir;
+       this.fileName = fileName;
+       this.maxIO = maxIO;
+-      this.bufferCallback = bufferCallback;
+-      this.executor = executor;
+-      this.pollerExecutor = pollerExecutor;
+    }
+ 
+    public boolean isOpen() 
+@@ -106,20 +99,10 @@
+    {
+       checkOpened();
+       opened = false;
++      executor.shutdown();
+ 
+-      final CountDownLatch donelatch = new CountDownLatch(1);
+-      
+-      executor.execute(new Runnable()
++      while (!executor.awaitTermination(60, TimeUnit.SECONDS))
+       {
+-         public void run()
+-         {
+-            donelatch.countDown();
+-         }
+-      });
+-      
+-      
+-      while (!donelatch.await(60, TimeUnit.SECONDS))
+-      {
+          log.warn("Executor on file " + fileName + " couldn't complete its tasks in 60 seconds.",
+                   new Exception("Warning: Executor on file " + fileName + " couldn't complete its tasks in 60 seconds."));
+       }
+@@ -207,10 +190,10 @@
+    public synchronized void open(final int currentMaxIO) throws Exception
+    {
+       opened = true;
++      executor = Executors.newSingleThreadExecutor();
+       aioFile = newFile();
+       aioFile.open(journalDir + "/" + fileName, currentMaxIO);
+       position.set(0);
+-      aioFile.setBufferCallback(bufferCallback);
+ 
+    }
+ 
+@@ -306,7 +289,7 @@
+     */
+    protected AsynchronousFile newFile()
+    {
+-      return new AsynchronousFileImpl(executor, pollerExecutor);
++      return new AsynchronousFileImpl();
+    }
+ 
+    // Private methods
+@@ -317,7 +300,24 @@
+                           final int bytesToWrite,
+                           final long positionToWrite)
+    {
+-      aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
++      executor.execute(new Runnable()
++      {
++         public void run()
++         {
++            try
++            {
++               aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
++            }
++            catch (Exception e)
++            {
++               log.warn(e.getMessage(), e);
++               if (callback != null)
++               {
++                  callback.onError(-1, e.getMessage());
++               }
++            }
++         }
++      });
+    }
+ 
+    private void checkOpened() throws Exception
+Index: src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	(working copy)
+@@ -28,7 +28,6 @@
+ import java.util.Arrays;
+ import java.util.List;
+ 
+-import org.jboss.messaging.core.journal.BufferCallback;
+ import org.jboss.messaging.core.journal.SequentialFileFactory;
+ import org.jboss.messaging.core.logging.Logger;
+ 
+@@ -45,8 +44,6 @@
+    private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
+ 
+    protected final String journalDir;
+-   
+-   protected BufferCallback bufferCallback;
+ 
+    public AbstractSequentialFactory(final String journalDir)
+    {
+@@ -88,22 +85,4 @@
+       return Arrays.asList(fileNames);
+    }
+ 
+-   /**
+-    * @return the bufferCallback
+-    */
+-   public BufferCallback getBufferCallback()
+-   {
+-      return bufferCallback;
+-   }
+-
+-   /**
+-    * @param bufferCallback the bufferCallback to set
+-    */
+-   public void setBufferCallback(BufferCallback bufferCallback)
+-   {
+-      this.bufferCallback = bufferCallback;
+-   }
+-
+-   
+-   
+ }
+Index: src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	(working copy)
+@@ -53,7 +53,7 @@
+    // maxIO is ignored on NIO
+    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+    {
+-      return new NIOSequentialFile(journalDir, fileName, bufferCallback);
++      return new NIOSequentialFile(journalDir, fileName);
+    }
+ 
+    public boolean isSupportsCallbacks()
+@@ -94,12 +94,4 @@
+       return bytes;
+    }
+ 
+-   /* (non-Javadoc)
+-    * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+-    */
+-   public void releaseBuffer(ByteBuffer buffer)
+-   {
+-      // nothing to be done here
+-   }
+-
+ }
+Index: src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	(working copy)
+@@ -23,12 +23,9 @@
+ package org.jboss.messaging.core.journal.impl;
+ 
+ import java.nio.ByteBuffer;
+-import java.util.concurrent.Executor;
+-import java.util.concurrent.Executors;
+ 
+ import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+ import org.jboss.messaging.core.journal.SequentialFile;
+-import org.jboss.messaging.utils.JBMThreadFactory;
+ 
+ /**
+  * 
+@@ -39,16 +36,6 @@
+  */
+ public class AIOSequentialFileFactory extends AbstractSequentialFactory
+ {
+-   
+-   /** A single AIO write executor for every AIO File.
+-    *  This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
+-    *  And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
+-   private final Executor writeExecutor = Executors.newSingleThreadExecutor(new JBMThreadFactory("JBM-AIO-writer-pool" + System.identityHashCode(this), true));
+-   
+-
+-   private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), true));
+-
+-
+    public AIOSequentialFileFactory(final String journalDir)
+    {
+       super(journalDir);
+@@ -56,7 +43,7 @@
+ 
+    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+    {
+-      return new AIOSequentialFile(journalDir, fileName, maxIO, bufferCallback, writeExecutor, pollerExecutor);
++      return new AIOSequentialFile(journalDir, fileName, maxIO);
+    }
+ 
+    public boolean isSupportsCallbacks()
+@@ -75,12 +62,12 @@
+       {
+          size = (size / 512 + 1) * 512;
+       }
+-      return AsynchronousFileImpl.newBuffer(size);
++      return ByteBuffer.allocateDirect(size);
+    }
+ 
+    public void clearBuffer(final ByteBuffer directByteBuffer)
+    {
+-      AsynchronousFileImpl.clearBuffer(directByteBuffer);
++      AsynchronousFileImpl.resetBuffer(directByteBuffer, directByteBuffer.limit());
+    }
+ 
+    public int getAlignment()
+@@ -104,12 +91,4 @@
+ 
+       return pos;
+    }
+-
+-   /* (non-Javadoc)
+-    * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+-    */
+-   public void releaseBuffer(ByteBuffer buffer)
+-   {
+-      AsynchronousFileImpl.destroyBuffer(buffer);
+-   }
+ }
+Index: src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	(working copy)
+@@ -274,8 +274,6 @@
+       this.syncNonTransactional = syncNonTransactional;
+ 
+       this.fileFactory = fileFactory;
+-      
+-      this.fileFactory.setBufferCallback(this.buffersControl.callback);
+ 
+       this.filePrefix = filePrefix;
+ 
+@@ -872,10 +870,6 @@
+          throw new IllegalStateException("Journal must be in started state");
+       }
+ 
+-      // Disabling life cycle control on buffers, as we are reading the buffer 
+-      buffersControl.disable();
+-
+-
+       Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+ 
+       List<JournalFile> orderedFiles = orderFiles();
+@@ -886,12 +880,12 @@
+ 
+       for (JournalFile file : orderedFiles)
+       {
+-         ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
+-
+          file.getFile().open(1);
+ 
+-         int bytesRead = file.getFile().read(wholeFileBuffer);
++         ByteBuffer bb = fileFactory.newBuffer(fileSize);
+ 
++         int bytesRead = file.getFile().read(bb);
++
+          if (bytesRead != fileSize)
+          {
+             // FIXME - We should extract everything we can from this file
+@@ -906,19 +900,16 @@
+                                             file.getFile().getFileName());
+          }
+ 
+-         
+-         wholeFileBuffer.position(0);
+-         
+          // First long is the ordering timestamp, we just jump its position
+-         wholeFileBuffer.position(SIZE_HEADER);
++         bb.position(SIZE_HEADER);
+ 
+          boolean hasData = false;
+ 
+-         while (wholeFileBuffer.hasRemaining())
++         while (bb.hasRemaining())
+          {
+-            final int pos = wholeFileBuffer.position();
++            final int pos = bb.position();
+ 
+-            byte recordType = wholeFileBuffer.get();
++            byte recordType = bb.get();
+ 
+             if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+             {
+@@ -928,7 +919,7 @@
+                continue;
+             }
+ 
+-            if (wholeFileBuffer.position() + SIZE_INT > fileSize)
++            if (bb.position() + SIZE_INT > fileSize)
+             {
+                // II - Ignore this record, lets keep looking
+                continue;
+@@ -936,7 +927,7 @@
+ 
+             // III - Every record has the file-id.
+             // This is what supports us from not re-filling the whole file
+-            int readFileId = wholeFileBuffer.getInt();
++            int readFileId = bb.getInt();
+ 
+             // IV - This record is from a previous file-usage. The file was
+             // reused and we need to ignore this record
+@@ -946,7 +937,7 @@
+                // next reclaiming will fix it
+                hasData = true;
+ 
+-               wholeFileBuffer.position(pos + 1);
++               bb.position(pos + 1);
+ 
+                continue;
+             }
+@@ -955,24 +946,24 @@
+ 
+             if (isTransaction(recordType))
+             {
+-               if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
++               if (bb.position() + SIZE_LONG > fileSize)
+                {
+                   continue;
+                }
+ 
+-               transactionID = wholeFileBuffer.getLong();
++               transactionID = bb.getLong();
+             }
+ 
+             long recordID = 0;
+ 
+             if (!isCompleteTransaction(recordType))
+             {
+-               if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
++               if (bb.position() + SIZE_LONG > fileSize)
+                {
+                   continue;
+                }
+ 
+-               recordID = wholeFileBuffer.getLong();
++               recordID = bb.getLong();
+ 
+                maxID = Math.max(maxID, recordID);
+             }
+@@ -993,14 +984,14 @@
+ 
+             if (isContainsBody(recordType))
+             {
+-               if (wholeFileBuffer.position() + SIZE_INT > fileSize)
++               if (bb.position() + SIZE_INT > fileSize)
+                {
+                   continue;
+                }
+ 
+-               variableSize = wholeFileBuffer.getInt();
++               variableSize = bb.getInt();
+ 
+-               if (wholeFileBuffer.position() + variableSize > fileSize)
++               if (bb.position() + variableSize > fileSize)
+                {
+                   log.warn("Record at position " + pos +
+                            " file:" +
+@@ -1011,12 +1002,12 @@
+ 
+                if (recordType != DELETE_RECORD_TX)
+                {
+-                  userRecordType = wholeFileBuffer.get();
++                  userRecordType = bb.get();
+                }
+ 
+                record = new byte[variableSize];
+ 
+-               wholeFileBuffer.get(record);
++               bb.get(record);
+             }
+ 
+             if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+@@ -1024,11 +1015,11 @@
+                if (recordType == PREPARE_RECORD)
+                {
+                   // Add the variable size required for preparedTransactions
+-                  preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
++                  preparedTransactionExtraDataSize = bb.getInt();
+                }
+                // Both commit and record contain the recordSummary, and this is
+                // used to calculate the record-size on both record-types
+-               variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
++               variableSize += bb.getInt() * SIZE_INT * 2;
+             }
+ 
+             int recordSize = getRecordSize(recordType);
+@@ -1051,11 +1042,11 @@
+                continue;
+             }
+ 
+-            int oldPos = wholeFileBuffer.position();
++            int oldPos = bb.position();
+ 
+-            wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
++            bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+ 
+-            int checkSize = wholeFileBuffer.getInt();
++            int checkSize = bb.getInt();
+ 
+             // VII - The checkSize at the end has to match with the size
+             // informed at the beggining.
+@@ -1072,12 +1063,12 @@
+                // next reclaiming will fix it
+                hasData = true;
+ 
+-               wholeFileBuffer.position(pos + SIZE_BYTE);
++               bb.position(pos + SIZE_BYTE);
+ 
+                continue;
+             }
+ 
+-            wholeFileBuffer.position(oldPos);
++            bb.position(oldPos);
+ 
+             // At this point everything is checked. So we relax and just load
+             // the data now.
+@@ -1199,10 +1190,10 @@
+ 
+                   byte extraData[] = new byte[preparedTransactionExtraDataSize];
+ 
+-                  wholeFileBuffer.get(extraData);
++                  bb.get(extraData);
+ 
+                   // Pair <FileID, NumberOfElements>
+-                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
++                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+ 
+                   tx.prepared = true;
+ 
+@@ -1240,7 +1231,7 @@
+                   // We need to read it even if transaction was not found, or
+                   // the reading checks would fail
+                   // Pair <OrderId, NumberOfElements>
+-                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
++                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+ 
+                   // The commit could be alone on its own journal-file and the
+                   // whole transaction body was reclaimed but not the
+@@ -1328,20 +1319,18 @@
+                }
+             }
+ 
+-            checkSize = wholeFileBuffer.getInt();
++            checkSize = bb.getInt();
+ 
+             // This is a sanity check about the loading code itself.
+             // If this checkSize doesn't match, it means the reading method is
+             // not doing what it was supposed to do
+             if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+             {
+-               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() + ", pos = " + pos);
++               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+             }
+ 
+-            lastDataPos = wholeFileBuffer.position();
++            lastDataPos = bb.position();
+          }
+-         
+-         fileFactory.releaseBuffer(wholeFileBuffer);
+ 
+          file.getFile().close();
+ 
+@@ -1356,8 +1345,6 @@
+          }
+       }
+ 
+-      buffersControl.enable();
+-      
+       // Create any more files we need
+ 
+       // FIXME - size() involves a scan
+@@ -1390,6 +1377,11 @@
+       {
+          currentFile.getFile().open();
+ 
++         if (reuseBufferSize > 0)
++         {
++            currentFile.getFile().setBufferCallback(buffersControl.callback);
++         }
++
+          currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
+ 
+          currentFile.setOffset(currentFile.getFile().position());
+@@ -1688,8 +1680,6 @@
+          freeFiles.clear();
+ 
+          openedFiles.clear();
+-         
+-         buffersControl.clearPoll();
+ 
+          state = STATE_STOPPED;
+       }
+@@ -1942,19 +1932,8 @@
+       return recordSize;
+    }
+ 
+-   
+-   /** 
+-    * This method requires bufferControl disabled, or the reads are going to be invalid
+-    * */
+    private List<JournalFile> orderFiles() throws Exception
+    {
+-      
+-      if (buffersControl.enabled)
+-      {
+-         // Sanity check, this shouldn't happen unless someone made an invalid change on the code
+-         throw new IllegalStateException("Buffer life cycle control needs to be disabled at this point!!!");
+-      }
+-      
+       List<String> fileNames = fileFactory.listFiles(fileExtension);
+ 
+       List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
+@@ -1970,10 +1949,6 @@
+          file.read(bb);
+ 
+          int orderingID = bb.getInt();
+-         
+-         fileFactory.releaseBuffer(bb);
+-         
+-         bb = null;
+ 
+          if (nextOrderingId.get() < orderingID)
+          {
+@@ -2130,6 +2105,11 @@
+       file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
+ 
+       file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
++
++      if (reuseBufferSize > 0)
++      {
++         file.getFile().setBufferCallback(buffersControl.callback);
++      }
+    }
+ 
+    private int generateOrderingID()
+@@ -2407,22 +2387,9 @@
+       /** This queue is fed by {@link JournalImpl.ReuseBuffersController.LocalBufferCallback}} which is called directly by NIO or NIO.
+        * On the case of the AIO this is almost called by the native layer as soon as the buffer is not being used any more
+        * and ready to be reused or GCed */
+-      private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
+-      
+-      /** During reload we may disable/enable buffer reuse */
+-      private boolean enabled = true;
++      private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+ 
+       final BufferCallback callback = new LocalBufferCallback();
+-      
+-      public void enable()
+-      {
+-         this.enabled = true;
+-      }
+-      
+-      public void disable()
+-      {
+-         this.enabled = false;
+-      }
+ 
+       public ByteBuffer newBuffer(final int size)
+       {
+@@ -2431,11 +2398,11 @@
+          // just to cleanup this
+          if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
+          {
+-            trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
++            trace("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
+ 
+             bufferReuseLastTime = System.currentTimeMillis();
+ 
+-            clearPoll();
++            reuseBuffers.clear();
+          }
+ 
+          // if a buffer is bigger than the configured-size, we just create a new
+@@ -2451,7 +2418,7 @@
+             int alignedSize = fileFactory.calculateBlockSize(size);
+ 
+             // Try getting a buffer from the queue...
+-            ByteBuffer buffer = reuseBuffersQueue.poll();
++            ByteBuffer buffer = reuseBuffers.poll();
+ 
+             if (buffer == null)
+             {
+@@ -2467,41 +2434,24 @@
+ 
+                fileFactory.clearBuffer(buffer);
+             }
+-            
++
+             buffer.rewind();
+ 
+             return buffer;
+          }
+       }
+ 
+-      public void clearPoll()
+-      {
+-         ByteBuffer reusedBuffer;
+-         
+-         while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
+-         {
+-            fileFactory.releaseBuffer(reusedBuffer);
+-         }
+-      }
+-
+       private class LocalBufferCallback implements BufferCallback
+       {
+          public void bufferDone(final ByteBuffer buffer)
+          {
+-            if (enabled)
++            bufferReuseLastTime = System.currentTimeMillis();
++
++            // If a buffer has any other than the configured size, the buffer
++            // will be just sent to GC
++            if (buffer.capacity() == reuseBufferSize)
+             {
+-               bufferReuseLastTime = System.currentTimeMillis();
+-   
+-               // If a buffer has any other than the configured size, the buffer
+-               // will be just sent to GC
+-               if (buffer.capacity() == reuseBufferSize)
+-               {
+-                  reuseBuffersQueue.offer(buffer);
+-               }
+-               else
+-               {
+-                  fileFactory.releaseBuffer(buffer);
+-               }
++               reuseBuffers.offer(buffer);
+             }
+          }
+       }
+Index: src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	(working copy)
+@@ -42,14 +42,8 @@
+    boolean isSupportsCallbacks();
+ 
+    ByteBuffer newBuffer(int size);
+-   
+-   void releaseBuffer(ByteBuffer buffer);
+-   
+-   void setBufferCallback(BufferCallback bufferCallback);
+-   
+-   BufferCallback getBufferCallback();
+ 
+-   // To be used in tests only
++   // Avoid using this method in production as it creates an unecessary copy
+    ByteBuffer wrapBuffer(byte[] bytes);
+ 
+    int getAlignment();
+Index: src/main/org/jboss/messaging/core/journal/SequentialFile.java
+===================================================================
+--- src/main/org/jboss/messaging/core/journal/SequentialFile.java	(revision 7105)
++++ src/main/org/jboss/messaging/core/journal/SequentialFile.java	(working copy)
+@@ -48,6 +48,8 @@
+     */
+    void open(int maxIO) throws Exception;
+ 
++   void setBufferCallback(BufferCallback callback);
++
+    int getAlignment() throws Exception;
+ 
+    int calculateBlockStart(int position) throws Exception;
+Index: src/config/stand-alone/non-clustered/jbm-configuration.xml
+===================================================================
+--- src/config/stand-alone/non-clustered/jbm-configuration.xml	(revision 7105)
++++ src/config/stand-alone/non-clustered/jbm-configuration.xml	(working copy)
+@@ -56,4 +56,37 @@
+    </address-settings>
+ 
+ 
++<<<<<<< .working
+ </configuration>
++=======
++      <journal-directory>data/journal</journal-directory>
++
++      <create-journal-dir>true</create-journal-dir>
++
++      <journal-type>NIO</journal-type>
++
++      <!-- The journal will reuse any buffers where the size < journal-buffer-reuse-size on write operations
++           Set this to -1 to disable this feature -->
++      <journal-buffer-reuse-size>1536</journal-buffer-reuse-size>
++
++      <!-- Does the journal sync to disk on each transaction commit, prepare or rollback? -->
++      <journal-sync-transactional>true</journal-sync-transactional>
++      
++      <!-- Does the journal sync to disk for every non transactional persistent operation? -->
++      <journal-sync-non-transactional>false</journal-sync-non-transactional>
++
++      <!-- 10 MB journal file size -->
++      <journal-file-size>10485760</journal-file-size>
++
++      <journal-min-files>15</journal-min-files>
++
++      <!-- Maximum simultaneous asynchronous writes accepted by the native layer.
++      (parameter ignored on NIO)
++       You can verify the max AIO on the OS level at /proc/sys/fs/aio_max_nr. (aio-nr will give you the current max-aio being used)
++      -->
++      <journal-max-aio>10000</journal-max-aio>
++
++   </configuration>
++   
++</deployment>
++>>>>>>> .merge-right.r6682
+Index: tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java	(working copy)
+@@ -22,7 +22,12 @@
+ 
+ package org.jboss.messaging.tests.unit.core.asyncio;
+ 
+-import java.lang.ref.WeakReference;
++import org.jboss.messaging.core.asyncio.AIOCallback;
++import org.jboss.messaging.core.asyncio.BufferCallback;
++import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
++import org.jboss.messaging.core.exception.MessagingException;
++import org.jboss.messaging.core.logging.Logger;
++
+ import java.nio.ByteBuffer;
+ import java.nio.CharBuffer;
+ import java.nio.charset.Charset;
+@@ -30,17 +35,7 @@
+ import java.util.ArrayList;
+ import java.util.Iterator;
+ import java.util.concurrent.CountDownLatch;
+-import java.util.concurrent.ExecutorService;
+-import java.util.concurrent.Executors;
+-import java.util.concurrent.atomic.AtomicInteger;
+ 
+-import org.jboss.messaging.core.asyncio.AIOCallback;
+-import org.jboss.messaging.core.asyncio.BufferCallback;
+-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+-import org.jboss.messaging.core.exception.MessagingException;
+-import org.jboss.messaging.core.logging.Logger;
+-import org.jboss.messaging.utils.JBMThreadFactory;
+-
+ /**
+  * 
+  * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+@@ -58,40 +53,19 @@
+    private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
+ 
+    byte commonBuffer[] = null;
+-   
+-   ExecutorService executor;
+-   
+-   ExecutorService pollerExecutor;
+ 
+-
+    private static void debug(final String msg)
+    {
+       log.debug(msg);
+    }
+ 
+-   
+-   
+-   protected void setUp() throws Exception
+-   {
+-      super.setUp();
+-      pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+-      executor = Executors.newSingleThreadExecutor();
+-   }
+-   
+-   protected void tearDown() throws Exception
+-   {
+-      executor.shutdown();
+-      pollerExecutor.shutdown();
+-      super.tearDown();
+-   }
+-   
+    /** 
+     * Opening and closing a file immediately can lead to races on the native layer,
+     * creating crash conditions.
+     * */
+    public void testOpenClose() throws Exception
+    {
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       for (int i = 0; i < 1000; i++)
+       {
+          controller.open(FILE_NAME, 10000);
+@@ -99,16 +73,16 @@
+ 
+       }
+    }
+-
++   
+    public void testFileNonExistent() throws Exception
+    {
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       for (int i = 0; i < 1000; i++)
+       {
+          try
+          {
+             controller.open("/non-existent/IDontExist.error", 10000);
+-            fail("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
++            fail ("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
+          }
+          catch (Throwable ignored)
+          {
+@@ -132,22 +106,21 @@
+     */
+    public void testTwoFiles() throws Exception
+    {
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+-      final AsynchronousFileImpl controller2 = new AsynchronousFileImpl(executor, pollerExecutor);
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
++      final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
+       controller.open(FILE_NAME + ".1", 10000);
+       controller2.open(FILE_NAME + ".2", 10000);
+ 
+       int numberOfLines = 1000;
+       int size = 1024;
+ 
+-      ByteBuffer buffer = null;
+       try
+       {
+          CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+          CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
+ 
+-         buffer = AsynchronousFileImpl.newBuffer(size);
+-         encodeBufer(buffer);
++         ByteBuffer block = controller.newBuffer(size);
++         encodeBufer(block);
+ 
+          preAlloc(controller, numberOfLines * size);
+          preAlloc(controller2, numberOfLines * size);
+@@ -171,8 +144,8 @@
+          {
+             CountDownCallback tmp2 = iter2.next();
+ 
+-            controller.write(counter * size, size, buffer, tmp);
+-            controller.write(counter * size, size, buffer, tmp2);
++            controller.write(counter * size, size, block, tmp);
++            controller.write(counter * size, size, block, tmp2);
+             if (++counter % 5000 == 0)
+             {
+                debug(5000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
+@@ -229,7 +202,6 @@
+       }
+       finally
+       {
+-         AsynchronousFileImpl.destroyBuffer(buffer);
+          try
+          {
+             controller.close();
+@@ -277,8 +249,7 @@
+          }
+       }
+ 
+-      AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+-      ByteBuffer buffer = null;
++      AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       try
+       {
+ 
+@@ -287,13 +258,13 @@
+          controller.open(FILE_NAME, 10);
+          controller.close();
+ 
+-         controller = new AsynchronousFileImpl(executor, pollerExecutor);
++         controller = new AsynchronousFileImpl();
+ 
+          controller.open(FILE_NAME, 10);
+ 
+          controller.fill(0, 1, 512, (byte)'j');
+ 
+-         buffer = AsynchronousFileImpl.newBuffer(SIZE);
++         ByteBuffer buffer = controller.newBuffer(SIZE);
+ 
+          buffer.clear();
+ 
+@@ -334,7 +305,8 @@
+          {
+          }
+ 
+-         newBuffer = AsynchronousFileImpl.newBuffer(512);
++         // newBuffer = ByteBuffer.allocateDirect(512);
++         newBuffer = controller.newBuffer(512);
+          callbackLocal = new LocalCallback();
+          controller.read(0, 512, newBuffer, callbackLocal);
+          callbackLocal.latch.await();
+@@ -353,8 +325,6 @@
+       }
+       finally
+       {
+-         AsynchronousFileImpl.destroyBuffer(buffer);
+-         
+          try
+          {
+             controller.close();
+@@ -370,7 +340,7 @@
+    public void testBufferCallbackUniqueBuffers() throws Exception
+    {
+       boolean closed = false;
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       try
+       {
+          final int NUMBER_LINES = 1000;
+@@ -396,7 +366,7 @@
+          CountDownCallback aio = new CountDownCallback(latch);
+          for (int i = 0; i < NUMBER_LINES; i++)
+          {
+-            ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
++            ByteBuffer buffer = controller.newBuffer(SIZE);
+             buffer.rewind();
+             for (int j = 0; j < SIZE; j++)
+             {
+@@ -430,11 +400,6 @@
+             }
+          }
+ 
+-         for (ByteBuffer bufferTmp : buffers)
+-         {
+-            AsynchronousFileImpl.destroyBuffer(bufferTmp);
+-         }
+-
+          buffers.clear();
+ 
+       }
+@@ -450,8 +415,7 @@
+    public void testBufferCallbackAwaysSameBuffer() throws Exception
+    {
+       boolean closed = false;
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+-      ByteBuffer buffer = null;
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       try
+       {
+          final int NUMBER_LINES = 1000;
+@@ -476,7 +440,7 @@
+          CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
+          CountDownCallback aio = new CountDownCallback(latch);
+ 
+-         buffer = AsynchronousFileImpl.newBuffer(SIZE);
++         ByteBuffer buffer = controller.newBuffer(SIZE);
+          buffer.rewind();
+          for (int j = 0; j < SIZE; j++)
+          {
+@@ -518,7 +482,6 @@
+       }
+       finally
+       {
+-         AsynchronousFileImpl.destroyBuffer(buffer);
+          if (!closed)
+          {
+             controller.close();
+@@ -528,22 +491,11 @@
+ 
+    public void testRead() throws Exception
+    {
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+-      controller.setBufferCallback(new BufferCallback()
+-      {
+-
+-         public void bufferDone(ByteBuffer buffer)
+-         {
+-            AsynchronousFileImpl.destroyBuffer(buffer);
+-         }
+-
+-      });
+-
+-      ByteBuffer readBuffer = null;
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       try
+       {
+ 
+-         final int NUMBER_LINES = 1000;
++         final int NUMBER_LINES = 5000;
+          final int SIZE = 1024;
+ 
+          controller.open(FILE_NAME, 1000);
+@@ -556,15 +508,13 @@
+ 
+             for (int i = 0; i < NUMBER_LINES; i++)
+             {
+-               if (i % 100 == 0)
++               ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
++               addString("Str value " + i + "\n", buffer);
++               for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
+                {
+-                  System.out.println("Wrote " + i + " lines");
++                  buffer.put((byte)' ');
+                }
+-               ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
+-               for (int j = 0; j < SIZE; j++)
+-               {
+-                  buffer.put(getSamplebyte(j));
+-               }
++               buffer.put((byte)'\n');
+ 
+                controller.write(i * SIZE, SIZE, buffer, aio);
+             }
+@@ -577,46 +527,48 @@
+          // If you call close you're supposed to wait events to finish before
+          // closing it
+          controller.close();
+-         controller.setBufferCallback(null);
+-
+          controller.open(FILE_NAME, 10);
+ 
+-         readBuffer = AsynchronousFileImpl.newBuffer(SIZE);
++         ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+ 
+          for (int i = 0; i < NUMBER_LINES; i++)
+          {
+-            if (i % 100 == 0)
++            newBuffer.clear();
++            addString("Str value " + i + "\n", newBuffer);
++            for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
+             {
+-               System.out.println("Read " + i + " lines");
++               newBuffer.put((byte)' ');
+             }
+-            AsynchronousFileImpl.clearBuffer(readBuffer);
++            newBuffer.put((byte)'\n');
+ 
+             CountDownLatch latch = new CountDownLatch(1);
+             CountDownCallback aio = new CountDownCallback(latch);
+ 
+-            controller.read(i * SIZE, SIZE, readBuffer, aio);
++            ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+ 
++            controller.read(i * SIZE, SIZE, buffer, aio);
+             latch.await();
+             assertFalse(aio.errorCalled);
+             assertTrue(aio.doneCalled);
+ 
+             byte bytesRead[] = new byte[SIZE];
+-            readBuffer.get(bytesRead);
++            byte bytesCompare[] = new byte[SIZE];
+ 
++            newBuffer.rewind();
++            newBuffer.get(bytesCompare);
++            buffer.rewind();
++            buffer.get(bytesRead);
++
+             for (int count = 0; count < SIZE; count++)
+             {
+-               assertEquals("byte position " + count + " differs on line " + i + " position = " + count,
+-                            getSamplebyte(count),
+-                            bytesRead[count]);
++               assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+             }
++
++            assertTrue(buffer.equals(newBuffer));
+          }
+       }
+       finally
+       {
+-         if (readBuffer != null)
+-         {
+-            AsynchronousFileImpl.destroyBuffer(readBuffer);
+-         }
+          try
+          {
+             controller.close();
+@@ -635,7 +587,7 @@
+     *  The file is also read after being written to validate its correctness */
+    public void testConcurrentClose() throws Exception
+    {
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       try
+       {
+ 
+@@ -646,20 +598,10 @@
+          controller.open(FILE_NAME, 10000);
+ 
+          controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+-         
+-         controller.setBufferCallback(new BufferCallback()
+-         {
+ 
+-            public void bufferDone(ByteBuffer buffer)
+-            {
+-               AsynchronousFileImpl.destroyBuffer(buffer);
+-            }
+-            
+-         });
+-
+          for (int i = 0; i < NUMBER_LINES; i++)
+          {
+-            ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
++            ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+ 
+             buffer.clear();
+             addString("Str value " + i + "\n", buffer);
+@@ -676,16 +618,14 @@
+          // If you call close you're supposed to wait events to finish before
+          // closing it
+          controller.close();
+-         
+-         controller.setBufferCallback(null);
+ 
+          assertEquals(0, readLatch.getCount());
+          readLatch.await();
+          controller.open(FILE_NAME, 10);
+ 
+-         ByteBuffer newBuffer = AsynchronousFileImpl.newBuffer(SIZE);
++         ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+ 
+-         ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
++         ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+ 
+          for (int i = 0; i < NUMBER_LINES; i++)
+          {
+@@ -719,9 +659,6 @@
+ 
+             assertTrue(buffer.equals(newBuffer));
+          }
+-         
+-         AsynchronousFileImpl.destroyBuffer(newBuffer);
+-         AsynchronousFileImpl.destroyBuffer(buffer);
+ 
+       }
+       finally
+@@ -739,17 +676,15 @@
+ 
+    private void asyncData(final int numberOfLines, final int size, final int aioLimit) throws Exception
+    {
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       controller.open(FILE_NAME, aioLimit);
+-      
+-      ByteBuffer buffer = null;
+-      
++
+       try
+       {
+          CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+ 
+-         buffer = AsynchronousFileImpl.newBuffer(size);
+-         encodeBufer(buffer);
++         ByteBuffer block = controller.newBuffer(size);
++         encodeBufer(block);
+ 
+          preAlloc(controller, numberOfLines * size);
+ 
+@@ -766,7 +701,7 @@
+          int counter = 0;
+          for (CountDownCallback tmp : list)
+          {
+-            controller.write(counter * size, size, buffer, tmp);
++            controller.write(counter * size, size, block, tmp);
+             if (++counter % 20000 == 0)
+             {
+                debug(20000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
+@@ -801,7 +736,6 @@
+       }
+       finally
+       {
+-         AsynchronousFileImpl.destroyBuffer(buffer);
+          try
+          {
+             controller.close();
+@@ -815,17 +749,16 @@
+ 
+    public void testDirectSynchronous() throws Exception
+    {
+-      ByteBuffer buffer = null;
+       try
+       {
+          final int NUMBER_LINES = 3000;
+          final int SIZE = 1024;
+ 
+-         final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++         final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+          controller.open(FILE_NAME, 2000);
+ 
+-         buffer = AsynchronousFileImpl.newBuffer(SIZE);
+-         encodeBufer(buffer);
++         ByteBuffer block = ByteBuffer.allocateDirect(SIZE);
++         encodeBufer(block);
+ 
+          preAlloc(controller, NUMBER_LINES * SIZE);
+ 
+@@ -835,7 +768,7 @@
+          {
+             CountDownLatch latchDone = new CountDownLatch(1);
+             CountDownCallback aioBlock = new CountDownCallback(latchDone);
+-            controller.write(i * 512, 512, buffer, aioBlock);
++            controller.write(i * 512, 512, block, aioBlock);
+             latchDone.await();
+             assertTrue(aioBlock.doneCalled);
+             assertFalse(aioBlock.errorCalled);
+@@ -860,33 +793,27 @@
+       {
+          throw e;
+       }
+-      finally
+-      {
+-         AsynchronousFileImpl.destroyBuffer(buffer);
+-      }
+ 
+    }
+ 
+    public void testInvalidWrite() throws Exception
+    {
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       controller.open(FILE_NAME, 2000);
+-      
+-      ByteBuffer buffer = null;
+-      
++
+       try
+       {
+          final int SIZE = 512;
+ 
+-         buffer = AsynchronousFileImpl.newBuffer(SIZE);
+-         encodeBufer(buffer);
++         ByteBuffer block = controller.newBuffer(SIZE);
++         encodeBufer(block);
+ 
+          preAlloc(controller, 10 * 512);
+ 
+          CountDownLatch latchDone = new CountDownLatch(1);
+ 
+          CountDownCallback aioBlock = new CountDownCallback(latchDone);
+-         controller.write(11, 512, buffer, aioBlock);
++         controller.write(11, 512, block, aioBlock);
+ 
+          latchDone.await();
+ 
+@@ -900,7 +827,6 @@
+       }
+       finally
+       {
+-         AsynchronousFileImpl.destroyBuffer(buffer);
+          controller.close();
+       }
+ 
+@@ -908,10 +834,10 @@
+ 
+    public void testInvalidAlloc() throws Exception
+    {
++      AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       try
+       {
+-         @SuppressWarnings("unused")
+-         ByteBuffer buffer = AsynchronousFileImpl.newBuffer(300);
++         ByteBuffer buffer = controller.newBuffer(300);
+          fail("Exception expected");
+       }
+       catch (Exception ignored)
+@@ -919,58 +845,10 @@
+       }
+ 
+    }
+-   
+-   // This is in particular testing for http://bugs.sun.com/view_bug.do?bug_id=6791815
+-   public void testAllocations() throws Exception
+-   {
+-      final AtomicInteger errors = new AtomicInteger(0);
+- 
+-      Thread ts[] = new Thread[100];
+ 
+-      final CountDownLatch align = new CountDownLatch(ts.length);
+-      final CountDownLatch start = new CountDownLatch(1);
+-
+-      for (int i = 0; i < ts.length; i++)
+-      {
+-         ts[i] = new Thread()
+-         {
+-            public void run()
+-            {
+-               try
+-               {
+-                  align.countDown();
+-                  start.await();
+-                  for (int i = 0; i < 1000; i++)
+-                  {
+-                     ByteBuffer buffer = AsynchronousFileImpl.newBuffer(512);
+-                     AsynchronousFileImpl.destroyBuffer(buffer);
+-                  }
+-               }
+-               catch (Throwable e)
+-               {
+-                  e.printStackTrace();
+-                  errors.incrementAndGet();
+-               }
+-            }
+-         };
+-         ts[i].start();
+-      }
+-      
+-      align.await();
+-      start.countDown();
+-      
+-      for (Thread t: ts)
+-      {
+-         t.join();
+-      }
+-      
+-      assertEquals(0, errors.get());
+-   }
+-
+-
+    public void testSize() throws Exception
+    {
+-      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
++      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ 
+       final int NUMBER_LINES = 10;
+       final int SIZE = 1024;
+Index: tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java	(working copy)
+@@ -22,19 +22,18 @@
+ 
+ package org.jboss.messaging.tests.unit.core.asyncio;
+ 
++import org.jboss.messaging.core.asyncio.AIOCallback;
++import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
++import org.jboss.messaging.core.logging.Logger;
++
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.LinkedList;
+ import java.util.concurrent.CountDownLatch;
+-import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executor;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+-import org.jboss.messaging.core.asyncio.AIOCallback;
+-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+-import org.jboss.messaging.core.logging.Logger;
+-import org.jboss.messaging.utils.JBMThreadFactory;
+-
+ /**
+  * 
+  * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+@@ -60,35 +59,18 @@
+ 
+    // Executor exec
+ 
+-   ExecutorService executor;
+-   
+-   ExecutorService pollerExecutor;
++   Executor executor = Executors.newSingleThreadExecutor();
+ 
+-
+-   private static void debug(final String msg)
+-   {
+-      log.debug(msg);
+-   }
+-
+-   
+-   
++   @Override
+    protected void setUp() throws Exception
+    {
+       super.setUp();
+-      pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+-      executor = Executors.newSingleThreadExecutor();
++      position.set(0);
+    }
+-   
+-   protected void tearDown() throws Exception
+-   {
+-      executor.shutdown();
+-      pollerExecutor.shutdown();
+-      super.tearDown();
+-   }
+-   
++
+    public void testMultipleASynchronousWrites() throws Throwable
+    {
+-         executeTest(false);
++      executeTest(false);
+    }
+ 
+    public void testMultipleSynchronousWrites() throws Throwable
+@@ -98,15 +80,15 @@
+ 
+    private void executeTest(final boolean sync) throws Throwable
+    {
+-      debug(sync ? "Sync test:" : "Async test");
+-      AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl(executor, pollerExecutor);
++      log.debug(sync ? "Sync test:" : "Async test");
++      AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
+       jlibAIO.open(FILE_NAME, 21000);
+       try
+       {
+-         debug("Preallocating file");
++         log.debug("Preallocating file");
+ 
+          jlibAIO.fill(0l, NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES, (byte)0);
+-         debug("Done Preallocating file");
++         log.debug("Done Preallocating file");
+ 
+          CountDownLatch latchStart = new CountDownLatch(NUMBER_OF_THREADS + 1);
+ 
+@@ -133,7 +115,7 @@
+          }
+          long endTime = System.currentTimeMillis();
+ 
+-         debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
++         log.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
+                    NUMBER_OF_THREADS *
+                    NUMBER_OF_LINES *
+                    1000 /
+@@ -182,17 +164,11 @@
+       {
+          super.run();
+ 
+-
+-         ByteBuffer buffer = null;
+-         
+-         synchronized (MultiThreadAsynchronousFileTest.class)
+-         {
+-            buffer = AsynchronousFileImpl.newBuffer(SIZE);
+-         }
+-
+          try
+          {
+ 
++            ByteBuffer buffer = libaio.newBuffer(SIZE);
++
+             // I'm aways reusing the same buffer, as I don't want any noise from
+             // malloc on the measurement
+             // Encoding buffer
+@@ -249,7 +225,7 @@
+ 
+             long endtime = System.currentTimeMillis();
+ 
+-            debug(Thread.currentThread().getName() + " Rec/Sec= " +
++            log.debug(Thread.currentThread().getName() + " Rec/Sec= " +
+                       NUMBER_OF_LINES *
+                       1000 /
+                       (endtime - startTime) +
+@@ -270,13 +246,6 @@
+             e.printStackTrace();
+             failed = e;
+          }
+-         finally
+-         {
+-            synchronized (MultiThreadAsynchronousFileTest.class)
+-            {
+-               AsynchronousFileImpl.destroyBuffer(buffer);
+-            }
+-         }
+ 
+       }
+    }
+Index: tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	(working copy)
+@@ -22,7 +22,6 @@
+ 
+ package org.jboss.messaging.tests.unit.core.journal.impl;
+ 
+-import java.io.File;
+ import java.util.ArrayList;
+ import java.util.Iterator;
+ import java.util.LinkedHashMap;
+@@ -475,7 +474,7 @@
+     * @param expected
+     * @param actual
+     */
+-   protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
++   private void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
+    {
+       System.out.println("***********************************************");
+       System.out.println("Expected list:");
+@@ -483,14 +482,10 @@
+       {
+          System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+       }
+-      if (actual != null)
++      System.out.println("Actual list:");
++      for (RecordInfo info : actual)
+       {
+-         System.out.println("***********************************************");
+-         System.out.println("Actual list:");
+-         for (RecordInfo info : actual)
+-         {
+-            System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+-         }
++         System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+       }
+       System.out.println("***********************************************");
+    }
+Index: tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	(working copy)
+@@ -574,29 +574,4 @@
+       // nothing to be done on the fake Sequential file
+    }
+ 
+-   /* (non-Javadoc)
+-    * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+-    */
+-   public void releaseBuffer(ByteBuffer buffer)
+-   {
+-   }
+-
+-   /* (non-Javadoc)
+-    * @see org.jboss.messaging.core.journal.SequentialFileFactory#getBufferCallback()
+-    */
+-   public BufferCallback getBufferCallback()
+-   {
+-      // TODO Auto-generated method stub
+-      return null;
+-   }
+-
+-   /* (non-Javadoc)
+-    * @see org.jboss.messaging.core.journal.SequentialFileFactory#setBufferCallback(org.jboss.messaging.core.journal.BufferCallback)
+-    */
+-   public void setBufferCallback(BufferCallback bufferCallback)
+-   {
+-      // TODO Auto-generated method stub
+-      
+-   }
+-
+ }
+Index: tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	(working copy)
+@@ -3018,6 +3018,7 @@
+       assertEquals(0, journal.getDataFilesCount());
+    }
+ 
++
+    protected abstract int getAlignment();
+ 
+ }
+Index: tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	(working copy)
+@@ -202,7 +202,7 @@
+    {
+       Journal journal =
+          new JournalImpl(10 * 1024 * 1024, 10, true, true, getFileFactory(),
+-               "jbm-data", "jbm", 5000, 10 * 1024);
++               "jbm-data", "jbm", 5000, 0);
+       
+       journal.start();
+       
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	(working copy)
+@@ -882,11 +882,9 @@
+       configuration.setSecurityEnabled(false);
+       configuration.setBindingsDirectory(getBindingsDir(node, backup));
+       configuration.setJournalMinFiles(2);
+-      configuration.setJournalMaxAIO(1000);
+       configuration.setJournalDirectory(getJournalDir(node, backup));
+       configuration.setJournalFileSize(100 * 1024);
+-      configuration.setJournalType(JournalType.ASYNCIO);
+-      configuration.setJournalMaxAIO(1000);
++      configuration.setJournalType(JournalType.NIO);
+       configuration.setPagingDirectory(getPageDir(node, backup));
+       configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
+       configuration.setClustered(true);
+@@ -980,8 +978,7 @@
+       configuration.setJournalMinFiles(2);
+       configuration.setJournalDirectory(getJournalDir(node, false));
+       configuration.setJournalFileSize(100 * 1024);
+-      configuration.setJournalType(JournalType.ASYNCIO);
+-      configuration.setJournalMaxAIO(1000);
++      configuration.setJournalType(JournalType.NIO);
+       configuration.setPagingDirectory(getPageDir(node, false));
+       configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
+       configuration.setClustered(true);
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java	(working copy)
+@@ -103,7 +103,7 @@
+       backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+       backupConf.setJournalFileSize(100 * 1024);
+       
+-      backupConf.setJournalType(JournalType.ASYNCIO);
++      backupConf.setJournalType(JournalType.NIO);
+ 
+       backupConf.setSecurityEnabled(false);
+       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+@@ -124,7 +124,7 @@
+ 
+       liveConf.setJournalFileSize(100 * 1024);
+       
+-      liveConf.setJournalType(JournalType.ASYNCIO);
++      liveConf.setJournalType(JournalType.NIO);
+ 
+ 
+       liveConf.setSecurityEnabled(false);
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java	(working copy)
+@@ -386,7 +386,7 @@
+          backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+          backupConf.setJournalFileSize(100 * 1024);
+ 
+-         backupConf.setJournalType(JournalType.ASYNCIO);
++         backupConf.setJournalType(JournalType.NIO);
+ 
+          backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+          backupConf.setGlobalPagingSize(pageSize);
+@@ -426,7 +426,7 @@
+          liveConf.setGlobalPagingSize(pageSize);
+          liveConf.setJournalFileSize(100 * 1024);
+ 
+-         liveConf.setJournalType(JournalType.ASYNCIO);
++         liveConf.setJournalType(JournalType.NIO);
+       }
+ 
+       if (fileBased)
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	(working copy)
+@@ -124,7 +124,7 @@
+          backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+          backupConf.setJournalFileSize(100 * 1024);
+ 
+-         backupConf.setJournalType(JournalType.ASYNCIO);
++         backupConf.setJournalType(JournalType.NIO);
+ 
+          backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+          backupConf.setGlobalPagingSize(pageSize);
+@@ -164,7 +164,7 @@
+          liveConf.setGlobalPagingSize(pageSize);
+          liveConf.setJournalFileSize(100 * 1024);
+ 
+-         liveConf.setJournalType(JournalType.ASYNCIO);
++         liveConf.setJournalType(JournalType.NIO);
+       }
+ 
+       if (fileBased)
+@@ -208,7 +208,7 @@
+          backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+          backupConf.setJournalFileSize(100 * 1024);
+ 
+-         backupConf.setJournalType(JournalType.ASYNCIO);
++         backupConf.setJournalType(JournalType.NIO);
+ 
+          backupConf.setPagingMaxGlobalSizeBytes(-1);
+          backupConf.setGlobalPagingSize(-1);
+@@ -263,7 +263,7 @@
+          liveConf.setGlobalPagingSize(-1);
+          liveConf.setJournalFileSize(100 * 1024);
+ 
+-         liveConf.setJournalType(JournalType.ASYNCIO);
++         liveConf.setJournalType(JournalType.NIO);
+          liveServer = Messaging.newMessagingServer(liveConf);
+       }
+       else
+Index: tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java	(working copy)
+@@ -85,7 +85,7 @@
+       backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+       backupConf.setJournalFileSize(100 * 1024);
+ 
+-      backupConf.setJournalType(JournalType.ASYNCIO);
++      backupConf.setJournalType(JournalType.NIO);
+ 
+       backupConf.setSecurityEnabled(false);
+       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+@@ -106,7 +106,7 @@
+ 
+       liveConf.setJournalFileSize(100 * 1024);
+ 
+-      liveConf.setJournalType(JournalType.ASYNCIO);
++      liveConf.setJournalType(JournalType.NIO);
+ 
+       liveConf.setSecurityEnabled(false);
+       liveConf.getAcceptorConfigurations()
+Index: tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java	(working copy)
+@@ -60,7 +60,7 @@
+ 
+       configuration.start();
+ 
+-      configuration.setJournalType(JournalType.ASYNCIO);
++      configuration.setJournalType(JournalType.NIO);
+ 
+       final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+       journal.start();
+Index: tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	(working copy)
+@@ -23,6 +23,7 @@
+ package org.jboss.messaging.tests.integration.journal;
+ 
+ import java.io.File;
++import java.util.concurrent.Executors;
+ 
+ import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+ import org.jboss.messaging.core.journal.SequentialFileFactory;
+Index: tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java	(working copy)
+@@ -25,6 +25,7 @@
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.Executors;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
+ import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+@@ -77,7 +78,6 @@
+       ByteBuffer buff = factory.newBuffer(10);
+       assertEquals(512, buff.limit());
+       file.close();
+-      factory.releaseBuffer(buff);
+    }
+ 
+    public void testBlockCallback() throws Exception
+@@ -131,7 +131,7 @@
+ 
+       BlockCallback callback = new BlockCallback();
+ 
+-      final int NUMBER_OF_RECORDS = 500;
++      final int NUMBER_OF_RECORDS = 10000;
+ 
+       SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000);
+       file.open();
+Index: tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	(working copy)
+@@ -321,7 +321,7 @@
+       config.setJournalFileSize(10 * 1024 * 1024);
+       config.setJournalMinFiles(5);
+       
+-      config.setJournalType(JournalType.ASYNCIO);
++      config.setJournalType(JournalType.NIO);
+       
+       return config;
+    }
+Index: tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
+===================================================================
+--- tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	(revision 7105)
++++ tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	(working copy)
+@@ -126,7 +126,7 @@
+       Configuration config = new ConfigurationImpl();
+       config.setJournalDirectory(getJournalDir());
+       config.setBindingsDirectory(getBindingsDir());
+-      config.setJournalType(JournalType.ASYNCIO);
++      config.setJournalType(JournalType.NIO);
+       config.setLargeMessagesDirectory(getLargeMessagesDir());
+       return config;
+    }
+@@ -272,7 +272,7 @@
+       configuration.setJournalMinFiles(2);
+       configuration.setJournalDirectory(getJournalDir(index, false));
+       configuration.setJournalFileSize(100 * 1024);
+-      configuration.setJournalType(JournalType.ASYNCIO);
++      configuration.setJournalType(JournalType.NIO);
+       configuration.setPagingDirectory(getPageDir(index, false));
+       configuration.setLargeMessagesDirectory(getLargeMessagesDir(index, false));
+ 
+@@ -299,7 +299,7 @@
+       configuration.setPagingDirectory(getPageDir());
+       configuration.setLargeMessagesDirectory(getLargeMessagesDir());
+ 
+-      configuration.setJournalType(JournalType.ASYNCIO);
++      configuration.setJournalType(JournalType.NIO);
+ 
+       configuration.getAcceptorConfigurations().clear();
+ 




More information about the jboss-cvs-commits mailing list