[jboss-cvs] JBoss Messaging SVN: r6683 - in trunk: native/src and 16 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 6 00:42:57 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-05-06 00:42:56 -0400 (Wed, 06 May 2009)
New Revision: 6683

Removed:
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java
Modified:
   trunk/native/bin/libJBMLibAIO32.so
   trunk/native/bin/libJBMLibAIO64.so
   trunk/native/src/AsyncFile.cpp
   trunk/native/src/LibAIOController.cpp
   trunk/native/src/Version.h
   trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   trunk/src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1538 - AIO & Buffer creation

Modified: trunk/native/bin/libJBMLibAIO32.so
===================================================================
(Binary files differ)

Modified: trunk/native/bin/libJBMLibAIO64.so
===================================================================
(Binary files differ)

Modified: trunk/native/src/AsyncFile.cpp
===================================================================
--- trunk/native/src/AsyncFile.cpp	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/native/src/AsyncFile.cpp	2009-05-06 04:42:56 UTC (rev 6683)
@@ -13,7 +13,7 @@
     USA
 
     The GNU Lesser General Public License is available in the file COPYING.
-    
+
     Software written by Clebert Suconic (csuconic at redhat dot com)
 */
 
@@ -48,12 +48,12 @@
 std::string io_error(int rc)
 {
 	std::stringstream buffer;
-	
+
 	if (rc == -ENOSYS)
 		buffer << "AIO not in this kernel";
-	else 
+	else
 		buffer << "Error:= " << strerror(-rc);
-	
+
 	return buffer.str();
 }
 
@@ -62,27 +62,27 @@
 {
 	::pthread_mutex_init(&fileMutex,0);
 	::pthread_mutex_init(&pollerMutex,0);
-	
+
 	maxIO = _maxIO;
 	fileName = _fileName;
 	if (io_queue_init(maxIO, &aioContext))
 	{
-		throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio"); 
+		throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio, out of AIO Handlers");
 	}
 
 	fileHandle = ::open(fileName.data(),  O_RDWR | O_CREAT | O_DIRECT, 0666);
 	if (fileHandle < 0)
 	{
 		io_queue_release(aioContext);
-		throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file"); 
+		throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file");
 	}
-	
+
 #ifdef DEBUG
 	fprintf (stderr,"File Handle %d", fileHandle);
 #endif
 
 	events = (struct io_event *)malloc (maxIO * sizeof (struct io_event));
-	
+
 	if (events == 0)
 	{
 		throw AIOException (NATIVE_ERROR_CANT_ALLOCATE_QUEUE, "Can't allocate ioEvents");
@@ -112,17 +112,17 @@
 
 void AsyncFile::pollEvents(THREAD_CONTEXT threadContext)
 {
-	
+
 	LockClass lock(&pollerMutex);
 	pollerRunning=1;
-	
+
 	// TODO: Maybe we don't need to wait for one second.... we just keep waiting forever, and use something to interrupt it
 	// maybe an invalid write to interrupt it.
 	struct timespec oneSecond;
 	oneSecond.tv_sec = 1;
 	oneSecond.tv_nsec = 0;
-	
-	
+
+
 	while (pollerRunning)
 	{
 		if (isException(threadContext))
@@ -130,15 +130,15 @@
 			return;
 		}
 		int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
-		
-		
+
+
 #ifdef DEBUG
 		fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
 #endif
-		
+
 		if (result > 0)
 		{
-			
+
 #ifdef DEBUG
 			fprintf (stdout, "Received %d events\n", result);
 			fflush(stdout);
@@ -147,9 +147,9 @@
 
 		for (int i=0; i<result; i++)
 		{
-			
+
 			struct iocb * iocbp = events[i].obj;
-	
+
 			if (iocbp->data == (void *) -1)
 			{
 				pollerRunning = 0;
@@ -160,7 +160,7 @@
 			else
 			{
 				CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
-				
+
 				long result = events[i].res;
 				if (result < 0)
 				{
@@ -172,13 +172,13 @@
 					adapter->done(threadContext);
 				}
 			}
-			
+
 			delete iocbp;
 		}
 	}
 #ifdef DEBUG
 	controller->log(threadContext, 2, "Poller finished execution");
-#endif	
+#endif
 }
 
 
@@ -189,18 +189,18 @@
 	{
 		throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "You can only pre allocate files in multiples of 512");
 	}
-	
+
 	void * preAllocBuffer = 0;
 	if (posix_memalign(&preAllocBuffer, 512, size))
 	{
 		throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign");
 	}
-	
+
 	memset(preAllocBuffer, fillChar, size);
-	
-	
+
+
 	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
-	
+
 	for (int i=0; i<blocks; i++)
 	{
 		if (::write(fileHandle, preAllocBuffer, size)<0)
@@ -208,9 +208,9 @@
 			throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "Error pre allocating the file");
 		}
 	}
-	
+
 	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (NATIVE_ERROR_IO, "Error positioning the file");
-	
+
 	free (preAllocBuffer);
 }
 
@@ -223,7 +223,7 @@
 
 	int tries = 0;
 	int result = 0;
-	
+
 	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
 	{
 #ifdef DEBUG
@@ -237,7 +237,7 @@
 #endif
 			controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
 		}
-		
+
 		if (tries > TRIES_BEFORE_ERROR)
 		{
 #ifdef DEBUG
@@ -247,7 +247,7 @@
 		}
 		::usleep(WAIT_FOR_SPOT);
 	}
-	
+
 	if (result<0)
 	{
 		std::stringstream str;
@@ -265,7 +265,7 @@
 
 	int tries = 0;
 	int result = 0;
-	
+
 	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
 	{
 #ifdef DEBUG
@@ -279,7 +279,7 @@
 #endif
 			controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
 		}
-		
+
 		if (tries > TRIES_BEFORE_ERROR)
 		{
 #ifdef DEBUG
@@ -289,7 +289,7 @@
 		}
 		::usleep(WAIT_FOR_SPOT);
 	}
