[jboss-cvs] JBoss Messaging SVN: r4026 - in branches/trunk_tmp_aio: native/src and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Apr 9 21:27:55 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-04-09 21:27:55 -0400 (Wed, 09 Apr 2008)
New Revision: 4026

Added:
   branches/trunk_tmp_aio/native/src/.libs/
   branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so
   branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealNIOJournalImplTest.java
Removed:
   branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_JlibAIO.h
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java
Modified:
   branches/trunk_tmp_aio/.classpath
   branches/trunk_tmp_aio/.project
   branches/trunk_tmp_aio/build-messaging.xml
   branches/trunk_tmp_aio/native/src/
   branches/trunk_tmp_aio/native/src/AsyncFile.cpp
   branches/trunk_tmp_aio/native/src/AsyncFile.h
   branches/trunk_tmp_aio/native/src/LibAIOController.cpp
   branches/trunk_tmp_aio/native/src/Makefile.am
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/Journal.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/MultiThreadWriteNativeTest.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplTest.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestUnit.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
   branches/trunk_tmp_aio/tests/src/org/jboss/test/messaging/JBMServerTestCase.java
Log:
Intermediate commit on temporary branch

Modified: branches/trunk_tmp_aio/.classpath
===================================================================
--- branches/trunk_tmp_aio/.classpath	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/.classpath	2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,21 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
-	<classpathentry kind="src" path="docs/examples/queue-failover/src"/>
-	<classpathentry kind="src" path="docs/examples/embedded/src"/>
-	<classpathentry kind="src" path="output/gen-parsers"/>
-	<classpathentry kind="src" path="docs/examples/bridge/src"/>
-	<classpathentry kind="src" path="docs/examples/stateless-clustered/src"/>
-	<classpathentry kind="src" path="docs/examples/mdb-failure/src"/>
-	<classpathentry kind="src" path="docs/examples/distributed-queue/src"/>
-	<classpathentry kind="src" path="docs/examples/common/src"/>
-	<classpathentry kind="src" path="docs/examples/distributed-topic/src"/>
-	<classpathentry kind="src" path="docs/examples/http/src"/>
-	<classpathentry kind="src" path="docs/examples/mdb/src"/>
-	<classpathentry kind="src" path="docs/examples/queue/src"/>
-	<classpathentry kind="src" path="docs/examples/secure-socket/src"/>
-	<classpathentry kind="src" path="docs/examples/stateless/src"/>
-	<classpathentry kind="src" path="docs/examples/topic/src"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
+	<classpathentry kind="src" path="output/gen-parsers"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
 	<classpathentry kind="src" path="tests/etc/ide"/>
 	<classpathentry excluding="ide/" kind="src" path="tests/etc"/>
@@ -26,7 +12,6 @@
 	<classpathentry kind="lib" path="thirdparty/jboss/profiler/jvmti/lib/jboss-profiler-jvmti.jar"/>
 	<classpathentry kind="lib" path="thirdparty/hsqldb/lib/hsqldb.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-logging/lib/commons-logging.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/serialization/lib/jboss-serialization.jar"/>
 	<classpathentry kind="lib" path="thirdparty/sun-javacc/lib/javacc.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-xerces/lib/xercesImpl.jar"/>
 	<classpathentry kind="lib" path="thirdparty/dom4j/lib/dom4j.jar"/>
@@ -63,7 +48,6 @@
 	<classpathentry kind="lib" path="thirdparty/jboss/integration/lib/jboss-transaction-spi.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/jboss-jaspi-api/lib/jboss-jaspi-api.jar"/>
 	<classpathentry kind="lib" path="thirdparty/slf4j/api/lib/slf4j-api-1.4.3.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
@@ -76,6 +60,5 @@
 	<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
 	<classpathentry kind="lib" path="src/etc/server/default/config"/>
 	<classpathentry kind="lib" path="src/etc/server/default/deploy"/>
-	<classpathentry kind="lib" path="tests/lib/jdbc-drivers/mysql-connector-java-5.1.5-bin.jar"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

Modified: branches/trunk_tmp_aio/.project
===================================================================
--- branches/trunk_tmp_aio/.project	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/.project	2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <projectDescription>
-	<name>jboss-messaging</name>
+	<name>trunk</name>
 	<comment></comment>
 	<projects>
 	</projects>

Modified: branches/trunk_tmp_aio/build-messaging.xml
===================================================================
--- branches/trunk_tmp_aio/build-messaging.xml	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/build-messaging.xml	2008-04-10 01:27:55 UTC (rev 4026)
@@ -659,7 +659,7 @@
 	<property name="native.include" value="${native.src}/src"/>
 	
 	<target name="native-header" depends="compile">
-            <javah class="org.jboss.messaging.core.asyncio.impl.JlibAIO" classpathref="compilation.classpath" destdir="${native.include}">
+            <javah class="org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl" classpathref="compilation.classpath" destdir="${native.include}">
             </javah>
     </target>
 


Property changes on: branches/trunk_tmp_aio/native/src
___________________________________________________________________
Name: svn:ignore
   - Makefile
Makefile.in
.deps
.libs

   + Makefile
Makefile.in
.deps



Property changes on: branches/trunk_tmp_aio/native/src/.libs
___________________________________________________________________
Name: svn:ignore
   + *.lai
*.0


Added: branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so
===================================================================
--- branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so	                        (rev 0)
+++ branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1 @@
+link libJBMLibAIO.so.0.0.0
\ No newline at end of file


Property changes on: branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so
___________________________________________________________________
Name: svn:special
   + *

Modified: branches/trunk_tmp_aio/native/src/AsyncFile.cpp
===================================================================
--- branches/trunk_tmp_aio/native/src/AsyncFile.cpp	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/AsyncFile.cpp	2008-04-10 01:27:55 UTC (rev 4026)
@@ -114,7 +114,7 @@
 	
 	while (pollerRunning)
 	{
-		int result = io_getevents(this->aioContext, 1, maxIO, events, &oneSecond);
+		int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
 		
 #ifdef DEBUG
 		fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
@@ -134,39 +134,40 @@
 			
 			struct iocb * iocbp = events[i].obj;
 	
-			CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
-			
-			long result = events[i].res;
-			if (result < 0)
+			if (iocbp->data == (void *) -1)
 			{
-				std::string strerror = io_error(result);
-				adapter->onError(threadContext, result, strerror);
+				pollerRunning = 0;
+//				controller->log(threadContext, 2, "Received poller request to stop");
 			}
 			else
 			{
-				adapter->completeBlock(threadContext);
-				adapter->deleteRef(threadContext);
+				CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
+				
+				long result = events[i].res;
+				if (result < 0)
+				{
+					std::string strerror = io_error(result);
+					adapter->onError(threadContext, result, strerror);
+				}
+				else
+				{
+					adapter->completeBlock(threadContext);
+					adapter->deleteRef(threadContext);
+				}
 			}
 			
 			delete iocbp;
 		}
 	}
 	
-	controller->log(threadContext, 2, "Poller finished execution");
+//	controller->log(threadContext, 2, "Poller finished execution");
 	
 }
 
 
-void AsyncFile::preAllocate(THREAD_CONTEXT threadContext, int blocks, size_t size)
+void AsyncFile::preAllocate(THREAD_CONTEXT , off_t position, int blocks, size_t size, int fillChar)
 {
-	size_t currentSize = lseek (fileHandle, 0, SEEK_END);
-	
-	if (currentSize >= blocks * size)
-	{
-		controller->log(threadContext,2,"File being reused");
-		return;
-	}
-	
+
 	if (size % ALIGNMENT != 0)
 	{
 		throw AIOException (101, "You can only pre allocate files in multiples of 512");
@@ -178,10 +179,10 @@
 		throw AIOException(10, "Error on posix_memalign");
 	}
 	
-	memset(preAllocBuffer, 0, size);
+	memset(preAllocBuffer, fillChar, size);
 	
 	
-	if (::lseek (fileHandle, 0, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
 	
 	for (int i=0; i<blocks; i++)
 	{
@@ -191,7 +192,7 @@
 		}
 	}
 	
-	if (::lseek (fileHandle, 0, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
 	
 	free (preAllocBuffer);
 }
@@ -285,7 +286,22 @@
 void AsyncFile::stopPoller(THREAD_CONTEXT threadContext)
 {
 	pollerRunning = 0;
-	controller->log(threadContext, 2,"Setting poller to stop");
+	
+	
+	struct iocb * iocb = new struct iocb();
+	::io_prep_pwrite(iocb, fileHandle, 0, 0, 0);
+	iocb->data = (void *) -1;
+
+	int result = 0;
+	
+	while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+	{
+		fprintf(stderr, "Couldn't send request to stop poller, trying again");
+		controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again");
+		::usleep(WAIT_FOR_SPOT);
+	}
+	
+//	controller->log(threadContext, 2,"Sent data to stop");
 	// It will wait the Poller to gives up its lock
 	LockClass lock(&pollerMutex);
 }

Modified: branches/trunk_tmp_aio/native/src/AsyncFile.h
===================================================================
--- branches/trunk_tmp_aio/native/src/AsyncFile.h	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/AsyncFile.h	2008-04-10 01:27:55 UTC (rev 4026)
@@ -79,9 +79,8 @@
 	
 	// Finishes the polling thread (if any) and return
 	void stopPoller(THREAD_CONTEXT threadContext);
+	void preAllocate(THREAD_CONTEXT threadContext, off_t position, int blocks, size_t size, int fillChar);
 	
-	void preAllocate(THREAD_CONTEXT threadContext, int numberOfBlocks, size_t size);
-	
 	void pollEvents(THREAD_CONTEXT threadContext);
 	
 };

Modified: branches/trunk_tmp_aio/native/src/LibAIOController.cpp
===================================================================
--- branches/trunk_tmp_aio/native/src/LibAIOController.cpp	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/LibAIOController.cpp	2008-04-10 01:27:55 UTC (rev 4026)
@@ -25,7 +25,7 @@
 #include <string>
 
 
-#include "org_jboss_messaging_core_asyncio_impl_JlibAIO.h"
+#include "org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h"
 
 
 #include "JavaUtilities.h"
@@ -41,7 +41,7 @@
  * Method:    init
  * Signature: (Ljava/lang/String;Ljava/lang/Class;)J
  */
-JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_init
+JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_init
   (JNIEnv * env, jclass, jstring jstrFileName, jclass callbackClass, jint maxIO, jobject logger)
 {
 	try
@@ -64,7 +64,7 @@
         
         controller->logger = env->NewGlobalRef(logger);
         
-        controller->log(env,4, "Controller initialized");
+//        controller->log(env,4, "Controller initialized");
 		
 	    return (jlong)controller;
 	}
@@ -74,13 +74,26 @@
 	}
 }
 
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_read
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_read
   (JNIEnv *env, jclass, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
 {
 	try 
 	{
 		AIOController * controller = (AIOController *) controllerAddress;
 		void * buffer = env->GetDirectBufferAddress(jbuffer);
+		
+		if (buffer == 0)
+		{
+			throwException(env, "java/lang/IllegalStateException", "Invalid Direct Buffer used");
+			return;
+		}
+		
+		if (((long)buffer) % 512)
+		{
+			throwException(env, "java/lang/IllegalStateException", "Buffer not aligned for use with DMA");
+			return;
+		}
+		
 		CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback));
 		
 		controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
@@ -91,13 +104,19 @@
 	}
 }
 
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_write
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
   (JNIEnv *env, jclass, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
 {
 	try 
 	{
 		AIOController * controller = (AIOController *) controllerAddress;
 		void * buffer = env->GetDirectBufferAddress(jbuffer);
+		if (buffer == 0)
+		{
+			throwException(env, "java/lang/IllegalStateException", "Invalid Direct Buffer used");
+			return;
+		}
+		
 		CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback));
 		
 		controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
@@ -110,7 +129,7 @@
 
 
 
-JNIEXPORT void Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_internalPollEvents
+JNIEXPORT void Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
   (JNIEnv *env, jclass, jlong controllerAddress)
 {
 	try
@@ -124,7 +143,7 @@
 	}
 }
 
-JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_newBuffer
+JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newBuffer
   (JNIEnv * env, jobject, jlong size)
 {
 	try
@@ -150,7 +169,7 @@
 	}
 }
 
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_destroyBuffer
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
   (JNIEnv * env, jobject, jobject jbuffer)
 {
 	void *  buffer = env->GetDirectBufferAddress(jbuffer);
@@ -159,7 +178,7 @@
 
 
 
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_closeInternal
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_closeInternal
   (JNIEnv *env, jclass, jlong controllerAddress)
 {
 	try
@@ -175,13 +194,17 @@
 	}
 }
 
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_preAllocate
-  (JNIEnv * env, jclass, jlong controllerAddress, jint blocks, jlong size)
+
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_fill
+  (JNIEnv * env, jclass, jlong controllerAddress, jlong position, jint blocks, jlong size, jbyte fillChar)
 {
 	try
 	{
 		AIOController * controller = (AIOController *) controllerAddress;
-		controller->fileOutput.preAllocate(env, blocks, size);
+		
+		controller->fileOutput.preAllocate(env, position, blocks, size, fillChar);
+		
+		//controller->fileOutput.preAllocate(env, blocks, size);
 	}
 	catch (AIOException& e)
 	{

Modified: branches/trunk_tmp_aio/native/src/Makefile.am
===================================================================
--- branches/trunk_tmp_aio/native/src/Makefile.am	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/Makefile.am	2008-04-10 01:27:55 UTC (rev 4026)
@@ -4,4 +4,4 @@
 libJBMLibAIO_la_SOURCES = AIOController.cpp AIOController.h AIOException.h AsyncFile.cpp \
                      AsyncFile.h CallbackAdapter.h JAIODatatypes.h JavaUtilities.cpp \
                      JavaUtilities.h JNICallbackAdapter.cpp JNICallbackAdapter.h LibAIOController.cpp \
-                     LockClass.h org_jboss_messaging_core_persistence_impl_libaio_jni_impl_JlibAIO
+                     LockClass.h org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h

Added: branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h	                        (rev 0)
+++ branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,79 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl */
+
+#ifndef _Included_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+#define _Included_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+#ifdef __cplusplus
+extern "C" {
+#endif
+/* Inaccessible static: log */
+/* Inaccessible static: loaded */
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    init
+ * Signature: (Ljava/lang/String;Ljava/lang/Class;ILorg/jboss/messaging/core/logging/Logger;)J
+ */
+JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_init
+  (JNIEnv *, jclass, jstring, jclass, jint, jobject);
+
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    write
+ * Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
+  (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
+
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    read
+ * Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_read
+  (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
+
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    fill
+ * Signature: (JJIJB)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_fill
+  (JNIEnv *, jclass, jlong, jlong, jint, jlong, jbyte);
+
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    closeInternal
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_closeInternal
+  (JNIEnv *, jclass, jlong);
+
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    internalPollEvents
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
+  (JNIEnv *, jclass, jlong);
+
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    destroyBuffer
+ * Signature: (Ljava/nio/ByteBuffer;)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
+  (JNIEnv *, jobject, jobject);
+
+/*
+ * Class:     org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method:    newBuffer
+ * Signature: (J)Ljava/nio/ByteBuffer;
+ */
+JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newBuffer
+  (JNIEnv *, jobject, jlong);
+
+#ifdef __cplusplus
+}
+#endif
+#endif

Deleted: branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_JlibAIO.h
===================================================================
--- branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_JlibAIO.h	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_JlibAIO.h	2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,79 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_jboss_messaging_core_asyncio_impl_JlibAIO */
-
-#ifndef _Included_org_jboss_messaging_core_asyncio_impl_JlibAIO
-#define _Included_org_jboss_messaging_core_asyncio_impl_JlibAIO
-#ifdef __cplusplus
-extern "C" {
-#endif
-/* Inaccessible static: log */
-/* Inaccessible static: loaded */
-/*
- * Class:     org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method:    init
- * Signature: (Ljava/lang/String;Ljava/lang/Class;ILorg/jboss/messaging/core/logging/Logger;)J
- */
-JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_init
-  (JNIEnv *, jclass, jstring, jclass, jint, jobject);
-
-/*
- * Class:     org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method:    write
- * Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_write
-  (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
-
-/*
- * Class:     org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method:    read
- * Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_read
-  (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
-
-/*
- * Class:     org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method:    preAllocate
- * Signature: (JIJ)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_preAllocate
-  (JNIEnv *, jclass, jlong, jint, jlong);
-
-/*
- * Class:     org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method:    closeInternal
- * Signature: (J)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_closeInternal
-  (JNIEnv *, jclass, jlong);
-
-/*
- * Class:     org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method:    internalPollEvents
- * Signature: (J)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_internalPollEvents
-  (JNIEnv *, jclass, jlong);
-
-/*
- * Class:     org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method:    destroyBuffer
- * Signature: (Ljava/nio/ByteBuffer;)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_destroyBuffer
-  (JNIEnv *, jobject, jobject);
-
-/*
- * Class:     org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method:    newBuffer
- * Signature: (J)Ljava/nio/ByteBuffer;
- */
-JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_newBuffer
-  (JNIEnv *, jobject, jlong);
-
-#ifdef __cplusplus
-}
-#endif
-#endif

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -37,10 +37,12 @@
    
    void read(long position, long size, ByteBuffer directByteBuffer,  AIOCallback aioPackage);
    
-   void preAllocate(int blocks, long size);
-
+   void fill(long position, int blocks, long size, byte fillChar);
+   
    ByteBuffer newBuffer(long size);
    
    void destroyBuffer(ByteBuffer buffer);
+   
+   int getBlockSize();
 
 }

Copied: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java (from rev 3898, trunk/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java)
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	                        (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,170 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.asyncio.impl;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.AsynchronousFile;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * {@link AsynchronousFile} implementation that delegates its I/O to native methods of the JBMLibAIO library.
+ * Warning: if you rename this class or move it to another package, you must also
+ *          rename the C++ native functions, because their names embed the class name.
+ * @author clebert.suconic at jboss.com
+ */
+public class AsynchronousFileImpl implements AsynchronousFile
+{
+    private static Logger log = Logger.getLogger(AsynchronousFileImpl.class);
+    private boolean opened = false;
+    private String fileName;
+    private Thread poller;
+    private static boolean loaded = true;
+    
+    /**
+     *  Warning: this is a raw C++ pointer (the value returned by the native init()); reset to 0 by close().
+     */ 
+    private long handler;
+    // Load the native library once; a failure is logged and recorded in 'loaded' rather than rethrown.
+    static
+    {
+        try
+        {
+            log.info("JLibAIO being loaded");
+            System.loadLibrary("JBMLibAIO");
+        }
+        catch (Throwable e)
+        {
+            log.error(e.getLocalizedMessage(), e);
+            loaded = false;
+        }
+    }
+    /** @return false if loading the JBMLibAIO library failed in the static initializer, true otherwise */
+    public static boolean isLoaded()
+    {
+       return loaded;
+    }
+    
+    
+    // NOTE(review): 'opened' is set before init() runs, so it would stay true if init() fails with an exception - confirm.
+    /** Marks this file as opened, stores the native handle returned by init() and starts the poller thread. */
+    public void open(String fileName, int maxIO)
+    {
+        opened = true;
+        this.fileName=fileName;
+        handler = init (fileName, AIOCallback.class, maxIO, log);
+        startPoller();
+    }
+    /** Thread named "NativePoller for " + fileName whose run() only calls pollEvents(). */
+    class PollerThread extends Thread
+    {
+        PollerThread ()
+        {
+            super("NativePoller for " + fileName);
+        }
+        public void run()
+        {
+            pollEvents();
+        }
+    }
+    /** Creates a new PollerThread and starts it; throws if the file is not opened. */
+    private synchronized void  startPoller()
+    {
+        checkOpened();
+        poller = new PollerThread(); 
+        poller.start();
+    }
+    /** Calls the native closeInternal() with the current handle, then clears 'opened' and zeroes the handle. */
+    public synchronized void close()
+    {
+        checkOpened();
+        closeInternal(handler);
+        opened = false;
+        handler = 0;
+    }
+    
+    /** Checks that the file is opened and forwards the request, unchanged, to the native write(). */
+    public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+    {
+        checkOpened();
+        write (handler, position, size, directByteBuffer, aioPackage);
+        
+    }
+    /** Checks that the file is opened and forwards the request, unchanged, to the native read(). */
+    public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+    {
+        checkOpened();
+        read (handler, position, size, directByteBuffer, aioPackage);
+        
+    }
+    /** Not implemented yet: after the opened check this always returns 0 (see the TODO in the body). */
+    public long size()
+    {
+        checkOpened();
+        // TODO: wire this method to ftell
+        return 0;
+    }
+    /** Checks that the file is opened and forwards the same arguments to the native fill(). */
+    public void fill(long position, int blocks, long size, byte fillChar)
+    {
+       checkOpened();
+       fill(handler, position, blocks, size, fillChar);
+    }
+    /** @return the block size, hard-coded to 512 */
+    public int getBlockSize()
+    {
+       return 512;
+    }
+    
+    /** Delegates to the native internalPollEvents(); returns immediately when the file is not opened. */
+    private void pollEvents()
+    {
+        if (!opened)
+        {
+           return;
+        }
+        internalPollEvents(handler);
+    }
+    /** Throws RuntimeException("File is not opened") when the 'opened' flag is false. */
+    private void checkOpened() 
+    {
+        if (!opened)
+        {
+            throw new RuntimeException("File is not opened");
+        }
+    }
+    
+    /** 
+     * aioPackageClazz is passed in because several class loaders could hold the same class, which avoids doing any class loading in the native layer
+     */
+    @SuppressWarnings("unchecked")
+    private static native long init(String fileName, Class aioPackageClazz, int maxIO, Logger logger);
+    // The native side throws IllegalStateException when 'buffer' is not a direct buffer.
+    private static native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+    // The native side throws IllegalStateException when 'buffer' is not direct or its address is not a multiple of 512.
+    private static native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+    // Implemented in C++ by AsyncFile::preAllocate(position, blocks, size, fillChar).
+    private static native void fill(long handle, long position, int blocks, long size, byte fillChar);
+
+    private static native void closeInternal(long handler);
+    
+    /** Poll asynchronous events from internal queues */
+    private static native void internalPollEvents(long handler);
+
+    // Should we make this method static?
+    public native void destroyBuffer(ByteBuffer buffer);
+
+    // Should we make this method static?
+    public native ByteBuffer newBuffer(long size);
+
+
+
+
+    
+}

Deleted: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,158 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.messaging.core.asyncio.impl;
-
-import java.nio.ByteBuffer;
-
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.AsynchronousFile;
-import org.jboss.messaging.core.logging.Logger;
-
-/**
- * 
- * @author clebert.suconic at jboss.com
- * Warning: Case you refactor the name or the package of this class
- *          You need to make sure you also rename the C++ native calls
- */
-public class JlibAIO implements AsynchronousFile
-{
-    private static Logger log = Logger.getLogger(JlibAIO.class);
-    private boolean opened = false;
-    private String fileName;
-    private Thread poller;
-    private static boolean loaded = true;
-    
-    /**
-     *  Warning: Beware of the C++ pointer! It will bite you! :-)
-     */ 
-    private long handler;
-    
-    static
-    {
-        try
-        {
-            log.info("JLibAIO being loaded");
-            System.loadLibrary("JBMLibAIO");
-        }
-        catch (Throwable e)
-        {
-            log.error(e.getLocalizedMessage(), e);
-            loaded = false;
-        }
-    }
-    
-    public static boolean isLoaded()
-    {
-       return loaded;
-    }
-    
-    
-
-    
-    public void open(String fileName, int maxIO)
-    {
-        opened = true;
-        this.fileName=fileName;
-        handler = init (fileName, AIOCallback.class, maxIO, log);
-        startPoller();
-    }
-    
-    class PollerThread extends Thread
-    {
-        PollerThread ()
-        {
-            super("NativePoller for " + fileName);
-        }
-        public void run()
-        {
-            pollEvents();
-        }
-    }
-    
-    private void startPoller()
-    {
-        checkOpened();
-        poller = new PollerThread(); 
-        poller.start();
-    }
-    
-    public void close()
-    {
-        checkOpened();
-        opened = false;
-        closeInternal(handler);
-        handler = 0;
-    }
-    
-    
-    public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
-    {
-        checkOpened();
-        write (handler, position, size, directByteBuffer, aioPackage);
-        
-    }
-
-    public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
-    {
-        checkOpened();
-        read (handler, position, size, directByteBuffer, aioPackage);
-        
-    }
-
-    public long size()
-    {
-        checkOpened();
-        // TODO: wire this method to ftell
-        return 0;
-    }
-
-    public void preAllocate(int blocks, long size)
-    {
-        checkOpened();
-        preAllocate(handler, blocks, size);
-    }
-
-    private void pollEvents()
-    {
-        checkOpened();
-        internalPollEvents(handler);
-    }
-    
-    private void checkOpened() 
-    {
-        if (!opened)
-        {
-            throw new RuntimeException("File is not opened");
-        }
-    }
-    
-    /** 
-     * I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
-     */
-    @SuppressWarnings("unchecked")
-    private static native long init(String fileName, Class aioPackageClazz, int maxIO, Logger logger);
-    
-    private static native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
-
-    private static native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
-    
-    private static native void preAllocate(long handle, int blocks, long size);
-
-    private static native void closeInternal(long handler);
-    
-    /** Poll asynchrounous events from internal queues */
-    private static native void internalPollEvents(long handler);
-
-    // Should we make this method static?
-    public native void destroyBuffer(ByteBuffer buffer);
-
-    // Should we make this method static?
-    public native ByteBuffer newBuffer(long size);
-   
-    
-}

Added: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java	                        (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,15 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+
+public interface IOCallback extends AIOCallback
+{
+
+}


Property changes on: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/Journal.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/Journal.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -36,7 +36,10 @@
 {
 	// Non transactional operations
 	
-	void appendAddRecord(long id, byte[] record) throws Exception;
+   // TODO: Implement callbacks
+   void appendAddRecord(long id, byte[] record, IOCallback callback) throws Exception;
+
+   void appendAddRecord(long id, byte[] record) throws Exception;
 	
 	void appendUpdateRecord(long id, byte[] record) throws Exception;
 	

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -37,17 +37,27 @@
 	 */
 	void open() throws Exception;
 	
+	int getAlignment() throws Exception;
+	
 	String getFileName();
 	
 	void fill(int position, int size, byte fillCharacter) throws Exception;
 	
 	void delete() throws Exception;
 
-	int write(ByteBuffer bytes, boolean sync) throws Exception;
-	   
-	int read(ByteBuffer bytes) throws Exception;
-	
+   int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+   
+   int write(ByteBuffer bytes, boolean sync) throws Exception;
+   
+   int read(ByteBuffer bytes, IOCallback callback) throws Exception;
+   
+   int read(ByteBuffer bytes) throws Exception;
+   
 	void position(int pos) throws Exception;
 	
 	void close() throws Exception;
+	
+	ByteBuffer newBuffer(int size);
+	
+	ByteBuffer wrapBuffer(byte bytes[]);
 }

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -32,7 +32,7 @@
  */
 public interface SequentialFileFactory
 {
-	SequentialFile createSequentialFile(String fileName, boolean sync) throws Exception;
+	SequentialFile createSequentialFile(String fileName, boolean sync, boolean control) throws Exception;
 	
 	List<String> listFiles(String extension) throws Exception;
 }

Added: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	                        (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.messaging.core.asyncio.AsynchronousFile;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
+
+public class AIOSequentialFile implements SequentialFile
+{
+
+   String journalDir;
+   String fileName;
+   
+   AsynchronousFile aioFile;
+   
+   AtomicLong position = new AtomicLong(0);
+   
+   public AIOSequentialFile(String journalDir, String fileName) throws Exception
+   {
+      this.journalDir = journalDir;
+      this.fileName = fileName;
+   }
+   
+   public int getAlignment() throws Exception
+   {
+      checkOpened();
+      return aioFile.getBlockSize();
+   }
+  
+   
+   public void close() throws Exception
+   {
+      //Thread.sleep(1000);
+      checkOpened();
+      aioFile.close();
+      aioFile = null;
+      
+   }
+
+   public void delete() throws Exception
+   {
+      if (aioFile != null)
+      {
+         aioFile.close();
+         aioFile = null;
+      }
+      
+      File file = new File(journalDir + "/" +  fileName);
+      file.delete();
+   }
+
+   public void fill(int position, int size, byte fillCharacter)
+         throws Exception
+   {
+      checkOpened();
+      
+      int blockSize = aioFile.getBlockSize();
+      
+      if (size % (10*1024*1024) == 0)
+      {
+         blockSize = 10*1024*1024;
+      }
+      else
+      if (size % (1024*1024) == 0)
+      {
+         blockSize = 1024*1024;
+      }
+      else
+      if (size % (10*1024) == 0)
+      {
+         blockSize = 10*1024;
+      }
+      else
+      {
+         blockSize = aioFile.getBlockSize();
+      }
+      
+      int blocks = size / blockSize;
+      if (size % blockSize != 0)
+      {
+         blocks++;
+      }
+      
+      if (position % aioFile.getBlockSize() != 0)
+      {
+         position = ((position / aioFile.getBlockSize()) + 1) * aioFile.getBlockSize();
+      }
+      //System.out.println("filling " + blocks + " blocks with blockSize=" + blockSize + " on file=" + this.getFileName());
+      aioFile.fill((long)position, blocks, blockSize, (byte)fillCharacter);
+      
+   }
+
+   public String getFileName()
+   {
+      return fileName;
+   }
+
+   public void open() throws Exception
+   {
+      aioFile = new AsynchronousFileImpl();
+      aioFile.open(journalDir + "/" + fileName, 500);
+      position.set(0);
+      
+   }
+
+   public void position(int pos) throws Exception
+   {
+      position.set(pos);
+      
+   }
+
+   public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+   {
+      int bytesToRead = bytes.limit();
+      long positionToRead = position.getAndAdd(bytesToRead);
+      
+      bytes.rewind();
+      aioFile.read(positionToRead, bytesToRead, bytes, callback);
+      
+      return bytesToRead;
+   }
+
+   public int read(ByteBuffer bytes) throws Exception
+   {
+      WaitCompletion waitCompletion = new WaitCompletion();
+      int bytesRead = read (bytes, waitCompletion);
+      
+      waitCompletion.waitLatch();
+      
+      if (waitCompletion.errorMessage != null)
+      {
+         throw new MessagingException(waitCompletion.errorCode, waitCompletion.errorMessage);
+      }
+      
+      return bytesRead;
+   }
+
+   public int write(ByteBuffer bytes, boolean sync, IOCallback callback)
+         throws Exception
+   {
+      int bytesToWrite = bytes.limit();
+      long positionToWrite = position.getAndAdd(bytesToWrite);
+      
+      aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
+      return bytesToWrite;
+   }
+
+   public int write(ByteBuffer bytes, boolean sync) throws Exception
+   {
+      WaitCompletion waitCompletion = new WaitCompletion();
+      int bytesWritten = write (bytes, sync, waitCompletion);
+      
+      waitCompletion.waitLatch();
+      
+      if (waitCompletion.errorMessage != null)
+      {
+         throw new MessagingException(waitCompletion.errorCode, waitCompletion.errorMessage);
+      }
+      
+      return bytesWritten;
+   }
+   
+   private void checkOpened() throws Exception
+   {
+      if (aioFile == null)
+      {
+         throw new IllegalStateException ("File not opened");
+      }
+   }
+   
+   class WaitCompletion implements IOCallback
+   {
+
+      CountDownLatch latch = new CountDownLatch(1);
+      
+      String errorMessage;
+      int errorCode = 0;
+      
+      public void done()
+      {
+         latch.countDown();
+      }
+
+      public void onError(int errorCode, String errorMessage)
+      {
+         System.out.println("OK Error!");
+         this.errorCode = errorCode;
+         this.errorMessage = errorMessage;
+         
+         latch.countDown();
+         
+      }
+      
+      public void waitLatch() throws Exception
+      {
+         latch.await();
+      }
+      
+   }
+
+   public ByteBuffer newBuffer(int size)
+   {
+      if (size % aioFile.getBlockSize() != 0)
+      {
+         size = ((size / aioFile.getBlockSize()) + 1) * aioFile.getBlockSize();
+      }
+      return ByteBuffer.allocateDirect(size);
+   }
+
+   public ByteBuffer wrapBuffer(byte[] bytes)
+   {
+      ByteBuffer newbuffer = newBuffer(bytes.length);
+      newbuffer.put(bytes);
+      return newbuffer;
+   };
+
+}


Property changes on: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	                        (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+
+public class AIOSequentialFileFactory extends AbstractSequentialFactory
+{
+
+   public AIOSequentialFileFactory(String journalDir)
+   {
+      super(journalDir);
+   }
+
+   public SequentialFile createSequentialFile(String fileName, boolean sync, boolean control)
+         throws Exception
+   {
+      if (control)
+      {
+         return new NIOSequentialFile(journalDir, fileName, sync);
+      }
+      else
+      {
+         return new AIOSequentialFile(journalDir, fileName);
+      }
+   }
+
+}


Property changes on: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	                        (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+
+public abstract class AbstractSequentialFactory implements SequentialFileFactory
+{
+   protected final String journalDir;
+
+   public AbstractSequentialFactory(final String journalDir)
+   {
+      this.journalDir = journalDir;
+   }
+   
+   public List<String> listFiles(final String extension) throws Exception
+   {
+      File dir = new File(journalDir);
+      
+      FilenameFilter fnf = new FilenameFilter()
+      {
+         public boolean accept(File file, String name)
+         {
+            return name.endsWith("." + extension);
+         }
+      };
+      
+      String[] fileNames = dir.list(fnf);
+      
+      if (fileNames == null)
+      {
+         throw new IOException("Failed to list: " + journalDir);
+      }
+      
+      return Arrays.asList(fileNames);
+   }
+
+}


Property changes on: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -39,6 +39,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
 import org.jboss.messaging.core.journal.RecordInfo;
 import org.jboss.messaging.core.journal.SequentialFile;
@@ -86,8 +87,13 @@
 			
 	public static final byte ADD_RECORD_TX = 14;
 	
+	public static final int SIZE_ADD_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Fixed overhead only; add record.length to get the full record size
+
+	
 	public static final byte UPDATE_RECORD_TX = 15;
 	
+	public static final int  SIZE_UPDATE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + SIZE_BYTE;  // Fixed overhead only; add record.length to get the full record size
+	
 	public static final byte DELETE_RECORD_TX = 16;
 	
 	public static final byte PREPARE_RECORD = 17;
@@ -192,11 +198,36 @@
 	
 	// Journal implementation ----------------------------------------------------------------
 	
-	public ByteBuffer allocateBuffer(final int size) throws Exception
+	/*public ByteBuffer allocateBuffer(final int size) throws Exception
 	{
 		return ByteBuffer.allocateDirect(size);
-	}
+	}*/
 	
+   public void appendAddRecord(long id, byte[] record, IOCallback callback)
+   throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+      
+      int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+      
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
+      bb.put(ADD_RECORD);     
+      bb.putLong(id);      
+      bb.putInt(record.length);     
+      bb.put(record);      
+      bb.put(DONE);
+      bb.rewind();
+      
+      appendRecord(bb, true, callback);
+      
+      posFilesMap.put(id, new PosFiles(currentFile));
+   }
+	
+	
 	public void appendAddRecord(final long id, final byte[] record) throws Exception
 	{
 		if (state != STATE_LOADED)
@@ -206,14 +237,14 @@
 		
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
+		ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
 		bb.put(ADD_RECORD);		
 		bb.putLong(id);		
 		bb.putInt(record.length);		
 		bb.put(record);		
-		bb.put(DONE);			
-		bb.flip();
+		bb.put(DONE);
+		bb.rewind();
 		
 		appendRecord(bb, true);
 		
@@ -236,14 +267,14 @@
 			
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
 		bb.put(UPDATE_RECORD);		
 		bb.putLong(id);		
 		bb.putInt(record.length);		
 		bb.put(record);		
 		bb.put(DONE);		
-		bb.flip();
+		bb.rewind();
 		
 		appendRecord(bb, true);		
 		
@@ -268,12 +299,12 @@
 		
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
 		bb.put(DELETE_RECORD);		
 		bb.putLong(id);		
 		bb.put(DONE);		
-		bb.flip();
+		bb.rewind();
 								
 		appendRecord(bb, true);							
 	}		
@@ -291,17 +322,17 @@
 			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
-		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+		int size = SIZE_ADD_RECORD_TX + record.length;
+		
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
 
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
 		bb.put(ADD_RECORD_TX);
 		bb.putLong(txID);
 		bb.putLong(id);
 		bb.putInt(record.length);
 		bb.put(record);
 		bb.put(DONE);		
-		bb.flip();
+		bb.rewind();
 		
 		appendRecord(bb, false);
 		
@@ -318,17 +349,17 @@
 			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
-		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+		int size = SIZE_UPDATE_RECORD_TX + record.length; 
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
 		bb.put(UPDATE_RECORD_TX);		
 		bb.putLong(txID);		
 		bb.putLong(id);		
 		bb.putInt(record.length);		
-		bb.put(record);		
+		bb.put(record);
 		bb.put(DONE);		
-		bb.flip();
+		bb.rewind();
 		
 		appendRecord(bb, false);
 		
@@ -346,13 +377,13 @@
 		
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
 		bb.put(DELETE_RECORD_TX);		
 		bb.putLong(txID);		
 		bb.putLong(id);		
 		bb.put(DONE);			
-		bb.flip();
+		bb.rewind();
 								
 		appendRecord(bb, false);		
 		
@@ -377,12 +408,12 @@
 		
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
 		bb.put(PREPARE_RECORD);		
-		bb.putLong(txID);		
+		bb.putLong(txID);
 		bb.put(DONE);				
-		bb.flip();
+		bb.rewind();
 		
 		appendRecord(bb, true);		
 		
@@ -405,12 +436,12 @@
 				
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
 		bb.put(COMMIT_RECORD);		
 		bb.putLong(txID);		
 		bb.put(DONE);				
-		bb.flip();
+		bb.rewind();
 		
 		appendRecord(bb, true);	
 		
@@ -433,12 +464,12 @@
 				
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
+      ByteBuffer bb = currentFile.getFile().newBuffer(size); 
+
 		bb.put(ROLLBACK_RECORD);		
-		bb.putLong(txID);		
+		bb.putLong(txID);
 		bb.put(DONE);			
-		bb.flip();
+		bb.rewind();
 								
 		appendRecord(bb, true);			
 		
@@ -465,17 +496,20 @@
 				
 		for (String fileName: fileNames)
 		{
-			SequentialFile file = fileFactory.createSequentialFile(fileName, sync);
+			SequentialFile file = fileFactory.createSequentialFile(fileName, sync, false);
 			
 			file.open();
 			
-			ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
+			ByteBuffer bb = file.newBuffer(SIZE_LONG);
 			
 			file.read(bb);
 			
-			bb.flip();
+			//bb.flip();
+			//bb.rewind();
 			
 			long orderingID = bb.getLong();
+			
+			log.info("file=" + file + " with orderingID=" + orderingID);
 						
 			orderedFiles.add(new JournalFileImpl(file, orderingID));
 			
@@ -503,10 +537,12 @@
 		
 		for (JournalFile file: orderedFiles)
 		{	
-			ByteBuffer bb = ByteBuffer.wrap(new byte[fileSize]);
-			
 			file.getFile().open();
 			
+			log.info("Loading file " + file.getFile().getFileName());
+			
+         ByteBuffer bb = file.getFile().newBuffer(fileSize);
+         
 			int bytesRead = file.getFile().read(bb);
 			
 			if (bytesRead != fileSize)
@@ -517,10 +553,11 @@
 						                          " expected " + fileSize + " : " + file.getFile().getFileName());
 			}
 			
-			bb.flip();
+//			bb.flip();
+//			bb.rewind();
 			
-			//First long is the ordering timestamp
-			bb.getLong();
+			//First long is the ordering timestamp; skip past it to the start of the next aligned block
+			bb.position(calculateBlockStart(SIZE_LONG, file.getFile().getAlignment()));
 			
 			boolean hasData = false;
 			
@@ -538,7 +575,7 @@
 						
 						int size = bb.getInt();						
 						byte[] record = new byte[size];						
-						bb.get(record);						
+						bb.get(record);
 						byte end = bb.get();
 						
 						if (end != DONE)
@@ -563,7 +600,7 @@
 						byte[] record = new byte[size];						
 						bb.get(record);						
 						byte end = bb.get();
-						
+
 						if (end != DONE)
 						{
 							repairFrom(pos, file);
@@ -590,7 +627,7 @@
 					case DELETE_RECORD:						
 					{
 						long id = bb.getLong();	
-						byte end = bb.get();
+                  byte end = bb.get();
 						
 						if (end != DONE)
 						{
@@ -616,10 +653,14 @@
 						long txID = bb.getLong();							
 						maxTransactionID = Math.max(maxTransactionID, txID);						
 						long id = bb.getLong();				
+
+						log.info("read AddRecordTX txID = " + txID + " , id=" + id);
+                  
+
 						int size = bb.getInt();						
 						byte[] record = new byte[size];						
 						bb.get(record);						
-						byte end = bb.get();
+                  byte end = bb.get();
 						
 						if (end != DONE)
 						{
@@ -657,11 +698,14 @@
 					{					
 						long txID = bb.getLong();	
 						maxTransactionID = Math.max(maxTransactionID, txID);						
-						long id = bb.getLong();					
+						long id = bb.getLong();
+						
+						log.info("read UpdateRecordTX txID = " + txID + " , id=" + id);
+						
 						int size = bb.getInt();						
 						byte[] record = new byte[size];						
 						bb.get(record);						
-						byte end = bb.get();
+                  byte end = bb.get();
 						
 						if (end != DONE)
 						{
@@ -700,7 +744,10 @@
 						long txID = bb.getLong();	
 						maxTransactionID = Math.max(maxTransactionID, txID);						
 						long id = bb.getLong();			
-						byte end = bb.get();
+                  
+						log.info("read DeleteRecordTX txID = " + txID + " , id=" + id);
+
+                  byte end = bb.get();
 						
 						if (end != DONE)
 						{
@@ -737,8 +784,11 @@
 					case PREPARE_RECORD:
 					{
 						long txID = bb.getLong();				
+
+						log.info("read Prepare txID=" + txID);
+                  
 						maxTransactionID = Math.max(maxTransactionID, txID);						
-						byte end = bb.get();
+                  byte end = bb.get();
 						
 						if (end != DONE)
 						{
@@ -772,8 +822,11 @@
 					case COMMIT_RECORD:
 					{
 						long txID = bb.getLong();	
+						
+						log.info("read Commit txID=" + txID);
+						
 						maxTransactionID = Math.max(maxTransactionID, txID);
-						byte end = bb.get();
+                  byte end = bb.get();
 						
 						if (end != DONE)
 						{
@@ -806,8 +859,11 @@
 					case ROLLBACK_RECORD:
 					{
 						long txID = bb.getLong();		
+
+                  log.info("read RollbacktxID=" + txID);
+                  
 						maxTransactionID = Math.max(maxTransactionID, txID);						
-						byte end = bb.get();
+                  byte end = bb.get();
 						
 						if (end != DONE)
 						{
@@ -857,6 +913,8 @@
 					}
 				}
 				
+				bb.position(calculateBlockStart(bb.position(), file.getFile().getAlignment()));
+				
 				if (recordType != FILL_CHARACTER)
 				{
 					lastDataPos = bb.position();
@@ -875,7 +933,7 @@
 				freeFiles.add(file);
 				
 				//Position it ready for writing
-				file.getFile().position(SIZE_LONG);
+				file.getFile().position(calculateBlockStart(SIZE_LONG, file.getFile().getAlignment()));
 			}								
 		}			
 		
@@ -971,6 +1029,8 @@
    		{
    			//File can be reclaimed or deleted
    			
+   		   log.info("Reclaiming file " + file);
+   		   
    			dataFiles.remove(file);
    			
    			//FIXME - size() involves a scan!!!
@@ -980,13 +1040,13 @@
       			
       			long newOrderingID = generateOrderingID();
       			
-      			ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
-      			
-      			bb.putLong(newOrderingID);
-      			
       			SequentialFile sf = file.getFile();
-      			
+
       			sf.open();
+
+               ByteBuffer bb = sf.newBuffer(SIZE_LONG); 
+               
+               bb.putLong(newOrderingID);
       			
       			//Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
       			
@@ -994,13 +1054,13 @@
       			//operation and can impact other IO operations on the disk
       			sf.fill(0, fileSize, FILL_CHARACTER);
       			
-      			sf.write(bb, true);
+      			int bytesWritten = sf.write(bb, true);
       			
       			JournalFile jf = new JournalFileImpl(sf, newOrderingID);
       			
-      			sf.position(SIZE_LONG);
+      			sf.position(bytesWritten);
       			
-      			jf.setOffset(SIZE_LONG);
+      			jf.setOffset(bytesWritten);
       			
       			freeFiles.add(jf);  
    			}
@@ -1098,24 +1158,51 @@
 			
 	// Private -----------------------------------------------------------------------------
 		
-	private void appendRecord(ByteBuffer bb, boolean sync) throws Exception
+	private int calculateBlockStart(int position, int alignment)
 	{
-		lock.acquire();
-		
-		int size = bb.capacity();
-				
-		try
-		{   					
-			checkFile(size);
-			currentFile.getFile().write(bb, sync);			
-			currentFile.extendOffset(size);
-		}
-		finally
-		{
-			lock.release();
-		}
+	   int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+	   
+	   //System.out.println("Calculated " + pos + " and received " + position);
+	   
+	   return pos;
 	}
 	
+   private void appendRecord(ByteBuffer bb, boolean sync) throws Exception
+   {
+      lock.acquire();
+      
+      int size = bb.capacity();
+            
+      try
+      {                 
+         checkFile(size);
+         currentFile.getFile().write(bb, sync);       
+         currentFile.extendOffset(size);
+      }
+      finally
+      {
+         lock.release();
+      }
+   }
+   
+   private void appendRecord(ByteBuffer bb, boolean sync, IOCallback callback) throws Exception
+   {
+      lock.acquire();
+      
+      int size = bb.capacity();
+            
+      try
+      {                 
+         checkFile(size);
+         currentFile.getFile().write(bb, sync, callback);       
+         currentFile.extendOffset(size);
+      }
+      finally
+      {
+         lock.release();
+      }
+   }
+   
 	private void repairFrom(int pos, JournalFile file) throws Exception
 	{
 		log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
@@ -1132,27 +1219,30 @@
 		long orderingID = generateOrderingID();
 		
 		String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
+		
+		log.info("Creating file " + fileName);
 						
-		SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync);
+		SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync, false);
 		
 		sequentialFile.open();
 						
 		sequentialFile.fill(0, fileSize, FILL_CHARACTER);
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
+		ByteBuffer bb = sequentialFile.newBuffer(SIZE_LONG); 
 		
 		bb.putLong(orderingID);
 		
-		bb.flip();
+		bb.rewind();
 		
-		sequentialFile.write(bb, true);
+		int bytesWritten = sequentialFile.write(bb, true);
 		
-		sequentialFile.position(SIZE_LONG);
+//		sequentialFile.position(SIZE_LONG);
 		
 		JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
 		
-		info.extendOffset(SIZE_LONG);
 		
+		info.extendOffset(bytesWritten);
+		
 		return info;
 	}
 	
@@ -1179,14 +1269,20 @@
 	
 	private void checkFile(final int size) throws Exception
 	{
+	   
+	   if (size % currentFile.getFile().getAlignment() != 0)
+	   {
+	      throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile().getAlignment());
+	   }
 		//We take into account the first timestamp long
-		if (size > fileSize - SIZE_LONG)
+		if (size > fileSize - calculateBlockStart(8, currentFile.getFile().getAlignment()))
 		{
 			throw new IllegalArgumentException("Record is too large to store " + size);
 		}
 
 		if (currentFile == null || fileSize - currentFile.getOffset() < size)
-		{					
+		{
+		   //Thread.sleep(1000);
 			currentFile.getFile().close();
 			
 			dataFiles.add(currentFile);
@@ -1407,4 +1503,5 @@
 			}
 		}
 	}
+
 }

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -26,6 +26,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
+import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.logging.Logger;
 
@@ -61,6 +62,11 @@
 		this.sync = sync;		
 	}
 	
+	public int getAlignment() // alignment reported by this NIO-based file
+	{
+	   return 1; // always 1, so a "size % alignment" check accepts any size
+	}
+	
 	public String getFileName()
 	{
 		return fileName;
@@ -115,14 +121,41 @@
 		close();		
 	}
 
-	public int read(ByteBuffer bytes) throws Exception
-	{
-		int bytesRead = channel.read(bytes);
-		
-		return bytesRead;
-	}
+   public int read(ByteBuffer bytes) throws Exception // overload without a completion callback
+   {
+      return read(bytes, null); // null callback: nobody is notified, only the byte count is returned
+   }
 
-	public int write(ByteBuffer bytes, boolean sync) throws Exception
+   /** Reads into {@code bytes}, flips it, then calls {@code callback.done()} (callback may be null); a failed read goes to onError and is rethrown. */
+   public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+   {
+      int bytesRead;
+      try
+      {
+         bytesRead = channel.read(bytes);
+      }
+      catch (Exception e)
+      {
+         if (callback != null)
+         {
+            callback.onError(-1, e.getLocalizedMessage());
+         }
+         throw e;
+      }
+      bytes.flip(); // flip first so that done() already sees a readable buffer
+      if (callback != null)
+      {
+         callback.done(); // outside the try: a failure inside done() is not re-reported through onError
+      }
+      return bytesRead;
+   }
+
+   public int write(ByteBuffer bytes, boolean sync) throws Exception // overload without a callback
+   {
+      return write(bytes, sync, null); // delegates to the callback-taking overload, passing no callback
+   }
+   
+   public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
 	{
 		int bytesRead = channel.write(bytes);
 		
@@ -138,4 +171,14 @@
 	{
 		channel.position(pos);
 	}
+
+   public ByteBuffer newBuffer(int size)
+   {
+      return ByteBuffer.allocate(size); // plain heap buffer with exactly the requested capacity
+   }
+
+   public ByteBuffer wrapBuffer(byte[] bytes)
+   {
+      return ByteBuffer.wrap(bytes); // the buffer is backed by the given array; nothing is copied
+   }
 }

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -37,39 +37,16 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public class NIOSequentialFileFactory implements SequentialFileFactory
+public class NIOSequentialFileFactory extends AbstractSequentialFactory
 {
-	private final String journalDir;
-	
 	public NIOSequentialFileFactory(final String journalDir)
 	{
-		this.journalDir = journalDir;
+		super(journalDir);
 	}	
 	
-	public SequentialFile createSequentialFile(final String fileName, final boolean sync)
+	public SequentialFile createSequentialFile(final String fileName, final boolean sync, boolean control)
 	{
 		return new NIOSequentialFile(journalDir, fileName, sync);
 	}
 
-	public List<String> listFiles(final String extension) throws Exception
-	{
-		File dir = new File(journalDir);
-		
-		FilenameFilter fnf = new FilenameFilter()
-		{
-			public boolean accept(File file, String name)
-			{
-				return name.endsWith("." + extension);
-			}
-		};
-		
-		String[] fileNames = dir.list(fnf);
-		
-		if (fileNames == null)
-		{
-			throw new IOException("Failed to list: " + journalDir);
-		}
-		
-		return Arrays.asList(fileNames);
-	}
 }

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -21,7 +21,7 @@
 import org.jboss.messaging.core.journal.RecordInfo;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.message.MessageReference;
@@ -98,7 +98,7 @@
 		
 		checkAndCreateDir(bindingsDir, config.isCreateBindingsDir());
 			
-	   SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+	   SequentialFileFactory bindingsFF = new AIOSequentialFileFactory(bindingsDir);
 	      
 	   bindingsJournal = new JournalImpl(1024 * 1024, 2, true, bindingsFF, 10000, "jbm-bindings", "bindings");
 	      
@@ -111,7 +111,7 @@
 	   
 	   checkAndCreateDir(journalDir, config.isCreateBindingsDir());
 	       
-	   SequentialFileFactory journalFF = new NIOSequentialFileFactory(journalDir);
+	   SequentialFileFactory journalFF = new AIOSequentialFileFactory(journalDir);
 	      
 	   messageJournal = new JournalImpl(config.getJournalFileSize(), 
 	   		config.getJournalMinFiles(), config.isJournalSync(), journalFF,

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/MultiThreadWriteNativeTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/MultiThreadWriteNativeTest.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/MultiThreadWriteNativeTest.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -20,7 +20,7 @@
 import junit.framework.TestCase;
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.impl.JlibAIO;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.logging.Logger;
 
 // you need to define java.library.path=${project-root}/native/src/.libs
@@ -47,12 +47,12 @@
    static class ExecClass implements Runnable
    {
        
-       JlibAIO aio;
+       AsynchronousFileImpl aio;
        ByteBuffer buffer;
        AIOCallback callback;
        
        
-       public ExecClass(JlibAIO aio, ByteBuffer buffer, AIOCallback callback)
+       public ExecClass(AsynchronousFileImpl aio, ByteBuffer buffer, AIOCallback callback)
        {
            this.aio = aio;
            this.buffer = buffer;
@@ -81,7 +81,7 @@
 
    
    
-   private static void addData(JlibAIO aio, ByteBuffer buffer, AIOCallback callback) throws Exception
+   private static void addData(AsynchronousFileImpl aio, ByteBuffer buffer, AIOCallback callback) throws Exception
    {
        //aio.write(getNewPosition()*SIZE, SIZE, buffer, callback);
        executor.execute(new ExecClass(aio, buffer, callback));
@@ -125,10 +125,11 @@
    private void executeTest(boolean sync) throws Throwable
    {
        log.info(sync?"Sync test:":"Async test");
-       JlibAIO jlibAIO = new JlibAIO();
+       AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
        jlibAIO.open(FILE_NAME, 21000);
        log.debug("Preallocating file");
-       jlibAIO.preAllocate(NUMBER_OF_THREADS,  SIZE * NUMBER_OF_LINES);
+      
+       jlibAIO.fill(0l, NUMBER_OF_THREADS,  SIZE * NUMBER_OF_LINES, (byte)0);
        log.debug("Done Preallocating file");
        
        CountDownLatch latchStart = new CountDownLatch (NUMBER_OF_THREADS + 1);
@@ -179,9 +180,9 @@
        Throwable failed = null;
        CountDownLatch latchStart;
        boolean sync;
-       JlibAIO libaio;
+       AsynchronousFileImpl libaio;
 
-       public ThreadProducer(String name, CountDownLatch latchStart, JlibAIO libaio, boolean sync)
+       public ThreadProducer(String name, CountDownLatch latchStart, AsynchronousFileImpl libaio, boolean sync)
        {
            super(name);
            this.latchStart = latchStart;
@@ -281,9 +282,9 @@
        boolean errorCalled = false;
        CountDownLatch latchDone;
        ByteBuffer releaseMe;
-       JlibAIO libaio;
+       AsynchronousFileImpl libaio;
        
-       public LocalCallback(CountDownLatch latchDone, ByteBuffer releaseMe, JlibAIO libaio)
+       public LocalCallback(CountDownLatch latchDone, ByteBuffer releaseMe, AsynchronousFileImpl libaio)
        {
            this.latchDone = latchDone;
            this.releaseMe = releaseMe;

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -13,16 +13,21 @@
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.impl.JlibAIO;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.logging.Logger;
 
 import junit.framework.TestCase;
 
 //you need to define java.library.path=${project-root}/native/src/.libs
 public class SingleThreadWriteNativeTest extends TestCase
 {
+   private static final Logger log = Logger.getLogger(SingleThreadWriteNativeTest.class);
+   
    private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
    
    
@@ -52,7 +57,97 @@
        buffer.put((byte)'\n');
        
    }
+
+   /** Interleaves writes of one block to two separately opened files; every callback of both files must complete exactly once, without error. */
+   public void testTwoFiles() throws Exception
+   {
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
+      controller.open(FILE_NAME + ".1", 10000);
+      controller2.open(FILE_NAME + ".2", 10000);
+      final int numberOfLines = 100000;
+      final int size = 1024;
+      ByteBuffer block = null;
+      try
+      {
+         System.out.println("++testTwoFiles"); System.out.flush();
+         CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+         CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
+
+         block = controller.newBuffer(size);
+         encodeBufer(block);
+
+         preAlloc(controller, numberOfLines * size);
+         preAlloc(controller2, numberOfLines * size);
+
+         ArrayList<LocalAIO> list = new ArrayList<LocalAIO>();
+         ArrayList<LocalAIO> list2 = new ArrayList<LocalAIO>();
    
+         for (int i = 0; i < numberOfLines; i++)
+         {
+            list.add(new LocalAIO(latchDone));
+            list2.add(new LocalAIO(latchDone2));
+         }
+
+         long valueInitial = System.currentTimeMillis();
+
+         System.out.println("Adding data");
+         long lastTime = System.currentTimeMillis();
+         int counter = 0;
+         Iterator<LocalAIO> iter2 = list2.iterator();
+
+         for (LocalAIO tmp : list)
+         {
+            LocalAIO tmp2 = iter2.next();
+
+            controller.write(counter * size, size, block, tmp);
+            // The paired request goes to the second file (it used to hit the first file again).
+            controller2.write(counter * size, size, block, tmp2);
+            if (++counter % 5000 == 0)
+            {
+               // Clamp to 1 ms so that a very fast batch cannot cause a division by zero.
+               long elapsed = Math.max(1, System.currentTimeMillis() - lastTime);
+               System.out.println(5000 * 1000 / elapsed + " rec/sec (Async)");
+               lastTime = System.currentTimeMillis();
+            }
+         }
+
+         System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
+         System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
+         System.out.println("Flush now");
+         System.out.println("Received " + LocalAIO.staticDone);
+         long timeTotal = Math.max(1, System.currentTimeMillis() - valueInitial);
+         System.out.println("Asynchronous time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size  + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
+
+         latchDone.await();
+         latchDone2.await();
+
+         timeTotal = Math.max(1, System.currentTimeMillis() - valueInitial);
+         System.out.println("After completions time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size  + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
+
+         // The callbacks of both files are verified, not only those of the first one.
+         ArrayList<LocalAIO> all = new ArrayList<LocalAIO>(list);
+         all.addAll(list2);
+         for (LocalAIO tmp : all)
+         {
+            assertEquals(1, tmp.timesDoneCalled);
+            assertTrue(tmp.doneCalled);
+            assertFalse(tmp.errorCalled);
+         }
+      }
+      finally
+      {
+         // Best-effort cleanup of the buffer and of both files, also when an assertion above failed.
+         if (block != null)
+         {
+            try {controller.destroyBuffer(block);} catch (Exception ignored){}
+         }
+         try {controller.close();} catch (Exception ignored){}
+         try {controller2.close();} catch (Exception ignored){}
+      }
+   }
+   
+
    public void testAddBeyongSimultaneousLimit() throws Exception
    {
        asyncData(150000,1024,100);
@@ -63,13 +158,122 @@
        asyncData(150000,1024,20000);
    }
    
+   public void testValidateData() throws Exception // NOTE(review): presumably relies on FILE_NAME written by an earlier test - confirm
+   {
+      validateData(150000,1024,20000); // 150000 lines of 1024 bytes; 20000 is the limit passed to open()
+   }
+
+   /** Reads a 512-byte file through three kinds of buffer: a 50-byte direct buffer (outcome not asserted), a wrapped byte[] (must throw) and a buffer from newBuffer (must return the written pattern). */
+   public void testInvalidReads() throws Exception
+   {
+      class LocalCallback implements AIOCallback
+      {
+         // Counted down once, by done() or by onError(); "error" tells which of the two ran.
+         CountDownLatch latch = new CountDownLatch(1);
+         boolean error;
+         public void done()
+         {
+            latch.countDown();
+         }
+
+         public void onError(int errorCode, String errorMessage)
+         {
+            this.error = true;
+            latch.countDown();
+         }
+      }
+
+      AsynchronousFileImpl controller = new AsynchronousFileImpl();
+      try
+      {
+          // NOTE(review): NUMBER_LINES is never read in this method.
+          final int NUMBER_LINES = 1;
+          final int SIZE = 512;
+          // Open and close the file once, then continue with a fresh instance.
+          controller.open(FILE_NAME, 10);
+          controller.close();
+          
+          controller = new AsynchronousFileImpl();
+          
+          controller.open(FILE_NAME, 10);
+          
+          controller.fill(0,1, 512, (byte)'j');
+          
+          ByteBuffer buffer = controller.newBuffer(SIZE);
+          // Fill the buffer with the pattern i % 100 and write it at offset 0.
+          
+          buffer.clear();
+          
+          for (int i=0; i<SIZE; i++)
+          {
+              buffer.put((byte)(i%100));
+          }
+          
+          LocalCallback callbackLocal = new LocalCallback();
+          
+          controller.write(0, 512, buffer, callbackLocal);
+          
+          callbackLocal.latch.await();
+          // Read 50 bytes into a direct buffer; the outcome is not asserted (the check below is disabled).
+          ByteBuffer newBuffer = ByteBuffer.allocateDirect(50);
+          
+          callbackLocal = new LocalCallback();
+          
+          controller.read(0, 50, newBuffer, callbackLocal);
+          
+          callbackLocal.latch.await();
+          
+          //assertTrue(callbackLocal.error);
+          
+          callbackLocal = new LocalCallback();
+          
+          byte bytes[] = new byte[512];
+          // A heap buffer wrapping a byte[] is expected to be rejected with an exception.
+          try
+          {
+             newBuffer = ByteBuffer.wrap(bytes);
+             
+             controller.read(0, 512, newBuffer, callbackLocal);
+             
+             fail("An exception was supposed to be thrown");
+          }
+          catch (Exception ignored)
+          {
+          }
+          // A buffer obtained from newBuffer must read back exactly what was written.
+          //newBuffer = ByteBuffer.allocateDirect(512);
+          newBuffer = controller.newBuffer(512);
+          callbackLocal = new LocalCallback();
+          controller.read(0, 512, newBuffer,callbackLocal);
+          callbackLocal.latch.await();
+          assertFalse(callbackLocal.error);
+          
+          newBuffer.rewind();
+          
+          byte[] bytesRead = new byte[SIZE];
+          
+          newBuffer.get(bytesRead);
+          
+          for (int i=0; i<SIZE;i++)
+          {
+             assertEquals((byte)(i%100), bytesRead[i]);
+          }
+          // NOTE(review): only "buffer" is destroyed; the buffer from the second newBuffer call is not.
+          
+          controller.destroyBuffer(buffer);
+      }
+      finally
+      {
+          try { controller.close(); } catch (Throwable ignored){}
+          
+      }
+          
+   }
+
+   
    public void testRead() throws Exception
    {
-       
-       
-       
-
-       final JlibAIO controller = new JlibAIO();
+       final AsynchronousFileImpl controller = new AsynchronousFileImpl();
        try
        {
            
@@ -150,9 +354,94 @@
            
    }
    
+   /**
+    * Reads FILE_NAME back line by line and compares every line with the block produced by encodeBufer.
+    * Mismatches are only tracked: a warning is logged when a valid line follows an invalid one.
+    * @param numberOfLines number of lines to read
+    * @param size size in bytes of each line
+    * @param aioLimit limit passed to {@code open}
+    * @throws Exception if opening or reading the file fails
+    */
+   private void validateData(int numberOfLines, int size, int aioLimit) throws Exception
+   {
+       final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+       controller.open(FILE_NAME, aioLimit);
+
+       ByteBuffer readBuffer = null;
+       try
+       {
+          ByteBuffer compareBlock = ByteBuffer.allocateDirect(size);
+          encodeBufer(compareBlock);
+
+          readBuffer = controller.newBuffer(size);
+
+          boolean firstInvalid = false;
+          for (int i = 0; i < numberOfLines; i++)
+          {
+             if (i % 1000 == 0)
+             {
+                log.info("line = " + i);
+             }
+             CountDownLatch latch = new CountDownLatch(1);
+             LocalAIO callback = new LocalAIO(latch);
+             controller.read(i * size, size, readBuffer, callback);
+
+             latch.await();
+
+             if (!compareBuffers(compareBlock, readBuffer))
+             {
+                firstInvalid = true;
+             }
+             else if (firstInvalid)
+             {
+                for (int line=0;line<10;line++) log.info("*********************************************");
+                log.warn("Valid line after an invalid line!!!");
+             }
+          }
+       }
+       finally
+       {
+          // Release the read buffer and close the file even when a read fails.
+          if (readBuffer != null)
+          {
+             try { controller.destroyBuffer(readBuffer); } catch (Exception ignored) {}
+          }
+          try { controller.close(); } catch (Exception ignored) {}
+       }
+   }
+   
+   
+   /**
+    * Rewinds both buffers and reports whether they have the same limit and hold the same bytes.
+    */
+   private boolean compareBuffers(ByteBuffer buffer1, ByteBuffer buffer2)
+   {
+      buffer1.rewind();
+      buffer2.rewind();
+
+      final int length = buffer1.limit();
+      if (length != buffer2.limit())
+      {
+         return false;
+      }
+
+      // Copy both contents out; this leaves each buffer positioned at its limit.
+      final byte[] left = new byte[length];
+      final byte[] right = new byte[length];
+      buffer1.get(left);
+      buffer2.get(right);
+
+      boolean same = true;
+      for (int idx = 0; same && idx < length; idx++)
+      {
+         same = left[idx] == right[idx];
+      }
+      return same;
+   }
+   
    private void asyncData(int numberOfLines, int size, int aioLimit) throws Exception
    {
-       final JlibAIO controller = new JlibAIO();
+       final AsynchronousFileImpl controller = new AsynchronousFileImpl();
        controller.open(FILE_NAME, aioLimit);
        
        try
@@ -233,7 +522,7 @@
            final int SIZE = 1024;
            //final int SIZE = 512;
            
-           final JlibAIO controller = new JlibAIO();
+           final AsynchronousFileImpl controller = new AsynchronousFileImpl();
            controller.open(FILE_NAME, 2000);
 
            ByteBuffer block = controller.newBuffer(SIZE);
@@ -285,11 +574,11 @@
        
    }
    
-   private void preAlloc(JlibAIO controller, long size)
+   private void preAlloc(AsynchronousFileImpl controller, long size)
    {
        System.out.println("Pre allocating");  System.out.flush();
        long startPreAllocate = System.currentTimeMillis();
-       controller.preAllocate(1, size);
+       controller.fill(0l, 1, size, (byte)0);
        long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
        if (endPreAllocate != 0) System.out.println("PreAllocated the file in " + endPreAllocate + " seconds, What means " + (size/endPreAllocate) + " bytes per millisecond");
    }
@@ -297,7 +586,7 @@
    
    public void testInvalidWrite() throws Exception
    {
-       final JlibAIO controller = new JlibAIO();
+       final AsynchronousFileImpl controller = new AsynchronousFileImpl();
        controller.open(FILE_NAME, 2000);
 
        try
@@ -337,7 +626,7 @@
    
    public void testInvalidAlloc() throws Exception
    {
-       JlibAIO controller = new JlibAIO();
+       AsynchronousFileImpl controller = new AsynchronousFileImpl();
        try
        {
            // You don't need to open the file to alloc it

Added: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java	                        (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,196 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.timing;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * 
+ * A RealJournalImplAIOTest: runs the JournalImplTestUnit tests and two speed tests on AIO-backed files.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealJournalImplAIOTest extends JournalImplTestUnit
+{
+   private static final Logger log = Logger.getLogger(RealJournalImplAIOTest.class);
+
+   protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+   /** Re-creates the journal directory and returns an AIO file factory rooted at it. */
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      File file = new File(journalDir);
+
+      log.info("deleting directory " + journalDir);
+
+      deleteDirectory(file);
+
+      file.mkdir();
+
+      // This is the AIO variant of the test, so the inherited tests must run on AIO files as well
+      // (the NIO factory was returned here before).
+      return new AIOSequentialFileFactory(journalDir);
+   }
+
+   /** Appends numMessages records, each with its own callback, and checks that every callback completed. */
+   public void testSpeedNonTransactional() throws Exception
+   {
+      Journal journal =
+         new JournalImpl(10 * 1024 * 1024, 10, true, new AIOSequentialFileFactory(journalDir),
+               5000, "jbm-data", "jbm");
+
+      journal.start();
+
+      journal.load(new ArrayList<RecordInfo>(), null);
+
+      final int numMessages = 50000;
+
+      final CountDownLatch latch = new CountDownLatch(numMessages);
+
+      class LocalCallback implements IOCallback
+      {
+         final int i;
+         final CountDownLatch latch;
+         // Set inside the callbacks and read by the test method afterwards, hence volatile.
+         volatile String message = null;
+         volatile boolean done = false;
+
+         public LocalCallback(int i, CountDownLatch latch)
+         {
+            this.i = i;
+            this.latch = latch;
+         }
+
+         public void done()
+         {
+            synchronized (this)
+            {
+               if (done)
+               {
+                  message = "done received in duplicate";
+               }
+               done = true;
+               this.latch.countDown();
+            }
+         }
+
+         public void onError(int errorCode, String errorMessage)
+         {
+            synchronized (this)
+            {
+               // Report the record index without modifying it: it identifies this callback later on.
+               System.out.println("********************** Error = " + i);
+               message = errorMessage;
+               latch.countDown();
+            }
+         }
+      }
+
+      byte[] data = new byte[700];
+
+      long start = System.currentTimeMillis();
+
+      ArrayList<LocalCallback> callbacks = new ArrayList<LocalCallback>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         LocalCallback callback = new LocalCallback(i, latch);
+         callbacks.add(callback);
+         journal.appendAddRecord(i, data, callback);
+      }
+
+      // NOTE(review): the result of the timed wait is ignored; missing callbacks are caught by the loop below.
+      latch.await(2, TimeUnit.SECONDS);
+
+      long end = System.currentTimeMillis();
+
+      double rate = 1000 * (double)numMessages / (end - start);
+
+      boolean failed = false;
+
+      for (LocalCallback callback: callbacks)
+      {
+         if (callback.message != null)
+         {
+            fail(callback.message);
+         }
+
+         if (!callback.done)
+         {
+            System.out.println("callback i=" + callback.i + " was not received!");
+            failed = true;
+         }
+      }
+
+      // If this fails, JournalImpl is probably closing the files without waiting for all completions to arrive first
+      assertFalse(failed);
+
+      log.info("Rate " + rate + " records/sec");
+   }
+
+   /** Times numMessages transactional add + commit pairs and logs the resulting rate. */
+   public void testSpeedTransactional() throws Exception
+   {
+      Journal journal =
+         new JournalImpl(10 * 1024 * 1024, 10, true, new AIOSequentialFileFactory(journalDir),
+               5000, "jbm-data", "jbm");
+
+      journal.start();
+
+      journal.load(new ArrayList<RecordInfo>(), null);
+
+      final int numMessages = 10000;
+
+      byte[] data = new byte[1024];
+
+      long start = System.currentTimeMillis();
+
+      int count = 0;
+      for (int i = 0; i < numMessages; i++)
+      {
+         journal.appendAddRecordTransactional(i, count++, data);
+
+         journal.appendCommitRecord(i);
+      }
+
+      long end = System.currentTimeMillis();
+
+      double rate = 1000 * (double)numMessages / (end - start);
+
+      log.info("Rate " + rate + " records/sec");
+   }
+}
+


Property changes on: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplTest.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplTest.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -60,16 +60,16 @@
 	public void testSpeedNonTransactional() throws Exception
 	{
 		Journal journal =
-			new JournalImpl(10 * 1024 * 1024, 10, true, new NIOSequentialFileFactory(journalDir),
+			new JournalImpl(50 * 1024 * 1024, 2, true, new NIOSequentialFileFactory(journalDir),
 					5000, "jbm-data", "jbm");
 		
 		journal.start();
 		
 		journal.load(new ArrayList<RecordInfo>(), null);
 		
-		final int numMessages = 10000;
+		final int numMessages = 30000;
 		
-		byte[] data = new byte[1024];
+		byte[] data = new byte[700];
 		
 		long start = System.currentTimeMillis();
 		

Added: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java	                        (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,304 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+
+public class AIOSequentialFileFactoryTest extends FileFactoryTestBase
+{
+
+   protected String journalDir = System.getProperty("user.home") + "/journal-test";
+   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      File file = new File(journalDir);
+      
+      System.out.println("Filling files on " + journalDir);
+      
+      deleteDirectory(file);
+      
+      file.mkdir();     
+   }
+
+   protected SequentialFileFactory createFactory()
+   {
+      return new AIOSequentialFileFactory(journalDir);
+   }
+   
+   public void testBuffer() throws Exception
+   {
+      // A 10-byte request must yield a buffer whose limit is 512 (presumably rounded up to the AIO alignment - TODO confirm).
+      SequentialFile file = factory.createSequentialFile("filtetmp.log", true, false);
+      file.open();
+      ByteBuffer buff = file.newBuffer(10);
+      assertEquals(512, buff.limit());
+   }
+   
+   public void testCreateAndListFiles() throws Exception
+   {
+      List<String> expectedFiles = new ArrayList<String>();
+      
+      final int numFiles = 10;
+      
+      for (int i = 0; i < numFiles; i++)
+      {
+         String fileName = UUID.randomUUID().toString() + ".jbm";
+         
+         expectedFiles.add(fileName);
+         
+         SequentialFile sf = factory.createSequentialFile(fileName, false, false);
+         
+         sf.open();
+         
+         assertEquals(fileName, sf.getFileName());
+      }           
+      
+      //Create a couple with a different extension - they shouldn't be picked up
+      
+      SequentialFile sf1 = factory.createSequentialFile("different.file", false, false);
+      sf1.open();
+      
+      SequentialFile sf2 = factory.createSequentialFile("different.cheese", false, false);
+      sf2.open();
+                  
+      List<String> fileNames = factory.listFiles("jbm");
+      
+      assertEquals(expectedFiles.size(), fileNames.size());
+      
+      for (String fileName: expectedFiles)
+      {
+         assertTrue(fileNames.contains(fileName));
+      }
+      
+      fileNames = factory.listFiles("file");
+      
+      assertEquals(1, fileNames.size());
+      
+      assertTrue(fileNames.contains("different.file"));  
+      
+      fileNames = factory.listFiles("cheese");
+      
+      assertEquals(1, fileNames.size());
+      
+      assertTrue(fileNames.contains("different.cheese"));   
+   }
+   
+   
+   public void testFill() throws Exception
+   {
+      SequentialFile sf = factory.createSequentialFile("fill.jbm", true, false);
+      
+      sf.open();
+      
+      checkFill(sf, 0, 512, (byte)'X');
+      
+      checkFill(sf, 512, 512, (byte)'Y');
+      
+      checkFill(sf, 0, 512, (byte)'Z');
+      
+      checkFill(sf, 512, 512, (byte)'A');
+      
+      checkFill(sf, 1024, 10*1024, (byte)'B');
+   }
+   
+   public void testDelete() throws Exception
+   {
+      SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true, false);
+      
+      sf.open();
+      
+      SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true, false);
+      
+      sf2.open();
+      
+      List<String> fileNames = factory.listFiles("jbm");
+      
+      assertEquals(2, fileNames.size());
+      
+      assertTrue(fileNames.contains("delete-me.jbm"));
+      
+      assertTrue(fileNames.contains("delete-me2.jbm"));
+      
+      sf.delete();
+      
+      fileNames = factory.listFiles("jbm");
+      
+      assertEquals(1, fileNames.size());
+      
+      assertTrue(fileNames.contains("delete-me2.jbm"));
+      
+   }
+   
+   public void testWriteandRead() throws Exception
+   {
+      SequentialFile sf = factory.createSequentialFile("write.jbm", true, false);
+      
+      sf.open();
+      
+      String s1 = "aardvark";
+      byte[] bytes1 = s1.getBytes("UTF-8");
+      
+      ByteBuffer bb1 = sf.wrapBuffer(bytes1);
+      
+      String s2 = "hippopotamus";
+      byte[] bytes2 = s2.getBytes("UTF-8");
+      ByteBuffer bb2 = sf.wrapBuffer(bytes2);
+      
+      String s3 = "echidna";
+      byte[] bytes3 = s3.getBytes("UTF-8");
+      ByteBuffer bb3 = sf.wrapBuffer(bytes3);
+      
+      int bytesWritten = sf.write(bb1, true);
+      
+      assertEquals(bb1.limit(), bytesWritten);
+      
+      bytesWritten = sf.write(bb2, true);
+      
+      assertEquals(bb2.limit(), bytesWritten);
+      
+      bytesWritten = sf.write(bb3, true);
+      
+      assertEquals(bb3.limit(), bytesWritten);
+      
+      sf.position(0);
+      
+      byte[] rbytes1 = new byte[bytes1.length];
+      
+      byte[] rbytes2 = new byte[bytes2.length];
+      
+      byte[] rbytes3 = new byte[bytes3.length];
+      
+      ByteBuffer rb1 = sf.newBuffer(rbytes1.length);
+      ByteBuffer rb2 = sf.newBuffer(rbytes2.length);
+      ByteBuffer rb3 = sf.newBuffer(rbytes3.length);
+
+      int bytesRead = sf.read(rb1);
+      assertEquals(rb1.limit(), bytesRead);
+      rb1.get(rbytes1);
+      assertByteArraysEquivalent(bytes1, rbytes1);
+      
+      bytesRead = sf.read(rb2);
+      assertEquals(rb2.limit(), bytesRead);     
+      rb2.get(rbytes2);
+      assertByteArraysEquivalent(bytes2, rbytes2);
+      
+      bytesRead = sf.read(rb3);
+      assertEquals(rb3.limit(), bytesRead);
+      rb3.get(rbytes3);
+      assertByteArraysEquivalent(bytes3, rbytes3);          
+   }
+   
+   public void testPosition() throws Exception
+   {
+      SequentialFile sf = factory.createSequentialFile("position.jbm", true, false);
+      
+      sf.open();
+      
+      String s1 = "orange";
+      byte[] bytes1 = s1.getBytes("UTF-8");
+      ByteBuffer bb1 = sf.wrapBuffer(bytes1); 
+      
+      String s2 = "grapefruit";
+      byte[] bytes2 = s2.getBytes("UTF-8");
+      ByteBuffer bb2 = sf.wrapBuffer(bytes2);
+      
+      String s3 = "lemon";
+      byte[] bytes3 = s3.getBytes("UTF-8");
+      ByteBuffer bb3 = sf.wrapBuffer(bytes3);
+      
+      int bytesWritten = sf.write(bb1, true);
+      
+      assertEquals(bb1.limit(), bytesWritten);
+      
+      bytesWritten = sf.write(bb2, true);
+      
+      assertEquals(bb2.limit(), bytesWritten);
+      
+      bytesWritten = sf.write(bb3, true);
+      
+      assertEquals(bb3.limit(), bytesWritten);
+      
+      byte[] rbytes1 = new byte[bytes1.length];
+      
+      byte[] rbytes2 = new byte[bytes2.length];
+      
+      byte[] rbytes3 = new byte[bytes3.length];
+      
+      ByteBuffer rb1 = sf.newBuffer(rbytes1.length);
+      ByteBuffer rb2 = sf.newBuffer(rbytes2.length);
+      ByteBuffer rb3 = sf.newBuffer(rbytes3.length);
+      
+      sf.position(bb1.limit() + bb2.limit());
+      
+      int bytesRead = sf.read(rb3);
+      assertEquals(rb3.limit(), bytesRead);
+      rb3.rewind();
+      rb3.get(rbytes3);
+      assertByteArraysEquivalent(bytes3, rbytes3);    
+      
+      sf.position(rb1.limit());
+      
+      bytesRead = sf.read(rb2);
+      assertEquals(rb2.limit(), bytesRead);
+      rb2.get(rbytes2);
+      assertByteArraysEquivalent(bytes2, rbytes2);
+      
+      sf.position(0);
+      
+      bytesRead = sf.read(rb1);
+      assertEquals(rb1.limit(), bytesRead);
+      rb1.get(rbytes1);
+      
+      assertByteArraysEquivalent(bytes1, rbytes1);    
+   }
+   
+   public void testOpenClose() throws Exception
+   {
+      SequentialFile sf = factory.createSequentialFile("openclose.jbm", true, false);
+      
+      sf.open();
+      
+      String s1 = "cheesecake";
+      byte[] bytes1 = s1.getBytes("UTF-8");
+      ByteBuffer bb1 = sf.wrapBuffer(bytes1);
+      
+      int bytesWritten = sf.write(bb1, true);
+      
+      assertEquals(bb1.limit(), bytesWritten);
+      
+      sf.close();
+      
+      try
+      {
+         sf.write(bb1, true);
+         
+         fail("Should throw exception");
+      }
+      catch (Exception e)
+      {
+         //OK
+      }
+      
+      sf.open();
+      
+      sf.write(bb1, true);
+   }
+   
+
+
+}


Property changes on: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java	                        (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.test.unit.UnitTestCase;
+
+public abstract class FileFactoryTestBase extends UnitTestCase
+{
+   protected abstract SequentialFileFactory createFactory();
+   
+   protected SequentialFileFactory factory;
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      factory = createFactory();
+   }
+   
+
+   
+   // Protected ---------------------------------
+   
+   protected void checkFill(SequentialFile file, int pos, int size, byte fillChar) throws Exception
+   {
+      file.fill(pos, size, fillChar);
+      
+      file.close();
+      
+      file.open();
+      
+      file.position(pos);
+      
+      
+      
+      ByteBuffer bb = ByteBuffer.allocateDirect(size);
+      
+      int bytesRead = file.read(bb);
+      
+      assertEquals(size, bytesRead);
+      
+      bb.rewind();
+      
+      byte bytes[] = new byte[size];
+      
+      bb.get(bytes);
+      
+      for (int i = 0; i < size; i++)
+      {
+         //log.info(" i is " + i);
+         assertEquals(fillChar, bytes[i]);
+      }
+            
+   }
+   
+   
+
+}


Property changes on: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -210,8 +210,9 @@
 		TransactionHolder tx = getTransaction(txID);
 		
 		for (int i = 0; i < arguments.length; i++)
-		{		
-			byte[] record = generateRecord(recordLength);
+		{	
+		   // SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE
+			byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX );
 			
 			journal.appendAddRecordTransactional(txID, arguments[i], record);
 			
@@ -226,7 +227,7 @@
 		
 		for (int i = 0; i < arguments.length; i++)
 		{		
-			byte[] updateRecord = generateRecord(recordLength);
+			byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX );
 							
 			journal.appendUpdateRecordTransactional(txID, arguments[i], updateRecord);
 			

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestUnit.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestUnit.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestUnit.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -253,6 +253,36 @@
 		}
 	}
 	
+   public void testEmptyReopen() throws Exception
+   {
+      setup(2, 10 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      
+      List<String> files1 = fileFactory.listFiles(fileExtension);
+      
+      assertEquals(2, files1.size());
+      
+      stopJournal();
+      
+      setup(2, 10 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      
+      List<String> files2 = fileFactory.listFiles(fileExtension);
+      
+      assertEquals(2, files2.size());
+      
+      for (String file: files1)
+      {
+         assertTrue(files2.contains(file));
+      }
+            
+      stopJournal(); 
+   }
+	
 	public void testCreateFilesOnLoad() throws Exception
 	{
 		setup(10, 10 * 1024, true);
@@ -2015,9 +2045,10 @@
 		loadAndCheck();		
 	}
 	
+	// NOTE(review): this marker originally read "aki" - looks like a leftover bookmark; confirm and remove.
 	public void testMultipleTransactionsDifferentIDs() throws Exception
 	{
-		setup(10, 10 * 1024, true);
+		setup(5, 512 + 9 * 1024, true);
 		createJournal();
 		startJournal();
 		load();
@@ -2028,8 +2059,6 @@
 		commit(1);
 		
 		addTx(2, 11, 12, 13, 14, 15, 16);
-		updateTx(2, 11, 13, 15);
-		deleteTx(2, 11, 12, 13, 14, 15, 16);
 		commit(2);
 		
 		addTx(3, 21, 22, 23, 24, 25, 26);
@@ -2043,6 +2072,29 @@
 		loadAndCheck();
 	}
 	
+	public void testTransactionOnDifferentFiles() throws Exception
+	{
+	   setup(2, 512 + 2*1024, true);
+
+	   createJournal();
+      startJournal();
+      load();
+
+      addTx(1, 1, 2, 3, 4, 5, 6);
+      updateTx(1, 1, 3, 5);
+      deleteTx(1, 1, 2, 3, 4, 5, 6);
+      commit(1);
+      
+      addTx(2, 11, 12);
+      
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+      
+	   
+	}
+	
 	public void testMultipleInterleavedTransactionsDifferentIDs() throws Exception
 	{
 		setup(10, 10 * 1024, true);

Added: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java	                        (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,56 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
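+/**
+ * 
+ * A RealAIOJournalImplTest: extends JournalImplTestUnit, supplying an
+ * AIOSequentialFileFactory on a freshly recreated journal-test directory.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */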
+public class RealAIOJournalImplTest extends JournalImplTestUnit
+{
+	private static final Logger log = Logger.getLogger(RealAIOJournalImplTest.class);
+	
+	protected String journalDir = System.getProperty("user.home") + "/journal-test";
+		
+	protected SequentialFileFactory getFileFactory() throws Exception
+	{
+		File file = new File(journalDir);
+		
+		log.info("deleting directory " + journalDir);
+		
+		deleteDirectory(file);
+		
+		file.mkdir();		
+		
+		return new AIOSequentialFileFactory(journalDir);
+	}	
+	
+}


Property changes on: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Deleted: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,60 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.core.journal.impl.test.unit;
-
-import java.io.File;
-import java.util.ArrayList;
-
-import org.jboss.messaging.core.journal.Journal;
-import org.jboss.messaging.core.journal.RecordInfo;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
-import org.jboss.messaging.core.logging.Logger;
-
-/**
- * 
- * A RealJournalImplTest
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class RealJournalImplTest extends JournalImplTestUnit
-{
-	private static final Logger log = Logger.getLogger(RealJournalImplTest.class);
-	
-	protected String journalDir = System.getProperty("user.home") + "/journal-test";
-		
-	protected SequentialFileFactory getFileFactory() throws Exception
-	{
-		File file = new File(journalDir);
-		
-		log.info("deleting directory " + journalDir);
-		
-		deleteDirectory(file);
-		
-		file.mkdir();		
-		
-		return new NIOSequentialFileFactory(journalDir);
-	}	
-	
-}

Copied: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealNIOJournalImplTest.java (from rev 4009, trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java)
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealNIOJournalImplTest.java	                        (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealNIOJournalImplTest.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,60 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
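+/**
+ * 
+ * A RealNIOJournalImplTest: extends JournalImplTestUnit, supplying an
+ * NIOSequentialFileFactory on a freshly recreated journal-test directory.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */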
+public class RealNIOJournalImplTest extends JournalImplTestUnit
+{
+	private static final Logger log = Logger.getLogger(RealNIOJournalImplTest.class);
+	
+	protected String journalDir = System.getProperty("user.home") + "/journal-test";
+		
+	protected SequentialFileFactory getFileFactory() throws Exception
+	{
+		File file = new File(journalDir);
+		
+		log.info("deleting directory " + journalDir);
+		
+		deleteDirectory(file);
+		
+		file.mkdir();		
+		
+		return new NIOSequentialFileFactory(journalDir);
+	}	
+	
+}

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -66,7 +66,7 @@
 			
 			expectedFiles.add(fileName);
 			
-			SequentialFile sf = factory.createSequentialFile(fileName, false);
+			SequentialFile sf = factory.createSequentialFile(fileName, false, false);
 			
 			sf.open();
 			
@@ -75,10 +75,10 @@
 		
 		//Create a couple with a different extension - they shouldn't be picked up
 		
-		SequentialFile sf1 = factory.createSequentialFile("different.file", false);
+		SequentialFile sf1 = factory.createSequentialFile("different.file", false, false);
 		sf1.open();
 		
-		SequentialFile sf2 = factory.createSequentialFile("different.cheese", false);
+		SequentialFile sf2 = factory.createSequentialFile("different.cheese", false, false);
 		sf2.open();
 						
 		List<String> fileNames = factory.listFiles("jbm");
@@ -106,7 +106,7 @@
 	
 	public void testFill() throws Exception
 	{
-		SequentialFile sf = factory.createSequentialFile("fill.jbm", true);
+		SequentialFile sf = factory.createSequentialFile("fill.jbm", true, false);
 		
 		sf.open();
 		
@@ -123,11 +123,11 @@
 	
 	public void testDelete() throws Exception
 	{
-		SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true);
+		SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true, false);
 		
 		sf.open();
 		
-		SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true);
+		SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true, false);
 		
 		sf2.open();
 		
@@ -151,7 +151,7 @@
 	
 	public void testWriteandRead() throws Exception
 	{
-		SequentialFile sf = factory.createSequentialFile("write.jbm", true);
+		SequentialFile sf = factory.createSequentialFile("write.jbm", true, false);
 		
 		sf.open();
 		
@@ -206,7 +206,7 @@
 	
 	public void testPosition() throws Exception
 	{
-		SequentialFile sf = factory.createSequentialFile("position.jbm", true);
+		SequentialFile sf = factory.createSequentialFile("position.jbm", true, false);
 		
 		sf.open();
 		
@@ -265,7 +265,7 @@
 	
 	public void testOpenClose() throws Exception
 	{
-		SequentialFile sf = factory.createSequentialFile("openclose.jbm", true);
+		SequentialFile sf = factory.createSequentialFile("openclose.jbm", true, false);
 		
 		sf.open();
 		

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
@@ -44,7 +45,7 @@
 		
 	private Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
 	
-	public SequentialFile createSequentialFile(final String fileName, final boolean sync) throws Exception
+	public SequentialFile createSequentialFile(final String fileName, final boolean sync, boolean control) throws Exception
 	{
 		FakeSequentialFile sf = fileMap.get(fileName);
 		
@@ -178,6 +179,11 @@
 		
 		public int read(ByteBuffer bytes) throws Exception
 		{
+		   return read(bytes, null);
+		}
+		
+		public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+		{
 			if (!open)
 			{
 				throw new IllegalStateException("Is closed");
@@ -193,6 +199,8 @@
 			
 			bytes.put(bytesRead);
 			
+			if (callback != null) callback.done();
+			
 			return bytesRead.length;
 		}
 
@@ -208,22 +216,30 @@
 			data.position(pos);
 		}
 
+      public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
+      {
+         if (!open)
+         {
+            throw new IllegalStateException("Is closed");
+         }
+         
+         int position = data == null ? 0 : data.position();
+         
+         checkAndResize(bytes.capacity() + position);
+         
+         //log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+         
+         data.put(bytes);
+         
+         if (callback!=null) callback.done();
+         
+         return bytes.array().length;
+         
+      }
+      
 		public int write(ByteBuffer bytes, boolean sync) throws Exception
 		{
-			if (!open)
-			{
-				throw new IllegalStateException("Is closed");
-			}
-			
-			int position = data == null ? 0 : data.position();
-			
-			checkAndResize(bytes.capacity() + position);
-			
-			//log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
-			
-			data.put(bytes);
-			
-			return bytes.array().length;
+		   return write(bytes, sync, null);
 		}
 		
 		private void checkAndResize(int size)
@@ -245,7 +261,22 @@
 			}
 		}
 
+      public ByteBuffer newBuffer(int size)
+      {
+         return ByteBuffer.allocate(size);
+      }
 
+      public ByteBuffer wrapBuffer(byte[] bytes)
+      {
+         return ByteBuffer.wrap(bytes);
+      }
+
+      public int getAlignment() throws Exception
+      {
+         return 1;
+      }
+
+
 	}
 
 }

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/test/messaging/JBMServerTestCase.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/test/messaging/JBMServerTestCase.java	2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/test/messaging/JBMServerTestCase.java	2008-04-10 01:27:55 UTC (rev 4026)
@@ -247,7 +247,7 @@
 
    protected boolean getClearDatabase()
    {
-      return true;
+      return false;
    }
 
    protected HashMap<String, Object> getConfiguration()




More information about the jboss-cvs-commits mailing list