-	
+
 	if (result<0)
 	{
 		std::stringstream str;
@@ -301,7 +301,7 @@
 long AsyncFile::getSize()
 {
 	struct stat64 statBuffer;
-	
+
 	if (fstat64(fileHandle, &statBuffer) < 0)
 	{
 		return -1l;
@@ -313,21 +313,21 @@
 void AsyncFile::stopPoller(THREAD_CONTEXT threadContext)
 {
 	pollerRunning = 0;
-	
-	
+
+
 	struct iocb * iocb = new struct iocb();
 	::io_prep_pwrite(iocb, fileHandle, 0, 0, 0);
 	iocb->data = (void *) -1;
 
 	int result = 0;
-	
+
 	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
 	{
 		fprintf(stderr, "Couldn't send request to stop poller, trying again");
 		controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again");
 		::usleep(WAIT_FOR_SPOT);
 	}
-	
+
 	// Waiting the Poller to finish (by giving up the lock)
 	LockClass lock(&pollerMutex);
 }

Modified: trunk/native/src/LibAIOController.cpp
===================================================================
--- trunk/native/src/LibAIOController.cpp	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/native/src/LibAIOController.cpp	2009-05-06 04:42:56 UTC (rev 6683)
@@ -35,8 +35,6 @@
 #include "Version.h"
 
 
-
-
 /*
  * Class:     org_jboss_jaio_libaioimpl_LibAIOController
  * Method:    init
@@ -110,6 +108,8 @@
 	}
 }
 
+
+// Fast memset on buffer
 JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
   (JNIEnv *env, jclass, jobject jbuffer, jint size)
 {
@@ -125,7 +125,46 @@
 	
 }
 
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
+  (JNIEnv * env, jclass, jobject jbuffer)
+{
+	void *  buffer = env->GetDirectBufferAddress(jbuffer);
+	free(buffer);
+}
 
+JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newNativeBuffer
+  (JNIEnv * env, jclass, jlong size)
+{
+	try
+	{
+		
+		if (size % ALIGNMENT)
+		{
+			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512");
+			return 0;
+		}
+		
+		
+		// This will allocate a buffer, aligned by 512.
+		// Buffers created here must be released explicitly through destroyBuffer: they live on the native process heap, outside the Java GC's control, and would leak otherwise
+		void * buffer = 0;
+		if (::posix_memalign(&buffer, 512, size))
+		{
+			throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign");
+			return 0;
+		}
+		
+		memset(buffer, 0, (size_t)size);
+		
+		jobject jbuffer = env->NewDirectByteBuffer(buffer, size);
+		return jbuffer;
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, e.getErrorCode(), e.what());
+		return 0;
+	}
+}
 
 JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
   (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
@@ -134,12 +173,14 @@
 	{
 		AIOController * controller = (AIOController *) controllerAddress;
 		void * buffer = env->GetDirectBufferAddress(jbuffer);
+
 		if (buffer == 0)
 		{
 			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
 			return;
 		}
 		
+		
 		CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
 		
 		controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);

Modified: trunk/native/src/Version.h
===================================================================
--- trunk/native/src/Version.h	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/native/src/Version.h	2009-05-06 04:42:56 UTC (rev 6683)
@@ -1,5 +1,5 @@
 
 #ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 17
+#define _VERSION_NATIVE_AIO 19
 #endif
 

Modified: trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml	2009-05-06 04:42:56 UTC (rev 6683)
@@ -187,7 +187,7 @@
 
       <create-journal-dir>true</create-journal-dir>
 
-      <journal-type>NIO</journal-type>
+      <journal-type>ASYNCIO</journal-type>
 
       <!-- The journal will reuse any buffers where the size < journal-buffer-reuse-size on write operations
            Set this to -1 to disable this feature -->

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -40,8 +40,9 @@
     * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error 
     * @param fileName
     * @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
+    * @throws MessagingException 
     */
-   void open(String fileName, int maxIO);
+   void open(String fileName, int maxIO) throws MessagingException;
 
    /** 
     * Warning: This function will perform a synchronous IO, probably translating to a fstat call
@@ -49,14 +50,13 @@
     * */
    long size() throws MessagingException;
 
-   void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage) throws MessagingException;
+   /** Any error will be reported on the callback interface */ 
+   void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
 
-   void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage) throws MessagingException;
+   void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws MessagingException;
 
    void fill(long position, int blocks, long size, byte fillChar) throws MessagingException;
 
-   ByteBuffer newBuffer(int size);
-
    void setBufferCallback(BufferCallback callback);
 
    int getBlockSize();

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -23,6 +23,8 @@
 package org.jboss.messaging.core.asyncio.impl;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -34,6 +36,7 @@
 import org.jboss.messaging.core.asyncio.BufferCallback;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.VariableLatch;
 
 /**
  * 
@@ -54,7 +57,7 @@
 
    private static boolean loaded = false;
 
-   private static int EXPECTED_NATIVE_VERSION = 17;
+   private static int EXPECTED_NATIVE_VERSION = 19;
 
    public static void addMax(final int io)
    {
@@ -131,7 +134,9 @@
 
    private String fileName;
 
-   private volatile Thread poller;
+   private final VariableLatch pollerLatch = new VariableLatch();
+   
+   private volatile Runnable poller;
 
    private int maxIO;
 
@@ -145,12 +150,31 @@
     *  Warning: Beware of the C++ pointer! It will bite you! :-)
     */
    private long handler;
+   
+   
+   // A context switch during AIO would force a disk synchronization before
+   // switching to the new thread, which would cause
+   // serious performance problems. Because of that, all AIO writes are
+   // issued from a single thread.
+   private final Executor writeExecutor;
+   
+   private final Executor pollerExecutor;
 
    // AsynchronousFile implementation
    // ------------------------------------------------------------------------------------
 
-   public void open(final String fileName, final int maxIO)
+   /**
+    * @param writeExecutor Must be a single-threaded executor. If null, write operations are executed on the caller's thread
+    * @param pollerExecutor The executor on which the poller Runnable of this file is run
+    */
+   public AsynchronousFileImpl(Executor writeExecutor, Executor pollerExecutor)
    {
+      this.writeExecutor = writeExecutor;
+      this.pollerExecutor = pollerExecutor;
+   }
+   
+   public void open(final String fileName, final int maxIO) throws MessagingException
+   {
       writeLock.lock();
 
       try
@@ -165,7 +189,23 @@
 
          this.fileName = fileName;
 
-         handler = init(fileName, this.maxIO, log);
+         try
+         {
+            handler = init(fileName, this.maxIO, log);
+         }
+         catch (MessagingException e)
+         {
+            MessagingException ex = null;
+            if (e.getCode() == MessagingException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
+            {
+               ex = new MessagingException(e.getCode(), "Can't initialize AIO. Currently AIO in use = " + totalMaxIO.get() + ", trying to allocate more " + maxIO, e);
+            }
+            else
+            {
+               ex = e;
+            }
+            throw ex;
+         }
          opened = true;
          addMax(this.maxIO);
       }
@@ -192,11 +232,10 @@
          writeSemaphore = null;
          if (poller != null)
          {
-            Thread currentPoller = poller;
             stopPoller(handler);
             // We need to make sure we won't call close until Poller is
             // completely done, or we might get beautiful GPFs
-            currentPoller.join();
+            this.pollerLatch.waitCompletion();
          }
 
          closeInternal(handler);
@@ -216,30 +255,56 @@
    public void write(final long position,
                      final long size,
                      final ByteBuffer directByteBuffer,
-                     final AIOCallback aioPackage) throws MessagingException
+                     final AIOCallback aioCallback)
    {
+      if (aioCallback == null)
+      {
+         throw new NullPointerException("Null Callback");
+      }
+      
       checkOpened();
       if (poller == null)
       {
          startPoller();
       }
       writeSemaphore.acquireUninterruptibly();
-      try
+      
+      if (writeExecutor != null)
       {
-         write(handler, position, size, directByteBuffer, aioPackage);
+         writeExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  write(handler, position, size, directByteBuffer, aioCallback);
+               }
+               catch (MessagingException e)
+               {
+                  callbackError(aioCallback, e.getCode(), e.getMessage());
+               }
+               catch (RuntimeException e)
+               {
+                  callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+               }
+            }
+         });
       }
-      catch (MessagingException e)
+      else
       {
-         // Release only if an exception happened
-         writeSemaphore.release();
-         throw e;
+         try
+         {
+            write(handler, position, size, directByteBuffer, aioCallback);
+         }
+         catch (MessagingException e)
+         {
+            callbackError(aioCallback, e.getCode(), e.getMessage());
+         }
+         catch (RuntimeException e)
+         {
+            callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+         }
       }
-      catch (RuntimeException e)
-      {
-         // Release only if an exception happened
-         writeSemaphore.release();
-         throw e;
-      }
 
    }
 
@@ -294,22 +359,42 @@
       return fileName;
    }
 
-   // Should we make this method static?
-   public ByteBuffer newBuffer(final int size)
+   /**
+    * This needs to be synchronized because of 
+    * http://bugs.sun.com/view_bug.do?bug_id=6791815
+    * http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/000386.html
+    * 
+    * @param size
+    * @return
+    */
+   public synchronized static ByteBuffer newBuffer(final int size)
    {
-      if (size % getBlockSize() != 0)
+      if (size % 512 != 0)
       {
          throw new RuntimeException("Buffer size needs to be aligned to 512");
       }
 
-      return ByteBuffer.allocateDirect(size);
+      return newNativeBuffer(size);
    }
 
    public void setBufferCallback(final BufferCallback callback)
    {
       bufferCallback = callback;
    }
+   
+   /** Return the JNI handler used on C++ */
+   public long getHandler()
+   {
+      return handler;
+   }
+   
+   public static void clearBuffer(ByteBuffer buffer)
+   {
+      resetBuffer(buffer, buffer.limit());
+      buffer.position(0);
+   }
 
+
    // Private
    // ---------------------------------------------------------------------------------
 
@@ -355,10 +440,11 @@
 
          if (poller == null)
          {
-            poller = new PollerThread();
+            pollerLatch.up();
+            poller = new PollerRunnable();
             try
             {
-               poller.start();
+               pollerExecutor.execute(poller);
             }
             catch (Exception ex)
             {
@@ -379,14 +465,22 @@
          throw new RuntimeException("File is not opened");
       }
    }
-
    // Native
    // ------------------------------------------------------------------------------------------
 
-   public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
+   private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
 
-   private static native long init(String fileName, int maxIO, Logger logger);
 
+   // Frees the native memory behind a buffer created by newNativeBuffer
+	public static native void destroyBuffer(ByteBuffer buffer);
+	
+	// Allocates a zero-filled, 512-aligned native buffer; the native side rejects unaligned sizes
+	private static native ByteBuffer newNativeBuffer(long size);
+	
+   
+   
+   private static native long init(String fileName, int maxIO, Logger logger) throws MessagingException;
+
    private native long size0(long handle) throws MessagingException;
 
    private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws MessagingException;
@@ -401,6 +495,7 @@
 
    /** A native method that does nothing, and just validate if the ELF dependencies are loaded and on the correct platform as this binary format */
    private static native int getNativeVersion();
+   
 
    /** Poll asynchrounous events from internal queues */
    private static native void internalPollEvents(long handler);
@@ -408,14 +503,12 @@
    // Inner classes
    // -----------------------------------------------------------------------------------------
 
-   private class PollerThread extends Thread
+   private class PollerRunnable implements Runnable
    {
-      PollerThread()
+      PollerRunnable()
       {
-         super("NativePoller for " + fileName);
       }
 
-      @Override
       public void run()
       {
          try
@@ -428,6 +521,7 @@
             // Case the poller thread is interrupted, this will allow us to
             // restart the thread when required
             poller = null;
+            pollerLatch.down();
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -60,7 +60,7 @@
          throw new NullPointerException("buffer");
       }
 
-      this.buffer = buffer.slice();
+      this.buffer = buffer;
       capacity = buffer.remaining();
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -48,8 +48,6 @@
     */
    void open(int maxIO) throws Exception;
 
-   void setBufferCallback(BufferCallback callback);
-
    int getAlignment() throws Exception;
 
    int calculateBlockStart(int position) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -42,8 +42,14 @@
    boolean isSupportsCallbacks();
 
    ByteBuffer newBuffer(int size);
+   
+   void releaseBuffer(ByteBuffer buffer);
+   
+   void setBufferCallback(BufferCallback bufferCallback);
+   
+   BufferCallback getBufferCallback();
 
-   // Avoid using this method in production as it creates an unecessary copy
+   // To be used in tests only
    ByteBuffer wrapBuffer(byte[] bytes);
 
    int getAlignment();

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -25,8 +25,7 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -60,18 +59,26 @@
    private AsynchronousFile aioFile;
 
    private final AtomicLong position = new AtomicLong(0);
+   
+   private BufferCallback bufferCallback;
 
-   // A context switch on AIO would make it to synchronize the disk before
-   // switching to the new thread, what would cause
-   // serious performance problems. Because of that we make all the writes on
-   // AIO using a single thread.
-   private ExecutorService executor;
+   /** A context switch during AIO would force a disk synchronization before
+       switching to the new thread, which would cause
+       serious performance problems. Because of that, all AIO writes are
+       issued from a single thread. */
+   private final Executor executor;
 
-   public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO)
+   /** The pool for Thread pollers */
+   private final Executor pollerExecutor;
+
+   public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final BufferCallback bufferCallback, final Executor executor, final Executor pollerExecutor)
    {
       this.journalDir = journalDir;
       this.fileName = fileName;
       this.maxIO = maxIO;
+      this.bufferCallback = bufferCallback;
+      this.executor = executor;
+      this.pollerExecutor = pollerExecutor;
    }
 
    public boolean isOpen() 
@@ -99,10 +106,20 @@
    {
       checkOpened();
       opened = false;
-      executor.shutdown();
 
-      while (!executor.awaitTermination(60, TimeUnit.SECONDS))
+      final CountDownLatch donelatch = new CountDownLatch(1);
+      
+      executor.execute(new Runnable()
       {
+         public void run()
+         {
+            donelatch.countDown();
+         }
+      });
+      
+      
+      while (!donelatch.await(60, TimeUnit.SECONDS))
+      {
          log.warn("Executor on file " + fileName + " couldn't complete its tasks in 60 seconds.",
                   new Exception("Warning: Executor on file " + fileName + " couldn't complete its tasks in 60 seconds."));
       }
@@ -190,10 +207,10 @@
    public synchronized void open(final int currentMaxIO) throws Exception
    {
       opened = true;
-      executor = Executors.newSingleThreadExecutor();
       aioFile = newFile();
       aioFile.open(journalDir + "/" + fileName, currentMaxIO);
       position.set(0);
+      aioFile.setBufferCallback(bufferCallback);
 
    }
 
@@ -289,7 +306,7 @@
     */
    protected AsynchronousFile newFile()
    {
-      return new AsynchronousFileImpl();
+      return new AsynchronousFileImpl(executor, pollerExecutor);
    }
 
    // Private methods
@@ -300,24 +317,7 @@
                           final int bytesToWrite,
                           final long positionToWrite)
    {
-      executor.execute(new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
-            }
-            catch (Exception e)
-            {
-               log.warn(e.getMessage(), e);
-               if (callback != null)
-               {
-                  callback.onError(-1, e.getMessage());
-               }
-            }
-         }
-      });
+      aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
    }
 
    private void checkOpened() throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -23,9 +23,13 @@
 package org.jboss.messaging.core.journal.impl;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.utils.JBMThreadFactory;
 
 /**
  * 
@@ -36,6 +40,16 @@
  */
 public class AIOSequentialFileFactory extends AbstractSequentialFactory
 {
+   
+   /** A single AIO write executor shared by every AIO file.
+    *  This is used only for AIO & instant operations. One executor thread is enough for the entire journal, as there is only ever one active file.
+    *  Even if there were multiple active files at a given moment, this should still be ok: max-io is bounded by a semaphore, which guarantees AIO calls don't block on disk calls */
+   private final Executor writeExecutor = Executors.newSingleThreadExecutor();
+   
+
+   private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+
+
    public AIOSequentialFileFactory(final String journalDir)
    {
       super(journalDir);
@@ -43,7 +57,7 @@
 
    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
-      return new AIOSequentialFile(journalDir, fileName, maxIO);
+      return new AIOSequentialFile(journalDir, fileName, maxIO, bufferCallback, writeExecutor, pollerExecutor);
    }
 
    public boolean isSupportsCallbacks()
@@ -62,12 +76,12 @@
       {
          size = (size / 512 + 1) * 512;
       }
-      return ByteBuffer.allocateDirect(size);
+      return AsynchronousFileImpl.newBuffer(size);
    }
 
    public void clearBuffer(final ByteBuffer directByteBuffer)
    {
-      AsynchronousFileImpl.resetBuffer(directByteBuffer, directByteBuffer.limit());
+      AsynchronousFileImpl.clearBuffer(directByteBuffer);
    }
 
    public int getAlignment()
@@ -91,4 +105,12 @@
 
       return pos;
    }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+    */
+   public void releaseBuffer(ByteBuffer buffer)
+   {
+      AsynchronousFileImpl.destroyBuffer(buffer);
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -28,6 +28,7 @@
 import java.util.Arrays;
 import java.util.List;
 
+import org.jboss.messaging.core.journal.BufferCallback;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
 
@@ -44,6 +45,8 @@
    private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
 
    protected final String journalDir;
+   
+   protected BufferCallback bufferCallback;
 
    public AbstractSequentialFactory(final String journalDir)
    {
@@ -85,4 +88,22 @@
       return Arrays.asList(fileNames);
    }
 
+   /**
+    * @return the bufferCallback
+    */
+   public BufferCallback getBufferCallback()
+   {
+      return bufferCallback;
+   }
+
+   /**
+    * @param bufferCallback the bufferCallback to set
+    */
+   public void setBufferCallback(BufferCallback bufferCallback)
+   {
+      this.bufferCallback = bufferCallback;
+   }
+
+   
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -274,6 +274,8 @@
       this.syncNonTransactional = syncNonTransactional;
 
       this.fileFactory = fileFactory;
+      
+      this.fileFactory.setBufferCallback(this.buffersControl.callback);
 
       this.filePrefix = filePrefix;
 
@@ -870,6 +872,10 @@
          throw new IllegalStateException("Journal must be in started state");
       }
 
+      // Disabling life cycle control on buffers, as we are reading the buffer 
+      buffersControl.disable();
+
+
       Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
 
       List<JournalFile> orderedFiles = orderFiles();
@@ -880,12 +886,12 @@
 
       for (JournalFile file : orderedFiles)
       {
+         ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
+
          file.getFile().open(1);
 
-         ByteBuffer bb = fileFactory.newBuffer(fileSize);
+         int bytesRead = file.getFile().read(wholeFileBuffer);
 
-         int bytesRead = file.getFile().read(bb);
-
          if (bytesRead != fileSize)
          {
             // FIXME - We should extract everything we can from this file
@@ -900,16 +906,19 @@
                                             file.getFile().getFileName());
          }
 
+         
+         wholeFileBuffer.position(0);
+         
          // First long is the ordering timestamp, we just jump its position
-         bb.position(SIZE_HEADER);
+         wholeFileBuffer.position(SIZE_HEADER);
 
          boolean hasData = false;
 
-         while (bb.hasRemaining())
+         while (wholeFileBuffer.hasRemaining())
          {
-            final int pos = bb.position();
+            final int pos = wholeFileBuffer.position();
 
-            byte recordType = bb.get();
+            byte recordType = wholeFileBuffer.get();
 
             if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
             {
@@ -919,7 +928,7 @@
                continue;
             }
 
-            if (bb.position() + SIZE_INT > fileSize)
+            if (wholeFileBuffer.position() + SIZE_INT > fileSize)
             {
                // II - Ignore this record, lets keep looking
                continue;
@@ -927,7 +936,7 @@
 
             // III - Every record has the file-id.
             // This is what supports us from not re-filling the whole file
-            int readFileId = bb.getInt();
+            int readFileId = wholeFileBuffer.getInt();
 
             // IV - This record is from a previous file-usage. The file was
             // reused and we need to ignore this record
@@ -937,7 +946,7 @@
                // next reclaiming will fix it
                hasData = true;
 
-               bb.position(pos + 1);
+               wholeFileBuffer.position(pos + 1);
 
                continue;
             }
@@ -946,24 +955,24 @@
 
             if (isTransaction(recordType))
             {
-               if (bb.position() + SIZE_LONG > fileSize)
+               if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
                {
                   continue;
                }
 
-               transactionID = bb.getLong();
+               transactionID = wholeFileBuffer.getLong();
             }
 
             long recordID = 0;
 
             if (!isCompleteTransaction(recordType))
             {
-               if (bb.position() + SIZE_LONG > fileSize)
+               if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
                {
                   continue;
                }
 
-               recordID = bb.getLong();
+               recordID = wholeFileBuffer.getLong();
 
                maxID = Math.max(maxID, recordID);
             }
@@ -984,14 +993,14 @@
 
             if (isContainsBody(recordType))
             {
-               if (bb.position() + SIZE_INT > fileSize)
+               if (wholeFileBuffer.position() + SIZE_INT > fileSize)
                {
                   continue;
                }
 
-               variableSize = bb.getInt();
+               variableSize = wholeFileBuffer.getInt();
 
-               if (bb.position() + variableSize > fileSize)
+               if (wholeFileBuffer.position() + variableSize > fileSize)
                {
                   log.warn("Record at position " + pos +
                            " file:" +
@@ -1002,12 +1011,12 @@
 
                if (recordType != DELETE_RECORD_TX)
                {
-                  userRecordType = bb.get();
+                  userRecordType = wholeFileBuffer.get();
                }
 
                record = new byte[variableSize];
 
-               bb.get(record);
+               wholeFileBuffer.get(record);
             }
 
             if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
@@ -1015,11 +1024,11 @@
                if (recordType == PREPARE_RECORD)
                {
                   // Add the variable size required for preparedTransactions
-                  preparedTransactionExtraDataSize = bb.getInt();
+                  preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
                }
                // Both commit and prepare contain the recordSummary, and this is
                // used to calculate the record-size on both record-types
-               variableSize += bb.getInt() * SIZE_INT * 2;
+               variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
             }
 
             int recordSize = getRecordSize(recordType);
@@ -1042,11 +1051,11 @@
                continue;
             }
 
-            int oldPos = bb.position();
+            int oldPos = wholeFileBuffer.position();
 
-            bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+            wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
 
-            int checkSize = bb.getInt();
+            int checkSize = wholeFileBuffer.getInt();
 
             // VII - The checkSize at the end has to match with the size
             // informed at the beginning.
@@ -1063,12 +1072,12 @@
                // next reclaiming will fix it
                hasData = true;
 
-               bb.position(pos + SIZE_BYTE);
+               wholeFileBuffer.position(pos + SIZE_BYTE);
 
                continue;
             }
 
-            bb.position(oldPos);
+            wholeFileBuffer.position(oldPos);
 
             // At this point everything is checked. So we relax and just load
             // the data now.
@@ -1190,10 +1199,10 @@
 
                   byte extraData[] = new byte[preparedTransactionExtraDataSize];
 
-                  bb.get(extraData);
+                  wholeFileBuffer.get(extraData);
 
                   // Pair <FileID, NumberOfElements>
-                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
 
                   tx.prepared = true;
 
@@ -1231,7 +1240,7 @@
                   // We need to read it even if transaction was not found, or
                   // the reading checks would fail
                   // Pair <OrderId, NumberOfElements>
-                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
 
                   // The commit could be alone on its own journal-file and the
                   // whole transaction body was reclaimed but not the
@@ -1319,18 +1328,20 @@
                }
             }
 
-            checkSize = bb.getInt();
+            checkSize = wholeFileBuffer.getInt();
 
             // This is a sanity check about the loading code itself.
             // If this checkSize doesn't match, it means the reading method is
             // not doing what it was supposed to do
             if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
             {
-               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() + ", pos = " + pos);
             }
 
-            lastDataPos = bb.position();
+            lastDataPos = wholeFileBuffer.position();
          }
+         
+         fileFactory.releaseBuffer(wholeFileBuffer);
 
          file.getFile().close();
 
@@ -1345,6 +1356,8 @@
          }
       }
 
+      buffersControl.enable();
+      
       // Create any more files we need
 
       // FIXME - size() involves a scan
@@ -1377,11 +1390,6 @@
       {
          currentFile.getFile().open();
 
-         if (reuseBufferSize > 0)
-         {
-            currentFile.getFile().setBufferCallback(buffersControl.callback);
-         }
-
          currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
 
          currentFile.setOffset(currentFile.getFile().position());
@@ -1680,6 +1688,8 @@
          freeFiles.clear();
 
          openedFiles.clear();
+         
+         buffersControl.clearPoll();
 
          state = STATE_STOPPED;
       }
@@ -1932,8 +1942,19 @@
       return recordSize;
    }
 
+   
+   /** 
+    * This method requires bufferControl disabled, or the reads are going to be invalid
+    * */
    private List<JournalFile> orderFiles() throws Exception
    {
+      
+      if (buffersControl.enabled)
+      {
+         // Sanity check, this shouldn't happen unless someone made an invalid change on the code
+         throw new IllegalStateException("Buffer life cycle control needs to be disabled at this point!!!");
+      }
+      
       List<String> fileNames = fileFactory.listFiles(fileExtension);
 
       List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
@@ -1949,6 +1970,10 @@
          file.read(bb);
 
          int orderingID = bb.getInt();
+         
+         fileFactory.releaseBuffer(bb);
+         
+         bb = null;
 
          if (nextOrderingId.get() < orderingID)
          {
@@ -2105,11 +2130,6 @@
       file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
 
       file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
-
-      if (reuseBufferSize > 0)
-      {
-         file.getFile().setBufferCallback(buffersControl.callback);
-      }
    }
 
    private int generateOrderingID()
@@ -2387,9 +2407,22 @@
       /** This queue is fed by {@link JournalImpl.ReuseBuffersController.LocalBufferCallback} which is called directly by NIO or AIO.
        * On the case of the AIO this is almost called by the native layer as soon as the buffer is not being used any more
        * and ready to be reused or GCed */
-      private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+      private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
+      
+      /** During reload we may disable/enable buffer reuse */
+      private boolean enabled = true;
 
       final BufferCallback callback = new LocalBufferCallback();
+      
+      public void enable()
+      {
+         this.enabled = true;
+      }
+      
+      public void disable()
+      {
+         this.enabled = false;
+      }
 
       public ByteBuffer newBuffer(final int size)
       {
@@ -2398,11 +2431,11 @@
          // just to cleanup this
          if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
          {
-            trace("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
+            trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
 
             bufferReuseLastTime = System.currentTimeMillis();
 
-            reuseBuffers.clear();
+            clearPoll();
          }
 
          // if a buffer is bigger than the configured-size, we just create a new
@@ -2418,7 +2451,7 @@
             int alignedSize = fileFactory.calculateBlockSize(size);
 
             // Try getting a buffer from the queue...
-            ByteBuffer buffer = reuseBuffers.poll();
+            ByteBuffer buffer = reuseBuffersQueue.poll();
 
             if (buffer == null)
             {
@@ -2434,24 +2467,41 @@
 
                fileFactory.clearBuffer(buffer);
             }
-
+            
             buffer.rewind();
 
             return buffer;
          }
       }
 
+      public void clearPoll()
+      {
+         ByteBuffer reusedBuffer;
+         
+         while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
+         {
+            fileFactory.releaseBuffer(reusedBuffer);
+         }
+      }
+
       private class LocalBufferCallback implements BufferCallback
       {
          public void bufferDone(final ByteBuffer buffer)
          {
-            bufferReuseLastTime = System.currentTimeMillis();
-
-            // If a buffer has any other than the configured size, the buffer
-            // will be just sent to GC
-            if (buffer.capacity() == reuseBufferSize)
+            if (enabled)
             {
-               reuseBuffers.offer(buffer);
+               bufferReuseLastTime = System.currentTimeMillis();
+   
+               // If a buffer has any other than the configured size, the buffer
+               // will be just sent to GC
+               if (buffer.capacity() == reuseBufferSize)
+               {
+                  reuseBuffersQueue.offer(buffer);
+               }
+               else
+               {
+                  fileFactory.releaseBuffer(buffer);
+               }
             }
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -54,10 +54,11 @@
 
    BufferCallback bufferCallback;
 
-   public NIOSequentialFile(final String directory, final String fileName)
+   public NIOSequentialFile(final String directory, final String fileName, final BufferCallback bufferCallback)
    {
       this.directory = directory;
       file = new File(directory + "/" + fileName);
+      this.bufferCallback = bufferCallback;
    }
 
    public int getAlignment()
@@ -92,11 +93,6 @@
       open();
    }
 
-   public void setBufferCallback(final BufferCallback callback)
-   {
-      bufferCallback = callback;
-   }
-
    public void fill(final int position, final int size, final byte fillCharacter) throws Exception
    {
       ByteBuffer bb = ByteBuffer.allocateDirect(size);

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -53,7 +53,7 @@
    // maxIO is ignored on NIO
    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
-      return new NIOSequentialFile(journalDir, fileName);
+      return new NIOSequentialFile(journalDir, fileName, bufferCallback);
    }
 
    public boolean isSupportsCallbacks()
@@ -94,4 +94,12 @@
       return bytes;
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+    */
+   public void releaseBuffer(ByteBuffer buffer)
+   {
+      // nothing to be done here
+   }
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -882,9 +882,11 @@
       configuration.setSecurityEnabled(false);
       configuration.setBindingsDirectory(getBindingsDir(node, backup));
       configuration.setJournalMinFiles(2);
+      configuration.setJournalMaxAIO(1000);
       configuration.setJournalDirectory(getJournalDir(node, backup));
       configuration.setJournalFileSize(100 * 1024);
-      configuration.setJournalType(JournalType.NIO);
+      configuration.setJournalType(JournalType.ASYNCIO);
+      configuration.setJournalMaxAIO(1000);
       configuration.setPagingDirectory(getPageDir(node, backup));
       configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
       configuration.setClustered(true);
@@ -978,7 +980,8 @@
       configuration.setJournalMinFiles(2);
       configuration.setJournalDirectory(getJournalDir(node, false));
       configuration.setJournalFileSize(100 * 1024);
-      configuration.setJournalType(JournalType.NIO);
+      configuration.setJournalType(JournalType.ASYNCIO);
+      configuration.setJournalMaxAIO(1000);
       configuration.setPagingDirectory(getPageDir(node, false));
       configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
       configuration.setClustered(true);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -124,7 +124,7 @@
          backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
          backupConf.setJournalFileSize(100 * 1024);
 
-         backupConf.setJournalType(JournalType.NIO);
+         backupConf.setJournalType(JournalType.ASYNCIO);
 
          backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
          backupConf.setPagingGlobalWatermarkSize(pageSize);
@@ -164,7 +164,7 @@
          liveConf.setPagingGlobalWatermarkSize(pageSize);
          liveConf.setJournalFileSize(100 * 1024);
 
-         liveConf.setJournalType(JournalType.NIO);
+         liveConf.setJournalType(JournalType.ASYNCIO);
       }
 
       if (fileBased)
@@ -208,7 +208,7 @@
          backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
          backupConf.setJournalFileSize(100 * 1024);
 
-         backupConf.setJournalType(JournalType.NIO);
+         backupConf.setJournalType(JournalType.ASYNCIO);
 
          backupConf.setPagingMaxGlobalSizeBytes(-1);
          backupConf.setPagingGlobalWatermarkSize(-1);
@@ -262,7 +262,7 @@
          liveConf.setPagingGlobalWatermarkSize(-1);
          liveConf.setJournalFileSize(100 * 1024);
 
-         liveConf.setJournalType(JournalType.NIO);
+         liveConf.setJournalType(JournalType.ASYNCIO);
          liveServer = Messaging.newMessagingServer(liveConf);
       }
       else

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -103,7 +103,7 @@
       backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
       backupConf.setJournalFileSize(100 * 1024);
       
-      backupConf.setJournalType(JournalType.NIO);
+      backupConf.setJournalType(JournalType.ASYNCIO);
 
       backupConf.setSecurityEnabled(false);
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -124,7 +124,7 @@
 
       liveConf.setJournalFileSize(100 * 1024);
       
-      liveConf.setJournalType(JournalType.NIO);
+      liveConf.setJournalType(JournalType.ASYNCIO);
 
 
       liveConf.setSecurityEnabled(false);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -382,7 +382,7 @@
          backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
          backupConf.setJournalFileSize(100 * 1024);
 
-         backupConf.setJournalType(JournalType.NIO);
+         backupConf.setJournalType(JournalType.ASYNCIO);
 
          backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
          backupConf.setPagingGlobalWatermarkSize(pageSize);
@@ -422,7 +422,7 @@
          liveConf.setPagingGlobalWatermarkSize(pageSize);
          liveConf.setJournalFileSize(100 * 1024);
 
-         liveConf.setJournalType(JournalType.NIO);
+         liveConf.setJournalType(JournalType.ASYNCIO);
       }
 
       if (fileBased)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -85,7 +85,7 @@
       backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
       backupConf.setJournalFileSize(100 * 1024);
 
-      backupConf.setJournalType(JournalType.NIO);
+      backupConf.setJournalType(JournalType.ASYNCIO);
 
       backupConf.setSecurityEnabled(false);
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -106,7 +106,7 @@
 
       liveConf.setJournalFileSize(100 * 1024);
 
-      liveConf.setJournalType(JournalType.NIO);
+      liveConf.setJournalType(JournalType.ASYNCIO);
 
       liveConf.setSecurityEnabled(false);
       liveConf.getAcceptorConfigurations()

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -25,7 +25,6 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
@@ -78,6 +77,7 @@
       ByteBuffer buff = factory.newBuffer(10);
       assertEquals(512, buff.limit());
       file.close();
+      factory.releaseBuffer(buff);
    }
 
    public void testBlockCallback() throws Exception
@@ -131,7 +131,7 @@
 
       BlockCallback callback = new BlockCallback();
 
-      final int NUMBER_OF_RECORDS = 10000;
+      final int NUMBER_OF_RECORDS = 500;
 
       SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000);
       file.open();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -23,7 +23,6 @@
 package org.jboss.messaging.tests.integration.journal;
 
 import java.io.File;
-import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.journal.SequentialFileFactory;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -60,7 +60,7 @@
 
       configuration.start();
 
-      configuration.setJournalType(JournalType.NIO);
+      configuration.setJournalType(JournalType.ASYNCIO);
 
       final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
       journal.start();

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -202,7 +202,7 @@
    {
       Journal journal =
          new JournalImpl(10 * 1024 * 1024, 10, true, true, getFileFactory(),
-               "jbm-data", "jbm", 5000, 0);
+               "jbm-data", "jbm", 5000, 10 * 1024);
       
       journal.start();
       

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -321,7 +321,7 @@
       config.setJournalFileSize(10 * 1024 * 1024);
       config.setJournalMinFiles(5);
       
-      config.setJournalType(JournalType.NIO);
+      config.setJournalType(JournalType.ASYNCIO);
       
       return config;
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -22,12 +22,7 @@
 
 package org.jboss.messaging.tests.unit.core.asyncio;
 
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.BufferCallback;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.nio.charset.Charset;
@@ -35,7 +30,17 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.BufferCallback;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.JBMThreadFactory;
+
 /**
  * 
  * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
@@ -53,19 +58,40 @@
    private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
 
    byte commonBuffer[] = null;
+   
+   ExecutorService executor;
+   
+   ExecutorService pollerExecutor;
 
+
    private static void debug(final String msg)
    {
       log.debug(msg);
    }
 
+   
+   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+      executor = Executors.newSingleThreadExecutor();
+   }
+   
+   protected void tearDown() throws Exception
+   {
+      executor.shutdown();
+      pollerExecutor.shutdown();
+      super.tearDown();
+   }
+   
    /** 
     * Opening and closing a file immediately can lead to races on the native layer,
     * creating crash conditions.
     * */
    public void testOpenClose() throws Exception
    {
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
       for (int i = 0; i < 1000; i++)
       {
          controller.open(FILE_NAME, 10000);
@@ -73,16 +99,16 @@
 
       }
    }
-   
+
    public void testFileNonExistent() throws Exception
    {
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
       for (int i = 0; i < 1000; i++)
       {
          try
          {
             controller.open("/non-existent/IDontExist.error", 10000);
-            fail ("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
+            fail("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
          }
          catch (Throwable ignored)
          {
@@ -106,21 +132,22 @@
     */
    public void testTwoFiles() throws Exception
    {
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
-      final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+      final AsynchronousFileImpl controller2 = new AsynchronousFileImpl(executor, pollerExecutor);
       controller.open(FILE_NAME + ".1", 10000);
       controller2.open(FILE_NAME + ".2", 10000);
 
       int numberOfLines = 1000;
       int size = 1024;
 
+      ByteBuffer buffer = null;
       try
       {
          CountDownLatch latchDone = new CountDownLatch(numberOfLines);
          CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
 
-         ByteBuffer block = controller.newBuffer(size);
-         encodeBufer(block);
+         buffer = AsynchronousFileImpl.newBuffer(size);
+         encodeBufer(buffer);
 
          preAlloc(controller, numberOfLines * size);
          preAlloc(controller2, numberOfLines * size);
@@ -144,8 +171,8 @@
          {
             CountDownCallback tmp2 = iter2.next();
 
-            controller.write(counter * size, size, block, tmp);
-            controller.write(counter * size, size, block, tmp2);
+            controller.write(counter * size, size, buffer, tmp);
+            controller.write(counter * size, size, buffer, tmp2);
             if (++counter % 5000 == 0)
             {
                debug(5000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
@@ -202,6 +229,7 @@
       }
       finally
       {
+         AsynchronousFileImpl.destroyBuffer(buffer);
          try
          {
             controller.close();
@@ -249,7 +277,8 @@
          }
       }
 
-      AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+      ByteBuffer buffer = null;
       try
       {
 
@@ -258,13 +287,13 @@
          controller.open(FILE_NAME, 10);
          controller.close();
 
-         controller = new AsynchronousFileImpl();
+         controller = new AsynchronousFileImpl(executor, pollerExecutor);
 
          controller.open(FILE_NAME, 10);
 
          controller.fill(0, 1, 512, (byte)'j');
 
-         ByteBuffer buffer = controller.newBuffer(SIZE);
+         buffer = AsynchronousFileImpl.newBuffer(SIZE);
 
          buffer.clear();
 
@@ -305,8 +334,7 @@
          {
          }
 
-         // newBuffer = ByteBuffer.allocateDirect(512);
-         newBuffer = controller.newBuffer(512);
+         newBuffer = AsynchronousFileImpl.newBuffer(512);
          callbackLocal = new LocalCallback();
          controller.read(0, 512, newBuffer, callbackLocal);
          callbackLocal.latch.await();
@@ -325,6 +353,8 @@
       }
       finally
       {
+         AsynchronousFileImpl.destroyBuffer(buffer);
+         
          try
          {
             controller.close();
@@ -340,7 +370,7 @@
    public void testBufferCallbackUniqueBuffers() throws Exception
    {
       boolean closed = false;
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
       try
       {
          final int NUMBER_LINES = 1000;
@@ -366,7 +396,7 @@
          CountDownCallback aio = new CountDownCallback(latch);
          for (int i = 0; i < NUMBER_LINES; i++)
          {
-            ByteBuffer buffer = controller.newBuffer(SIZE);
+            ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
             buffer.rewind();
             for (int j = 0; j < SIZE; j++)
             {
@@ -400,6 +430,11 @@
             }
          }
 
+         for (ByteBuffer bufferTmp : buffers)
+         {
+            AsynchronousFileImpl.destroyBuffer(bufferTmp);
+         }
+
          buffers.clear();
 
       }
@@ -415,7 +450,8 @@
    public void testBufferCallbackAwaysSameBuffer() throws Exception
    {
       boolean closed = false;
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+      ByteBuffer buffer = null;
       try
       {
          final int NUMBER_LINES = 1000;
@@ -440,7 +476,7 @@
          CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
          CountDownCallback aio = new CountDownCallback(latch);
 
-         ByteBuffer buffer = controller.newBuffer(SIZE);
+         buffer = AsynchronousFileImpl.newBuffer(SIZE);
          buffer.rewind();
          for (int j = 0; j < SIZE; j++)
          {
@@ -482,6 +518,7 @@
       }
       finally
       {
+         AsynchronousFileImpl.destroyBuffer(buffer);
          if (!closed)
          {
             controller.close();
@@ -491,11 +528,22 @@
 
    public void testRead() throws Exception
    {
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+      controller.setBufferCallback(new BufferCallback()
+      {
+
+         public void bufferDone(ByteBuffer buffer)
+         {
+            AsynchronousFileImpl.destroyBuffer(buffer);
+         }
+
+      });
+
+      ByteBuffer readBuffer = null;
       try
       {
 
-         final int NUMBER_LINES = 5000;
+         final int NUMBER_LINES = 10000;
          final int SIZE = 1024;
 
          controller.open(FILE_NAME, 1000);
@@ -508,13 +556,15 @@
 
             for (int i = 0; i < NUMBER_LINES; i++)
             {
-               ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
-               addString("Str value " + i + "\n", buffer);
-               for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
+               if (i % 1000 == 0)
                {
-                  buffer.put((byte)' ');
+                  System.out.println("Wrote " + i + " lines");
                }
-               buffer.put((byte)'\n');
+               ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
+               for (int j = 0; j < SIZE; j++)
+               {
+                  buffer.put(getSamplebyte(j));
+               }
 
                controller.write(i * SIZE, SIZE, buffer, aio);
             }
@@ -527,48 +577,76 @@
          // If you call close you're supposed to wait events to finish before
          // closing it
          controller.close();
+         controller.setBufferCallback(null);
+
          controller.open(FILE_NAME, 10);
 
-         ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+         readBuffer = AsynchronousFileImpl.newBuffer(SIZE);
 
+         Thread t = null;
+
          for (int i = 0; i < NUMBER_LINES; i++)
          {
-            newBuffer.clear();
-            addString("Str value " + i + "\n", newBuffer);
-            for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
+            if (i % 1000 == 0)
             {
-               newBuffer.put((byte)' ');
+               System.out.println("Read " + i + " lines");
             }
-            newBuffer.put((byte)'\n');
+            AsynchronousFileImpl.clearBuffer(readBuffer);
 
             CountDownLatch latch = new CountDownLatch(1);
             CountDownCallback aio = new CountDownCallback(latch);
 
-            ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+            controller.read(i * SIZE, SIZE, readBuffer, aio);
 
-            controller.read(i * SIZE, SIZE, buffer, aio);
+            // For the first 20 lines we deliberately generate a lot of garbage, to make sure the pointers are well isolated from garbage collection
+            if (i < 20)
+            {
+               if (t != null)
+               {
+                  t.join();
+               }
+
+               t = new Thread()
+               {
+                  public void run()
+                  {
+                     // Force a lot of garbage during reading, to make sure the memory read is well isolated from
+                     // garbage collection
+                     WeakReference<Object> garbage = new WeakReference<Object>(new Object());
+                     // Stays in loop until GC kicks in to clean up this reference
+                     while (garbage.get() != null)
+                     {
+                        @SuppressWarnings("unused")
+                        byte[] garbage2 = new byte[10 * 1024 * 1024]; // More Garbage
+                     }
+
+                  }
+               };
+
+               t.start();
+            }
+
             latch.await();
             assertFalse(aio.errorCalled);
             assertTrue(aio.doneCalled);
 
             byte bytesRead[] = new byte[SIZE];
-            byte bytesCompare[] = new byte[SIZE];
+            readBuffer.get(bytesRead);
 
-            newBuffer.rewind();
-            newBuffer.get(bytesCompare);
-            buffer.rewind();
-            buffer.get(bytesRead);
-
             for (int count = 0; count < SIZE; count++)
             {
-               assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+               assertEquals("byte position " + count + " differs on line " + i + " position = " + count,
+                            getSamplebyte(count),
+                            bytesRead[count]);
             }
-
-            assertTrue(buffer.equals(newBuffer));
          }
       }
       finally
       {
+         if (readBuffer != null)
+         {
+            AsynchronousFileImpl.destroyBuffer(readBuffer);
+         }
          try
          {
             controller.close();
@@ -587,7 +665,7 @@
     *  The file is also read after being written to validate its correctness */
    public void testConcurrentClose() throws Exception
    {
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
       try
       {
 
@@ -598,10 +676,20 @@
          controller.open(FILE_NAME, 10000);
 
          controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+         
+         controller.setBufferCallback(new BufferCallback()
+         {
 
+            public void bufferDone(ByteBuffer buffer)
+            {
+               AsynchronousFileImpl.destroyBuffer(buffer);
+            }
+            
+         });
+
          for (int i = 0; i < NUMBER_LINES; i++)
          {
-            ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+            ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
 
             buffer.clear();
             addString("Str value " + i + "\n", buffer);
@@ -618,14 +706,16 @@
          // If you call close you're supposed to wait events to finish before
          // closing it
          controller.close();
+         
+         controller.setBufferCallback(null);
 
          assertEquals(0, readLatch.getCount());
          readLatch.await();
          controller.open(FILE_NAME, 10);
 
-         ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+         ByteBuffer newBuffer = AsynchronousFileImpl.newBuffer(SIZE);
 
-         ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+         ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
 
          for (int i = 0; i < NUMBER_LINES; i++)
          {
@@ -659,6 +749,9 @@
 
             assertTrue(buffer.equals(newBuffer));
          }
+         
+         AsynchronousFileImpl.destroyBuffer(newBuffer);
+         AsynchronousFileImpl.destroyBuffer(buffer);
 
       }
       finally
@@ -676,15 +769,17 @@
 
    private void asyncData(final int numberOfLines, final int size, final int aioLimit) throws Exception
    {
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
       controller.open(FILE_NAME, aioLimit);
-
+      
+      ByteBuffer buffer = null;
+      
       try
       {
          CountDownLatch latchDone = new CountDownLatch(numberOfLines);
 
-         ByteBuffer block = controller.newBuffer(size);
-         encodeBufer(block);
+         buffer = AsynchronousFileImpl.newBuffer(size);
+         encodeBufer(buffer);
 
          preAlloc(controller, numberOfLines * size);
 
@@ -701,7 +796,7 @@
          int counter = 0;
          for (CountDownCallback tmp : list)
          {
-            controller.write(counter * size, size, block, tmp);
+            controller.write(counter * size, size, buffer, tmp);
             if (++counter % 20000 == 0)
             {
                debug(20000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
@@ -736,6 +831,7 @@
       }
       finally
       {
+         AsynchronousFileImpl.destroyBuffer(buffer);
          try
          {
             controller.close();
@@ -749,16 +845,17 @@
 
    public void testDirectSynchronous() throws Exception
    {
+      ByteBuffer buffer = null;
       try
       {
          final int NUMBER_LINES = 3000;
          final int SIZE = 1024;
 
-         final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+         final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
          controller.open(FILE_NAME, 2000);
 
-         ByteBuffer block = ByteBuffer.allocateDirect(SIZE);
-         encodeBufer(block);
+         buffer = AsynchronousFileImpl.newBuffer(SIZE);
+         encodeBufer(buffer);
 
          preAlloc(controller, NUMBER_LINES * SIZE);
 
@@ -768,7 +865,7 @@
          {
             CountDownLatch latchDone = new CountDownLatch(1);
             CountDownCallback aioBlock = new CountDownCallback(latchDone);
-            controller.write(i * 512, 512, block, aioBlock);
+            controller.write(i * 512, 512, buffer, aioBlock);
             latchDone.await();
             assertTrue(aioBlock.doneCalled);
             assertFalse(aioBlock.errorCalled);
@@ -793,27 +890,33 @@
       {
          throw e;
       }
+      finally
+      {
+         AsynchronousFileImpl.destroyBuffer(buffer);
+      }
 
    }
 
    public void testInvalidWrite() throws Exception
    {
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
       controller.open(FILE_NAME, 2000);
-
+      
+      ByteBuffer buffer = null;
+      
       try
       {
          final int SIZE = 512;
 
-         ByteBuffer block = controller.newBuffer(SIZE);
-         encodeBufer(block);
+         buffer = AsynchronousFileImpl.newBuffer(SIZE);
+         encodeBufer(buffer);
 
          preAlloc(controller, 10 * 512);
 
          CountDownLatch latchDone = new CountDownLatch(1);
 
          CountDownCallback aioBlock = new CountDownCallback(latchDone);
-         controller.write(11, 512, block, aioBlock);
+         controller.write(11, 512, buffer, aioBlock);
 
          latchDone.await();
 
@@ -827,6 +930,7 @@
       }
       finally
       {
+         AsynchronousFileImpl.destroyBuffer(buffer);
          controller.close();
       }
 
@@ -834,10 +938,10 @@
 
    public void testInvalidAlloc() throws Exception
    {
-      AsynchronousFileImpl controller = new AsynchronousFileImpl();
       try
       {
-         ByteBuffer buffer = controller.newBuffer(300);
+         @SuppressWarnings("unused")
+         ByteBuffer buffer = AsynchronousFileImpl.newBuffer(300);
          fail("Exception expected");
       }
       catch (Exception ignored)
@@ -845,10 +949,58 @@
       }
 
    }
+   
+   // This test specifically targets http://bugs.sun.com/view_bug.do?bug_id=6791815
+   public void testAllocations() throws Exception
+   {
+      final AtomicInteger errors = new AtomicInteger(0);
+ 
+      Thread ts[] = new Thread[100];
 
+      final CountDownLatch align = new CountDownLatch(ts.length);
+      final CountDownLatch start = new CountDownLatch(1);
+
+      for (int i = 0; i < ts.length; i++)
+      {
+         ts[i] = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  align.countDown();
+                  start.await();
+                  for (int i = 0; i < 1000; i++)
+                  {
+                     ByteBuffer buffer = AsynchronousFileImpl.newBuffer(512);
+                     AsynchronousFileImpl.destroyBuffer(buffer);
+                  }
+               }
+               catch (Throwable e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+            }
+         };
+         ts[i].start();
+      }
+      
+      align.await();
+      start.countDown();
+      
+      for (Thread t: ts)
+      {
+         t.join();
+      }
+      
+      assertEquals(0, errors.get());
+   }
+
+
    public void testSize() throws Exception
    {
-      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
 
       final int NUMBER_LINES = 10;
       final int SIZE = 1024;

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -22,18 +22,19 @@
 
 package org.jboss.messaging.tests.unit.core.asyncio;
 
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.logging.Logger;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.JBMThreadFactory;
+
 /**
  * 
  * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
@@ -59,18 +60,35 @@
 
    // Executor exec
 
-   Executor executor = Executors.newSingleThreadExecutor();
+   ExecutorService executor;
+   
+   ExecutorService pollerExecutor;
 
-   @Override
+
+   private static void debug(final String msg)
+   {
+      log.debug(msg);
+   }
+
+   
+   
    protected void setUp() throws Exception
    {
       super.setUp();
-      position.set(0);
+      pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+      executor = Executors.newSingleThreadExecutor();
    }
-
+   
+   protected void tearDown() throws Exception
+   {
+      executor.shutdown();
+      pollerExecutor.shutdown();
+      super.tearDown();
+   }
+   
    public void testMultipleASynchronousWrites() throws Throwable
    {
-      executeTest(false);
+         executeTest(false);
    }
 
    public void testMultipleSynchronousWrites() throws Throwable
@@ -80,15 +98,15 @@
 
    private void executeTest(final boolean sync) throws Throwable
    {
-      log.debug(sync ? "Sync test:" : "Async test");
-      AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
+      debug(sync ? "Sync test:" : "Async test");
+      AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl(executor, pollerExecutor);
       jlibAIO.open(FILE_NAME, 21000);
       try
       {
-         log.debug("Preallocating file");
+         debug("Preallocating file");
 
          jlibAIO.fill(0l, NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES, (byte)0);
-         log.debug("Done Preallocating file");
+         debug("Done Preallocating file");
 
          CountDownLatch latchStart = new CountDownLatch(NUMBER_OF_THREADS + 1);
 
@@ -115,7 +133,7 @@
          }
          long endTime = System.currentTimeMillis();
 
-         log.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
+         debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
                    NUMBER_OF_THREADS *
                    NUMBER_OF_LINES *
                    1000 /
@@ -164,11 +182,17 @@
       {
          super.run();
 
+
+         ByteBuffer buffer = null;
+         
+         synchronized (MultiThreadAsynchronousFileTest.class)
+         {
+            buffer = AsynchronousFileImpl.newBuffer(SIZE);
+         }
+
          try
          {
 
-            ByteBuffer buffer = libaio.newBuffer(SIZE);
-
             // I'm aways reusing the same buffer, as I don't want any noise from
             // malloc on the measurement
             // Encoding buffer
@@ -225,7 +249,7 @@
 
             long endtime = System.currentTimeMillis();
 
-            log.debug(Thread.currentThread().getName() + " Rec/Sec= " +
+            debug(Thread.currentThread().getName() + " Rec/Sec= " +
                       NUMBER_OF_LINES *
                       1000 /
                       (endtime - startTime) +
@@ -246,6 +270,13 @@
             e.printStackTrace();
             failed = e;
          }
+         finally
+         {
+            synchronized (MultiThreadAsynchronousFileTest.class)
+            {
+               AsynchronousFileImpl.destroyBuffer(buffer);
+            }
+         }
 
       }
    }

Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -1,534 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- * 
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- * 
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.journal.impl;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.jboss.messaging.core.asyncio.AsynchronousFile;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.journal.IOCallback;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.impl.AIOSequentialFile;
-import org.jboss.messaging.tests.util.UnitTestCase;
-
-/**
- * Test AIOSEquentialFile using an EasyMock
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class AIOSequentialFileTest extends UnitTestCase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   AsynchronousFile mockFile;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testOpen() throws Exception
-   {
-      openFile();
-   }
-
-   public void testAlignment() throws Exception
-   {
-      SequentialFile file = new MockAIOSequentialFileImpl(getTestDir(), "nothing", 1);
-
-      try
-      {
-         file.getAlignment();
-         fail("Exception expected");
-      }
-      catch (Exception ignored)
-      {
-      }
-
-      file = openFile();
-
-      EasyMock.expect(mockFile.getBlockSize()).andReturn(512);
-
-      EasyMock.replay(mockFile);
-
-      assertEquals(512, file.getAlignment());
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testCalculateblockStart() throws Exception
-   {
-      SequentialFile file = new MockAIOSequentialFileImpl(getTestDir(), "nothing", 1);
-
-      try
-      {
-         file.calculateBlockStart(10);
-         fail("Exception expected");
-      }
-      catch (Exception ignored)
-      {
-      }
-
-      file = openFile();
-
-      EasyMock.expect(mockFile.getBlockSize()).andReturn(512);
-
-      EasyMock.replay(mockFile);
-
-      assertEquals(1024, file.calculateBlockStart(900));
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testClose() throws Exception
-   {
-      SequentialFile file = new MockAIOSequentialFileImpl(getTestDir(), "nothing", 1);
-
-      try
-      {
-         file.close();
-         fail("Exception expected");
-      }
-      catch (Exception ignored)
-      {
-      }
-
-      file = openFile();
-
-      mockFile.close();
-
-      EasyMock.replay(mockFile);
-
-      file.close();
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testDelete() throws Exception
-   {
-      File tmpFile = File.createTempFile("temporaryTestFile", ".tmp");
-
-      assertTrue(tmpFile.exists());
-
-      SequentialFile fileImpl = new MockAIOSequentialFileImpl(tmpFile.getParent(), tmpFile.getName(), 1);
-
-      fileImpl.delete();
-
-      // delete on a closed file
-      assertFalse(tmpFile.exists());
-
-      tmpFile = File.createTempFile("temporaryTestFile", ".tmp");
-
-      assertTrue(tmpFile.exists());
-
-      fileImpl = openFile(tmpFile.getParent(), tmpFile.getName());
-
-      mockFile.close();
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.delete();
-
-      assertFalse(tmpFile.exists());
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testFill() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      validateFill(fileImpl, 3 * 100 * 1024 * 1024, 3, 100 * 1024 * 1024);
-
-      validateFill(fileImpl, 3 * 10 * 1024 * 1024, 3, 10 * 1024 * 1024);
-
-      validateFill(fileImpl, 7 * 1024 * 1024, 7, 1024 * 1024);
-
-      validateFill(fileImpl, 7 * 10 * 1024, 7, 10 * 1024);
-
-      validateFill(fileImpl, 7 * 512, 7, 512);
-
-      validateFill(fileImpl, 300, 1, 512);
-   }
-
-   public void testWriteWithCallback() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      ByteBuffer buffer = ByteBuffer.allocate(512);
-
-      IOCallback callback = new IOCallback()
-      {
-
-         public void done()
-         {
-         }
-
-         public void onError(int errorCode, String errorMessage)
-         {
-         }
-      };
-
-      mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.same(callback));
-
-      mockFile.close();
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.position(512 * 3);
-
-      fileImpl.write(buffer, callback);
-
-      // We need that to make sure the executor is cleared before the verify
-      fileImpl.close();
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testWriteWithSyncOnCallback() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      ByteBuffer buffer = ByteBuffer.allocate(512);
-
-      mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
-      EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
-      {
-
-         public Object answer() throws Throwable
-         {
-            IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
-            callback.done();
-
-            return null;
-         }
-
-      });
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.position(512 * 3);
-
-      fileImpl.write(buffer, true);
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testWriteWithNoSyncOnCallback() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      ByteBuffer buffer = ByteBuffer.allocate(512);
-
-      mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
-      EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
-      {
-
-         public Object answer() throws Throwable
-         {
-            IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
-            callback.done();
-
-            return null;
-         }
-
-      });
-
-      mockFile.close();
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.position(512 * 3);
-
-      fileImpl.write(buffer, false);
-
-      fileImpl.close();
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testWriteWithSyncAndCallbackError() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      ByteBuffer buffer = ByteBuffer.allocate(512);
-
-      mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
-      EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
-      {
-
-         public Object answer() throws Throwable
-         {
-            IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
-            callback.onError(100, "Fake Message");
-
-            return null;
-         }
-
-      });
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.position(512 * 3);
-
-      try
-      {
-         fileImpl.write(buffer, true);
-         fail("Exception was expected");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(100, e.getCode());
-         assertEquals("Fake Message", e.getMessage());
-      }
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testWriteWithSyncAndException() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      ByteBuffer buffer = ByteBuffer.allocate(512);
-
-      mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
-      EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
-      {
-
-         public Object answer() throws Throwable
-         {
-            throw new IllegalArgumentException("Fake Message");
-         }
-
-      });
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.position(512 * 3);
-
-      try
-      {
-         fileImpl.write(buffer, true);
-         fail("Exception was expected");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(-1, e.getCode());
-         assertEquals("Fake Message", e.getMessage());
-      }
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testReadWithCallback() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      ByteBuffer buffer = ByteBuffer.allocate(512);
-
-      IOCallback callback = new IOCallback()
-      {
-
-         public void done()
-         {
-         }
-
-         public void onError(int errorCode, String errorMessage)
-         {
-         }
-      };
-
-      mockFile.read(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.same(callback));
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.position(512 * 3);
-
-      fileImpl.read(buffer, callback);
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testReadWithoutCallback() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      ByteBuffer buffer = ByteBuffer.allocate(512);
-
-      mockFile.read(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
-      EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
-      {
-
-         public Object answer() throws Throwable
-         {
-            IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
-            callback.done();
-
-            return null;
-         }
-
-      });
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.position(512 * 3);
-
-      fileImpl.read(buffer);
-
-      EasyMock.verify(mockFile);
-   }
-
-   public void testReadWithoutCallbackOnError() throws Exception
-   {
-      SequentialFile fileImpl = openFile();
-
-      ByteBuffer buffer = ByteBuffer.allocate(512);
-
-      mockFile.read(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
-      EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
-      {
-
-         public Object answer() throws Throwable
-         {
-            IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
-            callback.onError(100, "Fake Message");
-
-            return null;
-         }
-
-      });
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.position(512 * 3);
-
-      try
-      {
-         fileImpl.read(buffer);
-         fail("Expected Exception");
-      }
-      catch (MessagingException e)
-      {
-         assertEquals(100, e.getCode());
-         assertEquals("Fake Message", "Fake Message");
-      }
-
-      EasyMock.verify(mockFile);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      
-      mockFile = null;
-      File testDir = new File(getTestDir());
-      testDir.mkdirs();
-   }
-
-   // Private -------------------------------------------------------
-
-   private void validateFill(final SequentialFile fileImpl,
-                             final int totalSize,
-                             final int numberOfBlocksExpected,
-                             final long blockSizeExpected) throws Exception
-   {
-      EasyMock.expect(mockFile.getBlockSize()).andReturn(512);
-
-      mockFile.fill(512, numberOfBlocksExpected, blockSizeExpected, (byte)'b');
-
-      EasyMock.replay(mockFile);
-
-      fileImpl.fill(5, totalSize, (byte)'b');
-
-      EasyMock.verify(mockFile);
-
-      EasyMock.reset(mockFile);
-   }
-
-   private SequentialFile openFile() throws Exception
-   {
-      return openFile(getTemporaryDir(), "nothing");
-   }
-
-   private SequentialFile openFile(final String directory, final String fileName) throws Exception
-   {
-      mockFile = EasyMock.createStrictMock(AsynchronousFile.class);
-
-      mockFile.open(directory + "/" + fileName, 1);
-
-      EasyMock.replay(mockFile);
-
-      SequentialFile file = new MockAIOSequentialFileImpl(directory, fileName, 1);
-
-      file.open();
-
-      EasyMock.verify(mockFile);
-
-      EasyMock.reset(mockFile);
-
-      return file;
-   }
-
-   // Inner classes -------------------------------------------------
-
-   class MockAIOSequentialFileImpl extends AIOSequentialFile
-   {
-
-      public MockAIOSequentialFileImpl(final String journalDir, final String fileName, final int maxIO) throws Exception
-      {
-         super(journalDir, fileName, maxIO);
-      }
-
-      @Override
-      protected AsynchronousFile newFile()
-      {
-         return mockFile;
-      }
-
-   }
-
-}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.tests.unit.core.journal.impl;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -474,7 +475,7 @@
     * @param expected
     * @param actual
     */
-   private void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
+   protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
    {
       System.out.println("***********************************************");
       System.out.println("Expected list:");
@@ -482,10 +483,14 @@
       {
          System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
       }
-      System.out.println("Actual list:");
-      for (RecordInfo info : actual)
+      if (actual != null)
       {
-         System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+         System.out.println("***********************************************");
+         System.out.println("Actual list:");
+         for (RecordInfo info : actual)
+         {
+            System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+         }
       }
       System.out.println("***********************************************");
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -3018,7 +3018,6 @@
       assertEquals(0, journal.getDataFilesCount());
    }
 
-
    protected abstract int getAlignment();
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -574,4 +574,29 @@
       // nothing to be done on the fake Sequential file
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+    */
+   public void releaseBuffer(ByteBuffer buffer)
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#getBufferCallback()
+    */
+   public BufferCallback getBufferCallback()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#setBufferCallback(org.jboss.messaging.core.journal.BufferCallback)
+    */
+   public void setBufferCallback(BufferCallback bufferCallback)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-05-06 04:42:56 UTC (rev 6683)
@@ -132,7 +132,7 @@
       Configuration config = new ConfigurationImpl();
       config.setJournalDirectory(getJournalDir());
       config.setBindingsDirectory(getBindingsDir());
-      config.setJournalType(JournalType.NIO);
+      config.setJournalType(JournalType.ASYNCIO);
       config.setLargeMessagesDirectory(getLargeMessagesDir());
       return config;
    }
@@ -278,7 +278,7 @@
       configuration.setJournalMinFiles(2);
       configuration.setJournalDirectory(getJournalDir(index, false));
       configuration.setJournalFileSize(100 * 1024);
-      configuration.setJournalType(JournalType.NIO);
+      configuration.setJournalType(JournalType.ASYNCIO);
       configuration.setPagingDirectory(getPageDir(index, false));
       configuration.setLargeMessagesDirectory(getLargeMessagesDir(index, false));
 
@@ -305,7 +305,7 @@
       configuration.setPagingDirectory(getPageDir());
       configuration.setLargeMessagesDirectory(getLargeMessagesDir());
 
-      configuration.setJournalType(JournalType.NIO);
+      configuration.setJournalType(JournalType.ASYNCIO);
 
       configuration.getAcceptorConfigurations().clear();
 




More information about the jboss-cvs-commits mailing list