[jboss-cvs] JBoss Messaging SVN: r4026 - in branches/trunk_tmp_aio: native/src and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 9 21:27:55 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-04-09 21:27:55 -0400 (Wed, 09 Apr 2008)
New Revision: 4026
Added:
branches/trunk_tmp_aio/native/src/.libs/
branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so
branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealNIOJournalImplTest.java
Removed:
branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_JlibAIO.h
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java
Modified:
branches/trunk_tmp_aio/.classpath
branches/trunk_tmp_aio/.project
branches/trunk_tmp_aio/build-messaging.xml
branches/trunk_tmp_aio/native/src/
branches/trunk_tmp_aio/native/src/AsyncFile.cpp
branches/trunk_tmp_aio/native/src/AsyncFile.h
branches/trunk_tmp_aio/native/src/LibAIOController.cpp
branches/trunk_tmp_aio/native/src/Makefile.am
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/Journal.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFile.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/MultiThreadWriteNativeTest.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplTest.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestUnit.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
branches/trunk_tmp_aio/tests/src/org/jboss/test/messaging/JBMServerTestCase.java
Log:
Intermediate commit on temporary branch
Modified: branches/trunk_tmp_aio/.classpath
===================================================================
--- branches/trunk_tmp_aio/.classpath 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/.classpath 2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,21 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
- <classpathentry kind="src" path="docs/examples/queue-failover/src"/>
- <classpathentry kind="src" path="docs/examples/embedded/src"/>
- <classpathentry kind="src" path="output/gen-parsers"/>
- <classpathentry kind="src" path="docs/examples/bridge/src"/>
- <classpathentry kind="src" path="docs/examples/stateless-clustered/src"/>
- <classpathentry kind="src" path="docs/examples/mdb-failure/src"/>
- <classpathentry kind="src" path="docs/examples/distributed-queue/src"/>
- <classpathentry kind="src" path="docs/examples/common/src"/>
- <classpathentry kind="src" path="docs/examples/distributed-topic/src"/>
- <classpathentry kind="src" path="docs/examples/http/src"/>
- <classpathentry kind="src" path="docs/examples/mdb/src"/>
- <classpathentry kind="src" path="docs/examples/queue/src"/>
- <classpathentry kind="src" path="docs/examples/secure-socket/src"/>
- <classpathentry kind="src" path="docs/examples/stateless/src"/>
- <classpathentry kind="src" path="docs/examples/topic/src"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
+ <classpathentry kind="src" path="output/gen-parsers"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
<classpathentry kind="src" path="tests/etc/ide"/>
<classpathentry excluding="ide/" kind="src" path="tests/etc"/>
@@ -26,7 +12,6 @@
<classpathentry kind="lib" path="thirdparty/jboss/profiler/jvmti/lib/jboss-profiler-jvmti.jar"/>
<classpathentry kind="lib" path="thirdparty/hsqldb/lib/hsqldb.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-logging/lib/commons-logging.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/serialization/lib/jboss-serialization.jar"/>
<classpathentry kind="lib" path="thirdparty/sun-javacc/lib/javacc.jar"/>
<classpathentry kind="lib" path="thirdparty/apache-xerces/lib/xercesImpl.jar"/>
<classpathentry kind="lib" path="thirdparty/dom4j/lib/dom4j.jar"/>
@@ -63,7 +48,6 @@
<classpathentry kind="lib" path="thirdparty/jboss/integration/lib/jboss-transaction-spi.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/jboss-jaspi-api/lib/jboss-jaspi-api.jar"/>
<classpathentry kind="lib" path="thirdparty/slf4j/api/lib/slf4j-api-1.4.3.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
@@ -76,6 +60,5 @@
<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
<classpathentry kind="lib" path="src/etc/server/default/config"/>
<classpathentry kind="lib" path="src/etc/server/default/deploy"/>
- <classpathentry kind="lib" path="tests/lib/jdbc-drivers/mysql-connector-java-5.1.5-bin.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: branches/trunk_tmp_aio/.project
===================================================================
--- branches/trunk_tmp_aio/.project 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/.project 2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>jboss-messaging</name>
+ <name>trunk</name>
<comment></comment>
<projects>
</projects>
Modified: branches/trunk_tmp_aio/build-messaging.xml
===================================================================
--- branches/trunk_tmp_aio/build-messaging.xml 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/build-messaging.xml 2008-04-10 01:27:55 UTC (rev 4026)
@@ -659,7 +659,7 @@
<property name="native.include" value="${native.src}/src"/>
<target name="native-header" depends="compile">
- <javah class="org.jboss.messaging.core.asyncio.impl.JlibAIO" classpathref="compilation.classpath" destdir="${native.include}">
+ <javah class="org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl" classpathref="compilation.classpath" destdir="${native.include}">
</javah>
</target>
Property changes on: branches/trunk_tmp_aio/native/src
___________________________________________________________________
Name: svn:ignore
- Makefile
Makefile.in
.deps
.libs
+ Makefile
Makefile.in
.deps
Property changes on: branches/trunk_tmp_aio/native/src/.libs
___________________________________________________________________
Name: svn:ignore
+ *.lai
*.0
Added: branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so
===================================================================
--- branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so (rev 0)
+++ branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1 @@
+link libJBMLibAIO.so.0.0.0
\ No newline at end of file
Property changes on: branches/trunk_tmp_aio/native/src/.libs/libJBMLibAIO.so
___________________________________________________________________
Name: svn:special
+ *
Modified: branches/trunk_tmp_aio/native/src/AsyncFile.cpp
===================================================================
--- branches/trunk_tmp_aio/native/src/AsyncFile.cpp 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/AsyncFile.cpp 2008-04-10 01:27:55 UTC (rev 4026)
@@ -114,7 +114,7 @@
while (pollerRunning)
{
- int result = io_getevents(this->aioContext, 1, maxIO, events, &oneSecond);
+ int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
#ifdef DEBUG
fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
@@ -134,39 +134,40 @@
struct iocb * iocbp = events[i].obj;
- CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
-
- long result = events[i].res;
- if (result < 0)
+ if (iocbp->data == (void *) -1)
{
- std::string strerror = io_error(result);
- adapter->onError(threadContext, result, strerror);
+ pollerRunning = 0;
+// controller->log(threadContext, 2, "Received poller request to stop");
}
else
{
- adapter->completeBlock(threadContext);
- adapter->deleteRef(threadContext);
+ CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
+
+ long result = events[i].res;
+ if (result < 0)
+ {
+ std::string strerror = io_error(result);
+ adapter->onError(threadContext, result, strerror);
+ }
+ else
+ {
+ adapter->completeBlock(threadContext);
+ adapter->deleteRef(threadContext);
+ }
}
delete iocbp;
}
}
- controller->log(threadContext, 2, "Poller finished execution");
+// controller->log(threadContext, 2, "Poller finished execution");
}
-void AsyncFile::preAllocate(THREAD_CONTEXT threadContext, int blocks, size_t size)
+void AsyncFile::preAllocate(THREAD_CONTEXT , off_t position, int blocks, size_t size, int fillChar)
{
- size_t currentSize = lseek (fileHandle, 0, SEEK_END);
-
- if (currentSize >= blocks * size)
- {
- controller->log(threadContext,2,"File being reused");
- return;
- }
-
+
if (size % ALIGNMENT != 0)
{
throw AIOException (101, "You can only pre allocate files in multiples of 512");
@@ -178,10 +179,10 @@
throw AIOException(10, "Error on posix_memalign");
}
- memset(preAllocBuffer, 0, size);
+ memset(preAllocBuffer, fillChar, size);
- if (::lseek (fileHandle, 0, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+ if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
for (int i=0; i<blocks; i++)
{
@@ -191,7 +192,7 @@
}
}
- if (::lseek (fileHandle, 0, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+ if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
free (preAllocBuffer);
}
@@ -285,7 +286,22 @@
void AsyncFile::stopPoller(THREAD_CONTEXT threadContext)
{
pollerRunning = 0;
- controller->log(threadContext, 2,"Setting poller to stop");
+
+
+ struct iocb * iocb = new struct iocb();
+ ::io_prep_pwrite(iocb, fileHandle, 0, 0, 0);
+ iocb->data = (void *) -1;
+
+ int result = 0;
+
+ while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
+ {
+ fprintf(stderr, "Couldn't send request to stop poller, trying again");
+ controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again");
+ ::usleep(WAIT_FOR_SPOT);
+ }
+
+// controller->log(threadContext, 2,"Sent data to stop");
// It will wait the Poller to gives up its lock
LockClass lock(&pollerMutex);
}
Modified: branches/trunk_tmp_aio/native/src/AsyncFile.h
===================================================================
--- branches/trunk_tmp_aio/native/src/AsyncFile.h 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/AsyncFile.h 2008-04-10 01:27:55 UTC (rev 4026)
@@ -79,9 +79,8 @@
// Finishes the polling thread (if any) and return
void stopPoller(THREAD_CONTEXT threadContext);
+ void preAllocate(THREAD_CONTEXT threadContext, off_t position, int blocks, size_t size, int fillChar);
- void preAllocate(THREAD_CONTEXT threadContext, int numberOfBlocks, size_t size);
-
void pollEvents(THREAD_CONTEXT threadContext);
};
Modified: branches/trunk_tmp_aio/native/src/LibAIOController.cpp
===================================================================
--- branches/trunk_tmp_aio/native/src/LibAIOController.cpp 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/LibAIOController.cpp 2008-04-10 01:27:55 UTC (rev 4026)
@@ -25,7 +25,7 @@
#include <string>
-#include "org_jboss_messaging_core_asyncio_impl_JlibAIO.h"
+#include "org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h"
#include "JavaUtilities.h"
@@ -41,7 +41,7 @@
* Method: init
* Signature: (Ljava/lang/String;Ljava/lang/Class;)J
*/
-JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_init
+JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_init
(JNIEnv * env, jclass, jstring jstrFileName, jclass callbackClass, jint maxIO, jobject logger)
{
try
@@ -64,7 +64,7 @@
controller->logger = env->NewGlobalRef(logger);
- controller->log(env,4, "Controller initialized");
+// controller->log(env,4, "Controller initialized");
return (jlong)controller;
}
@@ -74,13 +74,26 @@
}
}
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_read
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_read
(JNIEnv *env, jclass, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
{
try
{
AIOController * controller = (AIOController *) controllerAddress;
void * buffer = env->GetDirectBufferAddress(jbuffer);
+
+ if (buffer == 0)
+ {
+ throwException(env, "java/lang/IllegalStateException", "Invalid Direct Buffer used");
+ return;
+ }
+
+ if (((long)buffer) % 512)
+ {
+ throwException(env, "java/lang/IllegalStateException", "Buffer not aligned for use with DMA");
+ return;
+ }
+
CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback));
controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
@@ -91,13 +104,19 @@
}
}
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_write
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
(JNIEnv *env, jclass, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
{
try
{
AIOController * controller = (AIOController *) controllerAddress;
void * buffer = env->GetDirectBufferAddress(jbuffer);
+ if (buffer == 0)
+ {
+ throwException(env, "java/lang/IllegalStateException", "Invalid Direct Buffer used");
+ return;
+ }
+
CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback));
controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
@@ -110,7 +129,7 @@
-JNIEXPORT void Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_internalPollEvents
+JNIEXPORT void Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
(JNIEnv *env, jclass, jlong controllerAddress)
{
try
@@ -124,7 +143,7 @@
}
}
-JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_newBuffer
+JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newBuffer
(JNIEnv * env, jobject, jlong size)
{
try
@@ -150,7 +169,7 @@
}
}
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_destroyBuffer
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
(JNIEnv * env, jobject, jobject jbuffer)
{
void * buffer = env->GetDirectBufferAddress(jbuffer);
@@ -159,7 +178,7 @@
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_closeInternal
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_closeInternal
(JNIEnv *env, jclass, jlong controllerAddress)
{
try
@@ -175,13 +194,17 @@
}
}
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_preAllocate
- (JNIEnv * env, jclass, jlong controllerAddress, jint blocks, jlong size)
+
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_fill
+ (JNIEnv * env, jclass, jlong controllerAddress, jlong position, jint blocks, jlong size, jbyte fillChar)
{
try
{
AIOController * controller = (AIOController *) controllerAddress;
- controller->fileOutput.preAllocate(env, blocks, size);
+
+ controller->fileOutput.preAllocate(env, position, blocks, size, fillChar);
+
+ //controller->fileOutput.preAllocate(env, blocks, size);
}
catch (AIOException& e)
{
Modified: branches/trunk_tmp_aio/native/src/Makefile.am
===================================================================
--- branches/trunk_tmp_aio/native/src/Makefile.am 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/Makefile.am 2008-04-10 01:27:55 UTC (rev 4026)
@@ -4,4 +4,4 @@
libJBMLibAIO_la_SOURCES = AIOController.cpp AIOController.h AIOException.h AsyncFile.cpp \
AsyncFile.h CallbackAdapter.h JAIODatatypes.h JavaUtilities.cpp \
JavaUtilities.h JNICallbackAdapter.cpp JNICallbackAdapter.h LibAIOController.cpp \
- LockClass.h org_jboss_messaging_core_persistence_impl_libaio_jni_impl_JlibAIO
+ LockClass.h org_jboss_messaging_core_persistence_impl_libaio_jni_impl_AsynchronousFileImple.h
Added: branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h (rev 0)
+++ branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,79 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl */
+
+#ifndef _Included_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+#define _Included_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+#ifdef __cplusplus
+extern "C" {
+#endif
+/* Inaccessible static: log */
+/* Inaccessible static: loaded */
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: init
+ * Signature: (Ljava/lang/String;Ljava/lang/Class;ILorg/jboss/messaging/core/logging/Logger;)J
+ */
+JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_init
+ (JNIEnv *, jclass, jstring, jclass, jint, jobject);
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: write
+ * Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
+ (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: read
+ * Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_read
+ (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: fill
+ * Signature: (JJIJB)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_fill
+ (JNIEnv *, jclass, jlong, jlong, jint, jlong, jbyte);
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: closeInternal
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_closeInternal
+ (JNIEnv *, jclass, jlong);
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: internalPollEvents
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
+ (JNIEnv *, jclass, jlong);
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: destroyBuffer
+ * Signature: (Ljava/nio/ByteBuffer;)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
+ (JNIEnv *, jobject, jobject);
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: newBuffer
+ * Signature: (J)Ljava/nio/ByteBuffer;
+ */
+JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newBuffer
+ (JNIEnv *, jobject, jlong);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
Deleted: branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_JlibAIO.h
===================================================================
--- branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_JlibAIO.h 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_JlibAIO.h 2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,79 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_jboss_messaging_core_asyncio_impl_JlibAIO */
-
-#ifndef _Included_org_jboss_messaging_core_asyncio_impl_JlibAIO
-#define _Included_org_jboss_messaging_core_asyncio_impl_JlibAIO
-#ifdef __cplusplus
-extern "C" {
-#endif
-/* Inaccessible static: log */
-/* Inaccessible static: loaded */
-/*
- * Class: org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method: init
- * Signature: (Ljava/lang/String;Ljava/lang/Class;ILorg/jboss/messaging/core/logging/Logger;)J
- */
-JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_init
- (JNIEnv *, jclass, jstring, jclass, jint, jobject);
-
-/*
- * Class: org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method: write
- * Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_write
- (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
-
-/*
- * Class: org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method: read
- * Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_read
- (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
-
-/*
- * Class: org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method: preAllocate
- * Signature: (JIJ)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_preAllocate
- (JNIEnv *, jclass, jlong, jint, jlong);
-
-/*
- * Class: org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method: closeInternal
- * Signature: (J)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_closeInternal
- (JNIEnv *, jclass, jlong);
-
-/*
- * Class: org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method: internalPollEvents
- * Signature: (J)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_internalPollEvents
- (JNIEnv *, jclass, jlong);
-
-/*
- * Class: org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method: destroyBuffer
- * Signature: (Ljava/nio/ByteBuffer;)V
- */
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_destroyBuffer
- (JNIEnv *, jobject, jobject);
-
-/*
- * Class: org_jboss_messaging_core_asyncio_impl_JlibAIO
- * Method: newBuffer
- * Signature: (J)Ljava/nio/ByteBuffer;
- */
-JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_JlibAIO_newBuffer
- (JNIEnv *, jobject, jlong);
-
-#ifdef __cplusplus
-}
-#endif
-#endif
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -37,10 +37,12 @@
void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage);
- void preAllocate(int blocks, long size);
-
+ void fill(long position, int blocks, long size, byte fillChar);
+
ByteBuffer newBuffer(long size);
void destroyBuffer(ByteBuffer buffer);
+
+ int getBlockSize();
}
Copied: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java (from rev 3898, trunk/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java)
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,170 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.asyncio.impl;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.AsynchronousFile;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ *
+ * @author clebert.suconic at jboss.com
+ * Warning: In case you refactor the name or the package of this class,
+ * you need to make sure you also rename the C++ native calls
+ */
+public class AsynchronousFileImpl implements AsynchronousFile
+{
+ private static Logger log = Logger.getLogger(AsynchronousFileImpl.class);
+ private boolean opened = false;
+ private String fileName;
+ private Thread poller;
+ private static boolean loaded = true;
+
+ /**
+ * Warning: Beware of the C++ pointer! It will bite you! :-)
+ */
+ private long handler;
+
+ static
+ {
+ try
+ {
+ log.info("JLibAIO being loaded");
+ System.loadLibrary("JBMLibAIO");
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getLocalizedMessage(), e);
+ loaded = false;
+ }
+ }
+
+ public static boolean isLoaded()
+ {
+ return loaded;
+ }
+
+
+
+
+ public void open(String fileName, int maxIO)
+ {
+ opened = true;
+ this.fileName=fileName;
+ handler = init (fileName, AIOCallback.class, maxIO, log);
+ startPoller();
+ }
+
+ class PollerThread extends Thread
+ {
+ PollerThread ()
+ {
+ super("NativePoller for " + fileName);
+ }
+ public void run()
+ {
+ pollEvents();
+ }
+ }
+
+ private synchronized void startPoller()
+ {
+ checkOpened();
+ poller = new PollerThread();
+ poller.start();
+ }
+
+ public synchronized void close()
+ {
+ checkOpened();
+ closeInternal(handler);
+ opened = false;
+ handler = 0;
+ }
+
+
+ public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+ {
+ checkOpened();
+ write (handler, position, size, directByteBuffer, aioPackage);
+
+ }
+
+ public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+ {
+ checkOpened();
+ read (handler, position, size, directByteBuffer, aioPackage);
+
+ }
+
+ public long size()
+ {
+ checkOpened();
+ // TODO: wire this method to ftell
+ return 0;
+ }
+
+ public void fill(long position, int blocks, long size, byte fillChar)
+ {
+ checkOpened();
+ fill(handler, position, blocks, size, fillChar);
+ }
+
+ public int getBlockSize()
+ {
+ return 512;
+ }
+
+
+ private void pollEvents()
+ {
+ if (!opened)
+ {
+ return;
+ }
+ internalPollEvents(handler);
+ }
+
+ private void checkOpened()
+ {
+ if (!opened)
+ {
+ throw new RuntimeException("File is not opened");
+ }
+ }
+
+ /**
+ * I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
+ */
+ @SuppressWarnings("unchecked")
+ private static native long init(String fileName, Class aioPackageClazz, int maxIO, Logger logger);
+
+ private static native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+
+ private static native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+
+ private static native void fill(long handle, long position, int blocks, long size, byte fillChar);
+
+ private static native void closeInternal(long handler);
+
+ /** Poll asynchronous events from internal queues */
+ private static native void internalPollEvents(long handler);
+
+ // Should we make this method static?
+ public native void destroyBuffer(ByteBuffer buffer);
+
+ // Should we make this method static?
+ public native ByteBuffer newBuffer(long size);
+
+
+
+
+
+}
Deleted: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/JlibAIO.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,158 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.messaging.core.asyncio.impl;
-
-import java.nio.ByteBuffer;
-
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.AsynchronousFile;
-import org.jboss.messaging.core.logging.Logger;
-
-/**
- *
- * @author clebert.suconic at jboss.com
- * Warning: Case you refactor the name or the package of this class
- * You need to make sure you also rename the C++ native calls
- */
-public class JlibAIO implements AsynchronousFile
-{
- private static Logger log = Logger.getLogger(JlibAIO.class);
- private boolean opened = false;
- private String fileName;
- private Thread poller;
- private static boolean loaded = true;
-
- /**
- * Warning: Beware of the C++ pointer! It will bite you! :-)
- */
- private long handler;
-
- static
- {
- try
- {
- log.info("JLibAIO being loaded");
- System.loadLibrary("JBMLibAIO");
- }
- catch (Throwable e)
- {
- log.error(e.getLocalizedMessage(), e);
- loaded = false;
- }
- }
-
- public static boolean isLoaded()
- {
- return loaded;
- }
-
-
-
-
- public void open(String fileName, int maxIO)
- {
- opened = true;
- this.fileName=fileName;
- handler = init (fileName, AIOCallback.class, maxIO, log);
- startPoller();
- }
-
- class PollerThread extends Thread
- {
- PollerThread ()
- {
- super("NativePoller for " + fileName);
- }
- public void run()
- {
- pollEvents();
- }
- }
-
- private void startPoller()
- {
- checkOpened();
- poller = new PollerThread();
- poller.start();
- }
-
- public void close()
- {
- checkOpened();
- opened = false;
- closeInternal(handler);
- handler = 0;
- }
-
-
- public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
- {
- checkOpened();
- write (handler, position, size, directByteBuffer, aioPackage);
-
- }
-
- public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
- {
- checkOpened();
- read (handler, position, size, directByteBuffer, aioPackage);
-
- }
-
- public long size()
- {
- checkOpened();
- // TODO: wire this method to ftell
- return 0;
- }
-
- public void preAllocate(int blocks, long size)
- {
- checkOpened();
- preAllocate(handler, blocks, size);
- }
-
- private void pollEvents()
- {
- checkOpened();
- internalPollEvents(handler);
- }
-
- private void checkOpened()
- {
- if (!opened)
- {
- throw new RuntimeException("File is not opened");
- }
- }
-
- /**
- * I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
- */
- @SuppressWarnings("unchecked")
- private static native long init(String fileName, Class aioPackageClazz, int maxIO, Logger logger);
-
- private static native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
-
- private static native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
-
- private static native void preAllocate(long handle, int blocks, long size);
-
- private static native void closeInternal(long handler);
-
- /** Poll asynchrounous events from internal queues */
- private static native void internalPollEvents(long handler);
-
- // Should we make this method static?
- public native void destroyBuffer(ByteBuffer buffer);
-
- // Should we make this method static?
- public native ByteBuffer newBuffer(long size);
-
-
-}
Added: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,15 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+
+public interface IOCallback extends AIOCallback
+{
+ // Journal-level callback type: declares no members of its own beyond those inherited from AIOCallback.
+}
Property changes on: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/IOCallback.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/Journal.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/Journal.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -36,7 +36,10 @@
{
// Non transactional operations
- void appendAddRecord(long id, byte[] record) throws Exception;
+ // TODO: Implement callbacks
+ void appendAddRecord(long id, byte[] record, IOCallback callback) throws Exception;
+
+ void appendAddRecord(long id, byte[] record) throws Exception;
void appendUpdateRecord(long id, byte[] record) throws Exception;
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -37,17 +37,27 @@
*/
void open() throws Exception;
+ int getAlignment() throws Exception;
+
String getFileName();
void fill(int position, int size, byte fillCharacter) throws Exception;
void delete() throws Exception;
- int write(ByteBuffer bytes, boolean sync) throws Exception;
-
- int read(ByteBuffer bytes) throws Exception;
-
+ int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+
+ int write(ByteBuffer bytes, boolean sync) throws Exception;
+
+ int read(ByteBuffer bytes, IOCallback callback) throws Exception;
+
+ int read(ByteBuffer bytes) throws Exception;
+
void position(int pos) throws Exception;
void close() throws Exception;
+
+ ByteBuffer newBuffer(int size);
+
+ ByteBuffer wrapBuffer(byte bytes[]);
}
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -32,7 +32,7 @@
*/
public interface SequentialFileFactory
{
- SequentialFile createSequentialFile(String fileName, boolean sync) throws Exception;
+ SequentialFile createSequentialFile(String fileName, boolean sync, boolean control) throws Exception;
List<String> listFiles(String extension) throws Exception;
}
Added: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,230 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.messaging.core.asyncio.AsynchronousFile;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
+
+/** SequentialFile implementation that delegates to the native AsynchronousFile layer. */
+public class AIOSequentialFile implements SequentialFile
+{
+   String journalDir;
+   String fileName;
+
+   /** Non-null only while the file is open; reset to null by close() and delete(). */
+   AsynchronousFile aioFile;
+
+   /** Offset at which the next read or write is issued; advanced by each read/write call. */
+   AtomicLong position = new AtomicLong(0);
+
+   public AIOSequentialFile(String journalDir, String fileName) throws Exception
+   {
+      this.journalDir = journalDir;
+      this.fileName = fileName;
+   }
+
+   /** Returns the block size reported by the underlying file. */
+   public int getAlignment() throws Exception
+   {
+      checkOpened();
+      return aioFile.getBlockSize();
+   }
+
+   public void close() throws Exception
+   {
+      checkOpened();
+      aioFile.close();
+      aioFile = null;
+   }
+
+   /** Closes the file if it is still open, then deletes it from the journal directory. */
+   public void delete() throws Exception
+   {
+      if (aioFile != null)
+      {
+         aioFile.close();
+         aioFile = null;
+      }
+      // Best effort, as before: the boolean result of File.delete() is not checked.
+      new File(journalDir + "/" + fileName).delete();
+   }
+
+   /** Fills at least size bytes with fillCharacter, starting at position rounded up to a block boundary. */
+   public void fill(int position, int size, byte fillCharacter) throws Exception
+   {
+      checkOpened();
+
+      final int alignment = aioFile.getBlockSize();
+      // Use the largest of 10MiB / 1MiB / 10KiB that divides size evenly, else a single block.
+      final int chunkSize;
+      if (size % (10 * 1024 * 1024) == 0)
+      {
+         chunkSize = 10 * 1024 * 1024;
+      }
+      else if (size % (1024 * 1024) == 0)
+      {
+         chunkSize = 1024 * 1024;
+      }
+      else if (size % (10 * 1024) == 0)
+      {
+         chunkSize = 10 * 1024;
+      }
+      else
+      {
+         chunkSize = alignment;
+      }
+
+      int chunks = size / chunkSize;
+      if (size % chunkSize != 0)
+      {
+         chunks++;
+      }
+
+      if (position % alignment != 0)
+      {
+         position = ((position / alignment) + 1) * alignment;
+      }
+
+      aioFile.fill((long) position, chunks, chunkSize, fillCharacter);
+   }
+
+   public String getFileName()
+   {
+      return fileName;
+   }
+
+   /** Opens the underlying asynchronous file and resets the tracked position to zero. */
+   public void open() throws Exception
+   {
+      aioFile = new AsynchronousFileImpl();
+      aioFile.open(journalDir + "/" + fileName, 500);
+      position.set(0);
+   }
+
+   public void position(int pos) throws Exception
+   {
+      position.set(pos);
+   }
+
+   /** Starts a read of bytes.limit() bytes at the tracked position; callback receives the outcome. */
+   public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+   {
+      checkOpened();
+      int bytesToRead = bytes.limit();
+      long positionToRead = position.getAndAdd(bytesToRead);
+
+      bytes.rewind();
+      aioFile.read(positionToRead, bytesToRead, bytes, callback);
+      return bytesToRead;
+   }
+
+   /** Blocking read: waits for completion and throws MessagingException if an error was reported. */
+   public int read(ByteBuffer bytes) throws Exception
+   {
+      WaitCompletion waitCompletion = new WaitCompletion();
+      int bytesRead = read(bytes, waitCompletion);
+
+      waitCompletion.waitLatch();
+      waitCompletion.throwIfFailed();
+      return bytesRead;
+   }
+
+   /** Starts a write of bytes.limit() bytes at the tracked position; callback receives the outcome. */
+   public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
+   {
+      checkOpened();
+      int bytesToWrite = bytes.limit();
+      long positionToWrite = position.getAndAdd(bytesToWrite);
+
+      aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
+      return bytesToWrite;
+   }
+
+   /** Blocking write: waits for completion and throws MessagingException if an error was reported. */
+   public int write(ByteBuffer bytes, boolean sync) throws Exception
+   {
+      WaitCompletion waitCompletion = new WaitCompletion();
+      int bytesWritten = write(bytes, sync, waitCompletion);
+
+      waitCompletion.waitLatch();
+      waitCompletion.throwIfFailed();
+      return bytesWritten;
+   }
+
+   /** Throws IllegalStateException when no file is open (unchecked, so every method can call it). */
+   private void checkOpened()
+   {
+      if (aioFile == null)
+      {
+         throw new IllegalStateException("File not opened");
+      }
+   }
+
+   /** Callback that lets the caller block until done() or onError() has been invoked. */
+   class WaitCompletion implements IOCallback
+   {
+      CountDownLatch latch = new CountDownLatch(1);
+      String errorMessage;
+      int errorCode = 0;
+
+      public void done()
+      {
+         latch.countDown();
+      }
+
+      public void onError(int errorCode, String errorMessage)
+      {
+         // Store the failure before releasing the latch so the waiter sees both fields.
+         this.errorCode = errorCode;
+         this.errorMessage = errorMessage;
+         latch.countDown();
+      }
+
+      public void waitLatch() throws Exception
+      {
+         latch.await();
+      }
+
+      /** Throws a MessagingException with the stored code and message if a message was recorded. */
+      void throwIfFailed() throws MessagingException
+      {
+         if (errorMessage != null)
+         {
+            throw new MessagingException(errorCode, errorMessage);
+         }
+      }
+   }
+
+   /** Allocates a direct buffer of size bytes rounded up to a whole number of blocks. */
+   public ByteBuffer newBuffer(int size)
+   {
+      checkOpened();
+      int blockSize = aioFile.getBlockSize();
+      if (size % blockSize != 0)
+      {
+         size = ((size / blockSize) + 1) * blockSize;
+      }
+      return ByteBuffer.allocateDirect(size);
+   }
+
+   /** Copies bytes into a new block-aligned direct buffer; the position is left after the data. */
+   public ByteBuffer wrapBuffer(byte[] bytes)
+   {
+      ByteBuffer newbuffer = newBuffer(bytes.length);
+      newbuffer.put(bytes);
+      return newbuffer;
+   }
+}
Property changes on: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+
+/** Factory that creates NIO-backed files when the control flag is set and AIO-backed files otherwise. */
+public class AIOSequentialFileFactory extends AbstractSequentialFactory
+{
+   public AIOSequentialFileFactory(String journalDir)
+   {
+      super(journalDir);
+   }
+
+   /**
+    * Returns an NIOSequentialFile (built with the given sync flag) when control is true, else an AIOSequentialFile.
+    */
+   public SequentialFile createSequentialFile(String fileName, boolean sync, boolean control)
+         throws Exception
+   {
+      if (!control)
+      {
+         // sync is not passed on here: the AIOSequentialFile constructor takes no such parameter.
+         return new AIOSequentialFile(journalDir, fileName);
+      }
+      return new NIOSequentialFile(journalDir, fileName, sync);
+   }
+}
Property changes on: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java (rev 0)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+
+/** Base factory that stores the journal directory and lists the files found in it. */
+public abstract class AbstractSequentialFactory implements SequentialFileFactory
+{
+   protected final String journalDir;
+
+   public AbstractSequentialFactory(final String journalDir)
+   {
+      this.journalDir = journalDir;
+   }
+
+   /**
+    * Returns the names in journalDir that end with "." + extension.
+    * Throws IOException when File.list() yields null for the directory.
+    */
+   public List<String> listFiles(final String extension) throws Exception
+   {
+      final String suffix = "." + extension;
+      String[] matches = new File(journalDir).list(new FilenameFilter()
+      {
+         public boolean accept(File parent, String candidate)
+         {
+            return candidate.endsWith(suffix);
+         }
+      });
+
+      if (matches != null)
+      {
+         return Arrays.asList(matches);
+      }
+      throw new IOException("Failed to list: " + journalDir);
+   }
+}
Property changes on: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -39,6 +39,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
import org.jboss.messaging.core.journal.SequentialFile;
@@ -86,8 +87,13 @@
public static final byte ADD_RECORD_TX = 14;
+ public static final int SIZE_ADD_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + SIZE_BYTE; // fixed overhead only: the caller adds record.length to get the full record size
+
+
public static final byte UPDATE_RECORD_TX = 15;
+ public static final int SIZE_UPDATE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + SIZE_BYTE; // fixed overhead only: the caller adds record.length to get the full record size
+
public static final byte DELETE_RECORD_TX = 16;
public static final byte PREPARE_RECORD = 17;
@@ -192,11 +198,36 @@
// Journal implementation ----------------------------------------------------------------
- public ByteBuffer allocateBuffer(final int size) throws Exception
+ /*public ByteBuffer allocateBuffer(final int size) throws Exception
{
return ByteBuffer.allocateDirect(size);
- }
+ }*/
+   /** Appends a non-transactional ADD record for id, handing callback to the underlying file write. */
+   public void appendAddRecord(long id, byte[] record, IOCallback callback)
+         throws Exception
+   {
+      if (state != STATE_LOADED)
+      {
+         throw new IllegalStateException("Journal must be loaded first");
+      }
+
+      // Layout: record type byte, id, payload length, payload, DONE marker.
+      final int payloadLength = record.length;
+      int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + payloadLength + SIZE_BYTE;
+
+      ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
+      bb.put(ADD_RECORD).putLong(id).putInt(payloadLength).put(record).put(DONE);
+      bb.rewind();
+
+      appendRecord(bb, true, callback);
+
+      // Associate the record id with whichever file is current after the append.
+      posFilesMap.put(id, new PosFiles(currentFile));
+   }
+
+
public void appendAddRecord(final long id, final byte[] record) throws Exception
{
if (state != STATE_LOADED)
@@ -206,14 +237,14 @@
int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
bb.put(ADD_RECORD);
bb.putLong(id);
bb.putInt(record.length);
bb.put(record);
- bb.put(DONE);
- bb.flip();
+ bb.put(DONE);
+ bb.rewind();
appendRecord(bb, true);
@@ -236,14 +267,14 @@
int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
bb.put(UPDATE_RECORD);
bb.putLong(id);
bb.putInt(record.length);
bb.put(record);
bb.put(DONE);
- bb.flip();
+ bb.rewind();
appendRecord(bb, true);
@@ -268,12 +299,12 @@
int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
bb.put(DELETE_RECORD);
bb.putLong(id);
bb.put(DONE);
- bb.flip();
+ bb.rewind();
appendRecord(bb, true);
}
@@ -291,17 +322,17 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+ int size = SIZE_ADD_RECORD_TX + record.length;
+
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
bb.put(ADD_RECORD_TX);
bb.putLong(txID);
bb.putLong(id);
bb.putInt(record.length);
bb.put(record);
bb.put(DONE);
- bb.flip();
+ bb.rewind();
appendRecord(bb, false);
@@ -318,17 +349,17 @@
throw new IllegalStateException("Journal must be loaded first");
}
- int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+ int size = SIZE_UPDATE_RECORD_TX + record.length;
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
bb.put(UPDATE_RECORD_TX);
bb.putLong(txID);
bb.putLong(id);
bb.putInt(record.length);
- bb.put(record);
+ bb.put(record);
bb.put(DONE);
- bb.flip();
+ bb.rewind();
appendRecord(bb, false);
@@ -346,13 +377,13 @@
int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
bb.put(DELETE_RECORD_TX);
bb.putLong(txID);
bb.putLong(id);
bb.put(DONE);
- bb.flip();
+ bb.rewind();
appendRecord(bb, false);
@@ -377,12 +408,12 @@
int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
bb.put(PREPARE_RECORD);
- bb.putLong(txID);
+ bb.putLong(txID);
bb.put(DONE);
- bb.flip();
+ bb.rewind();
appendRecord(bb, true);
@@ -405,12 +436,12 @@
int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
bb.put(COMMIT_RECORD);
bb.putLong(txID);
bb.put(DONE);
- bb.flip();
+ bb.rewind();
appendRecord(bb, true);
@@ -433,12 +464,12 @@
int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
- ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
+ ByteBuffer bb = currentFile.getFile().newBuffer(size);
+
bb.put(ROLLBACK_RECORD);
- bb.putLong(txID);
+ bb.putLong(txID);
bb.put(DONE);
- bb.flip();
+ bb.rewind();
appendRecord(bb, true);
@@ -465,17 +496,20 @@
for (String fileName: fileNames)
{
- SequentialFile file = fileFactory.createSequentialFile(fileName, sync);
+ SequentialFile file = fileFactory.createSequentialFile(fileName, sync, false);
file.open();
- ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
+ ByteBuffer bb = file.newBuffer(SIZE_LONG);
file.read(bb);
- bb.flip();
+ //bb.flip();
+ //bb.rewind();
long orderingID = bb.getLong();
+
+ log.info("file=" + file + " with orderingID=" + orderingID);
orderedFiles.add(new JournalFileImpl(file, orderingID));
@@ -503,10 +537,12 @@
for (JournalFile file: orderedFiles)
{
- ByteBuffer bb = ByteBuffer.wrap(new byte[fileSize]);
-
file.getFile().open();
+ log.info("Loading file " + file.getFile().getFileName());
+
+ ByteBuffer bb = file.getFile().newBuffer(fileSize);
+
int bytesRead = file.getFile().read(bb);
if (bytesRead != fileSize)
@@ -517,10 +553,11 @@
" expected " + fileSize + " : " + file.getFile().getFileName());
}
- bb.flip();
+// bb.flip();
+// bb.rewind();
- //First long is the ordering timestamp
- bb.getLong();
+ //First long is the ordering timestamp, we just jump its position
+ bb.position(calculateBlockStart(SIZE_LONG, file.getFile().getAlignment()));
boolean hasData = false;
@@ -538,7 +575,7 @@
int size = bb.getInt();
byte[] record = new byte[size];
- bb.get(record);
+ bb.get(record);
byte end = bb.get();
if (end != DONE)
@@ -563,7 +600,7 @@
byte[] record = new byte[size];
bb.get(record);
byte end = bb.get();
-
+
if (end != DONE)
{
repairFrom(pos, file);
@@ -590,7 +627,7 @@
case DELETE_RECORD:
{
long id = bb.getLong();
- byte end = bb.get();
+ byte end = bb.get();
if (end != DONE)
{
@@ -616,10 +653,14 @@
long txID = bb.getLong();
maxTransactionID = Math.max(maxTransactionID, txID);
long id = bb.getLong();
+
+ log.info("read AddRecordTX txID = " + txID + " , id=" + id);
+
+
int size = bb.getInt();
byte[] record = new byte[size];
bb.get(record);
- byte end = bb.get();
+ byte end = bb.get();
if (end != DONE)
{
@@ -657,11 +698,14 @@
{
long txID = bb.getLong();
maxTransactionID = Math.max(maxTransactionID, txID);
- long id = bb.getLong();
+ long id = bb.getLong();
+
+ log.info("read UpdateRecordTX txID = " + txID + " , id=" + id);
+
int size = bb.getInt();
byte[] record = new byte[size];
bb.get(record);
- byte end = bb.get();
+ byte end = bb.get();
if (end != DONE)
{
@@ -700,7 +744,10 @@
long txID = bb.getLong();
maxTransactionID = Math.max(maxTransactionID, txID);
long id = bb.getLong();
- byte end = bb.get();
+
+ log.info("read DeleteRecordTX txID = " + txID + " , id=" + id);
+
+ byte end = bb.get();
if (end != DONE)
{
@@ -737,8 +784,11 @@
case PREPARE_RECORD:
{
long txID = bb.getLong();
+
+ log.info("read Prepare txID=" + txID);
+
maxTransactionID = Math.max(maxTransactionID, txID);
- byte end = bb.get();
+ byte end = bb.get();
if (end != DONE)
{
@@ -772,8 +822,11 @@
case COMMIT_RECORD:
{
long txID = bb.getLong();
+
+ log.info("read Commit txID=" + txID);
+
maxTransactionID = Math.max(maxTransactionID, txID);
- byte end = bb.get();
+ byte end = bb.get();
if (end != DONE)
{
@@ -806,8 +859,11 @@
case ROLLBACK_RECORD:
{
long txID = bb.getLong();
+
+ log.info("read RollbacktxID=" + txID);
+
maxTransactionID = Math.max(maxTransactionID, txID);
- byte end = bb.get();
+ byte end = bb.get();
if (end != DONE)
{
@@ -857,6 +913,8 @@
}
}
+ bb.position(calculateBlockStart(bb.position(), file.getFile().getAlignment()));
+
if (recordType != FILL_CHARACTER)
{
lastDataPos = bb.position();
@@ -875,7 +933,7 @@
freeFiles.add(file);
//Position it ready for writing
- file.getFile().position(SIZE_LONG);
+ file.getFile().position(calculateBlockStart(SIZE_LONG, file.getFile().getAlignment()));
}
}
@@ -971,6 +1029,8 @@
{
//File can be reclaimed or deleted
+ log.info("Reclaiming file " + file);
+
dataFiles.remove(file);
//FIXME - size() involves a scan!!!
@@ -980,13 +1040,13 @@
long newOrderingID = generateOrderingID();
- ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
-
- bb.putLong(newOrderingID);
-
SequentialFile sf = file.getFile();
-
+
sf.open();
+
+ ByteBuffer bb = sf.newBuffer(SIZE_LONG);
+
+ bb.putLong(newOrderingID);
//Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
@@ -994,13 +1054,13 @@
//operation and can impact other IO operations on the disk
sf.fill(0, fileSize, FILL_CHARACTER);
- sf.write(bb, true);
+ int bytesWritten = sf.write(bb, true);
JournalFile jf = new JournalFileImpl(sf, newOrderingID);
- sf.position(SIZE_LONG);
+ sf.position(bytesWritten);
- jf.setOffset(SIZE_LONG);
+ jf.setOffset(bytesWritten);
freeFiles.add(jf);
}
@@ -1098,24 +1158,51 @@
// Private -----------------------------------------------------------------------------
- private void appendRecord(ByteBuffer bb, boolean sync) throws Exception
+ private int calculateBlockStart(int position, int alignment)
{
- lock.acquire();
-
- int size = bb.capacity();
-
- try
- {
- checkFile(size);
- currentFile.getFile().write(bb, sync);
- currentFile.extendOffset(size);
- }
- finally
- {
- lock.release();
- }
+ int pos = ((position / alignment) + (position % alignment != 0 ? 1 : 0)) * alignment;
+
+ //System.out.println("Calculated " + pos + " and received " + position);
+
+ return pos;
}
+ private void appendRecord(ByteBuffer bb, boolean sync) throws Exception
+ {
+ lock.acquire();
+
+ int size = bb.capacity();
+
+ try
+ {
+ checkFile(size);
+ currentFile.getFile().write(bb, sync);
+ currentFile.extendOffset(size);
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ private void appendRecord(ByteBuffer bb, boolean sync, IOCallback callback) throws Exception
+ {
+ lock.acquire();
+
+ int size = bb.capacity();
+
+ try
+ {
+ checkFile(size);
+ currentFile.getFile().write(bb, sync, callback);
+ currentFile.extendOffset(size);
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
private void repairFrom(int pos, JournalFile file) throws Exception
{
log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
@@ -1132,27 +1219,30 @@
long orderingID = generateOrderingID();
String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
+
+ log.info("Creating file " + fileName);
- SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync);
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync, false);
sequentialFile.open();
sequentialFile.fill(0, fileSize, FILL_CHARACTER);
- ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
+ ByteBuffer bb = sequentialFile.newBuffer(SIZE_LONG);
bb.putLong(orderingID);
- bb.flip();
+ bb.rewind();
- sequentialFile.write(bb, true);
+ int bytesWritten = sequentialFile.write(bb, true);
- sequentialFile.position(SIZE_LONG);
+// sequentialFile.position(SIZE_LONG);
JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
- info.extendOffset(SIZE_LONG);
+ info.extendOffset(bytesWritten);
+
return info;
}
@@ -1179,14 +1269,20 @@
private void checkFile(final int size) throws Exception
{
+
+ if (size % currentFile.getFile().getAlignment() != 0)
+ {
+ throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile().getAlignment());
+ }
//We take into account the first timestamp long
- if (size > fileSize - SIZE_LONG)
+ if (size > fileSize - calculateBlockStart(8, currentFile.getFile().getAlignment()))
{
throw new IllegalArgumentException("Record is too large to store " + size);
}
if (currentFile == null || fileSize - currentFile.getOffset() < size)
- {
+ {
+ //Thread.sleep(1000);
currentFile.getFile().close();
dataFiles.add(currentFile);
@@ -1407,4 +1503,5 @@
}
}
}
+
}
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -26,6 +26,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
@@ -61,6 +62,11 @@
this.sync = sync;
}
+ public int getAlignment()
+ {
+ return 1;
+ }
+
public String getFileName()
{
return fileName;
@@ -115,14 +121,41 @@
close();
}
- public int read(ByteBuffer bytes) throws Exception
- {
- int bytesRead = channel.read(bytes);
-
- return bytesRead;
- }
+ public int read(ByteBuffer bytes) throws Exception
+ {
+ return read(bytes, null);
+ }
- public int write(ByteBuffer bytes, boolean sync) throws Exception
+   /** Reads from the channel into bytes and flips the buffer; the outcome goes to callback when it is non-null. */
+   public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+   {
+      try
+      {
+         int bytesRead = channel.read(bytes);
+         // Flip before notifying, so done() already sees the buffer positioned at the start of the data.
+         bytes.flip();
+         if (callback != null)
+         {
+            callback.done();
+         }
+         return bytesRead;
+      }
+      catch (Exception e)
+      {
+         if (callback != null)
+         {
+            callback.onError(-1, e.getLocalizedMessage());
+         }
+         throw e;
+      }
+   }
+
+ public int write(ByteBuffer bytes, boolean sync) throws Exception
+ {
+ return write(bytes, sync, null);
+ }
+
+ public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
{
int bytesRead = channel.write(bytes);
@@ -138,4 +171,14 @@
{
channel.position(pos);
}
+
+ public ByteBuffer newBuffer(int size)
+ {
+ return ByteBuffer.allocate(size);
+ }
+
+ public ByteBuffer wrapBuffer(byte[] bytes)
+ {
+ return ByteBuffer.wrap(bytes);
+ }
}
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -37,39 +37,16 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public class NIOSequentialFileFactory implements SequentialFileFactory
+public class NIOSequentialFileFactory extends AbstractSequentialFactory
{
- private final String journalDir;
-
public NIOSequentialFileFactory(final String journalDir)
{
- this.journalDir = journalDir;
+ super(journalDir);
}
- public SequentialFile createSequentialFile(final String fileName, final boolean sync)
+ public SequentialFile createSequentialFile(final String fileName, final boolean sync, boolean control)
{
return new NIOSequentialFile(journalDir, fileName, sync);
}
- public List<String> listFiles(final String extension) throws Exception
- {
- File dir = new File(journalDir);
-
- FilenameFilter fnf = new FilenameFilter()
- {
- public boolean accept(File file, String name)
- {
- return name.endsWith("." + extension);
- }
- };
-
- String[] fileNames = dir.list(fnf);
-
- if (fileNames == null)
- {
- throw new IOException("Failed to list: " + journalDir);
- }
-
- return Arrays.asList(fileNames);
- }
}
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -21,7 +21,7 @@
import org.jboss.messaging.core.journal.RecordInfo;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.MessageReference;
@@ -98,7 +98,7 @@
checkAndCreateDir(bindingsDir, config.isCreateBindingsDir());
- SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
+ SequentialFileFactory bindingsFF = new AIOSequentialFileFactory(bindingsDir);
bindingsJournal = new JournalImpl(1024 * 1024, 2, true, bindingsFF, 10000, "jbm-bindings", "bindings");
@@ -111,7 +111,7 @@
checkAndCreateDir(journalDir, config.isCreateBindingsDir());
- SequentialFileFactory journalFF = new NIOSequentialFileFactory(journalDir);
+ SequentialFileFactory journalFF = new AIOSequentialFileFactory(journalDir);
messageJournal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(), config.isJournalSync(), journalFF,
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/MultiThreadWriteNativeTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/MultiThreadWriteNativeTest.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/MultiThreadWriteNativeTest.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -20,7 +20,7 @@
import junit.framework.TestCase;
import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.impl.JlibAIO;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.logging.Logger;
// you need to define java.library.path=${project-root}/native/src/.libs
@@ -47,12 +47,12 @@
static class ExecClass implements Runnable
{
- JlibAIO aio;
+ AsynchronousFileImpl aio;
ByteBuffer buffer;
AIOCallback callback;
- public ExecClass(JlibAIO aio, ByteBuffer buffer, AIOCallback callback)
+ public ExecClass(AsynchronousFileImpl aio, ByteBuffer buffer, AIOCallback callback)
{
this.aio = aio;
this.buffer = buffer;
@@ -81,7 +81,7 @@
- private static void addData(JlibAIO aio, ByteBuffer buffer, AIOCallback callback) throws Exception
+ private static void addData(AsynchronousFileImpl aio, ByteBuffer buffer, AIOCallback callback) throws Exception
{
//aio.write(getNewPosition()*SIZE, SIZE, buffer, callback);
executor.execute(new ExecClass(aio, buffer, callback));
@@ -125,10 +125,11 @@
private void executeTest(boolean sync) throws Throwable
{
log.info(sync?"Sync test:":"Async test");
- JlibAIO jlibAIO = new JlibAIO();
+ AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
jlibAIO.open(FILE_NAME, 21000);
log.debug("Preallocating file");
- jlibAIO.preAllocate(NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES);
+
+ jlibAIO.fill(0l, NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES, (byte)0);
log.debug("Done Preallocating file");
CountDownLatch latchStart = new CountDownLatch (NUMBER_OF_THREADS + 1);
@@ -179,9 +180,9 @@
Throwable failed = null;
CountDownLatch latchStart;
boolean sync;
- JlibAIO libaio;
+ AsynchronousFileImpl libaio;
- public ThreadProducer(String name, CountDownLatch latchStart, JlibAIO libaio, boolean sync)
+ public ThreadProducer(String name, CountDownLatch latchStart, AsynchronousFileImpl libaio, boolean sync)
{
super(name);
this.latchStart = latchStart;
@@ -281,9 +282,9 @@
boolean errorCalled = false;
CountDownLatch latchDone;
ByteBuffer releaseMe;
- JlibAIO libaio;
+ AsynchronousFileImpl libaio;
- public LocalCallback(CountDownLatch latchDone, ByteBuffer releaseMe, JlibAIO libaio)
+ public LocalCallback(CountDownLatch latchDone, ByteBuffer releaseMe, AsynchronousFileImpl libaio)
{
this.latchDone = latchDone;
this.releaseMe = releaseMe;
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -13,16 +13,21 @@
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.impl.JlibAIO;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.logging.Logger;
import junit.framework.TestCase;
//you need to define java.library.path=${project-root}/native/src/.libs
public class SingleThreadWriteNativeTest extends TestCase
{
+ private static final Logger log = Logger.getLogger(SingleThreadWriteNativeTest.class);
+
private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
@@ -52,7 +57,97 @@
buffer.put((byte)'\n');
}
+
+ /**
+  * Writes the same encoded block to two different files through two independent
+  * AsynchronousFileImpl instances and verifies that every write issued on each file
+  * is completed exactly once and without errors.
+  *
+  * @throws Exception if any file operation fails
+  */
+ public void testTwoFiles() throws Exception
+ {
+    final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+    final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
+
+    final int numberOfLines = 100000;
+    final int size = 1024;
+
+    try
+    {
+       // Opened inside the try so a failure on the second open still closes the first file
+       controller.open(FILE_NAME + ".1", 10000);
+       controller2.open(FILE_NAME + ".2", 10000);
+
+       System.out.println("++testTwoFiles"); System.out.flush();
+       CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+       CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
+
+       ByteBuffer block = controller.newBuffer(size);
+       encodeBufer(block);
+
+       preAlloc(controller, numberOfLines * size);
+       preAlloc(controller2, numberOfLines * size);
+
+       // One callback per write and per file, all created before the clock starts
+       ArrayList<LocalAIO> list = new ArrayList<LocalAIO>(numberOfLines);
+       ArrayList<LocalAIO> list2 = new ArrayList<LocalAIO>(numberOfLines);
+       for (int i = 0; i < numberOfLines; i++)
+       {
+          list.add(new LocalAIO(latchDone));
+          list2.add(new LocalAIO(latchDone2));
+       }
+
+       long valueInitial = System.currentTimeMillis();
+
+       System.out.println("Adding data");
+
+       long lastTime = System.currentTimeMillis();
+       int counter = 0;
+       Iterator<LocalAIO> iter2 = list2.iterator();
+
+       for (LocalAIO tmp: list)
+       {
+          LocalAIO tmp2 = iter2.next();
+
+          controller.write(counter * size, size, block, tmp);
+          // The second callback belongs to the second file
+          controller2.write(counter * size, size, block, tmp2);
+          if (++counter % 5000 == 0)
+          {
+             // Math.max avoids a division by zero when a batch takes less than 1 ms
+             long elapsed = Math.max(1, System.currentTimeMillis() - lastTime);
+             System.out.println(5000*1000/elapsed + " rec/sec (Async)");
+             lastTime = System.currentTimeMillis();
+          }
+       }
+
+       System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
+
+       System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
+       System.out.println("Flush now");
+       System.out.println("Received " + LocalAIO.staticDone);
+       long timeTotal = Math.max(1, System.currentTimeMillis() - valueInitial);
+
+       System.out.println("Asynchronous time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
+
+       // Wait until every write on both files has been answered
+       latchDone.await();
+       latchDone2.await();
+
+       timeTotal = Math.max(1, System.currentTimeMillis() - valueInitial);
+       System.out.println("After completions time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
+
+       for (LocalAIO tmp: list)
+       {
+          assertEquals(1, tmp.timesDoneCalled);
+          assertTrue(tmp.doneCalled);
+          assertFalse(tmp.errorCalled);
+       }
+
+       // The callbacks of the second file have to pass the same checks
+       for (LocalAIO tmp2: list2)
+       {
+          assertEquals(1, tmp2.timesDoneCalled);
+          assertTrue(tmp2.doneCalled);
+          assertFalse(tmp2.errorCalled);
+       }
+
+       controller.destroyBuffer(block);
+
+       controller.close();
+       controller2.close();
+    }
+    finally
+    {
+       // Best-effort cleanup for the failure path; errors on an already closed file are ignored
+       try {controller.close();} catch (Exception ignored){}
+       try {controller2.close();} catch (Exception ignored){}
+    }
+ }
+
+
public void testAddBeyongSimultaneousLimit() throws Exception
{
asyncData(150000,1024,100);
@@ -63,13 +158,122 @@
asyncData(150000,1024,20000);
}
+ /**
+  * Reads the test file back through validateData, using 150000 lines of 1024 bytes
+  * and an AIO limit of 20000.
+  * NOTE(review): presumably relies on the content left in the file by one of the write
+  * tests - confirm the expected test order.
+  */
+ public void testValidateData() throws Exception
+ {
+    final int numberOfLines = 150000;
+    final int lineSize = 1024;
+    final int aioLimit = 20000;
+
+    validateData(numberOfLines, lineSize, aioLimit);
+ }
+
+ /**
+  * Checks how reads behave with different kinds of buffers, after writing one known
+  * block of SIZE bytes at position 0:
+  * - a direct buffer smaller than the block (the outcome is currently not asserted);
+  * - a heap buffer created by ByteBuffer.wrap, which must be rejected with an exception;
+  * - a buffer created by the controller, which must receive exactly the written data.
+  *
+  * @throws Exception if any file operation fails unexpectedly
+  */
+ public void testInvalidReads() throws Exception
+ {
+    class LocalCallback implements AIOCallback
+    {
+       CountDownLatch latch = new CountDownLatch(1);
+       boolean error;
+
+       public void done()
+       {
+          latch.countDown();
+       }
+
+       public void onError(int errorCode, String errorMessage)
+       {
+          this.error = true;
+          latch.countDown();
+       }
+    }
+
+    final int SIZE = 512;
+
+    AsynchronousFileImpl controller = new AsynchronousFileImpl();
+    try
+    {
+       // The file is opened, closed and reopened with a fresh instance before any data is written
+       controller.open(FILE_NAME, 10);
+       controller.close();
+
+       controller = new AsynchronousFileImpl();
+
+       controller.open(FILE_NAME, 10);
+
+       controller.fill(0, 1, SIZE, (byte)'j');
+
+       // Known pattern: byte i of the block holds i % 100
+       ByteBuffer buffer = controller.newBuffer(SIZE);
+       buffer.clear();
+       for (int i = 0; i < SIZE; i++)
+       {
+          buffer.put((byte)(i % 100));
+       }
+
+       LocalCallback callbackLocal = new LocalCallback();
+       controller.write(0, SIZE, buffer, callbackLocal);
+       callbackLocal.latch.await();
+       // Everything below reads this block back, so the write itself must have succeeded
+       assertFalse(callbackLocal.error);
+
+       // Direct buffer smaller than the block: only wait for the answer, whatever it is
+       ByteBuffer smallBuffer = ByteBuffer.allocateDirect(50);
+       callbackLocal = new LocalCallback();
+       controller.read(0, 50, smallBuffer, callbackLocal);
+       callbackLocal.latch.await();
+
+       //assertTrue(callbackLocal.error);
+
+       // Heap buffer: the read call itself has to throw
+       callbackLocal = new LocalCallback();
+       try
+       {
+          ByteBuffer heapBuffer = ByteBuffer.wrap(new byte[SIZE]);
+
+          controller.read(0, SIZE, heapBuffer, callbackLocal);
+
+          fail("An exception was supposed to be thrown");
+       }
+       catch (Exception ignored)
+       {
+          // expected: fail() raises an Error, so it is not swallowed here
+       }
+
+       // Buffer created by the controller: the data must come back unchanged
+       ByteBuffer readBuffer = controller.newBuffer(SIZE);
+       callbackLocal = new LocalCallback();
+       controller.read(0, SIZE, readBuffer, callbackLocal);
+       callbackLocal.latch.await();
+       assertFalse(callbackLocal.error);
+
+       readBuffer.rewind();
+
+       byte[] bytesRead = new byte[SIZE];
+       readBuffer.get(bytesRead);
+
+       for (int i = 0; i < SIZE; i++)
+       {
+          assertEquals((byte)(i % 100), bytesRead[i]);
+       }
+
+       // Both buffers were created by the controller, so both are handed back to it
+       controller.destroyBuffer(readBuffer);
+       controller.destroyBuffer(buffer);
+    }
+    finally
+    {
+       try { controller.close(); } catch (Throwable ignored){}
+    }
+ }
+
+
public void testRead() throws Exception
{
-
-
-
-
- final JlibAIO controller = new JlibAIO();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
try
{
@@ -150,9 +354,94 @@
}
+ /**
+  * Reads the test file back line by line and compares every line with the block produced
+  * by encodeBufer. Nothing is asserted: a warning is logged when a valid line is found
+  * after an invalid one. This method is called by testValidateData.
+  *
+  * @param numberOfLines number of lines to read from the file
+  * @param size size in bytes of each line
+  * @param aioLimit value passed as the second argument of AsynchronousFileImpl.open
+  * @throws Exception if the file cannot be opened or read
+  */
+ private void validateData(int numberOfLines, int size, int aioLimit) throws Exception
+ {
+    final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+    ByteBuffer readBuffer = null;
+
+    try
+    {
+       controller.open(FILE_NAME, aioLimit);
+
+       ByteBuffer compareBlock = ByteBuffer.allocateDirect(size);
+       encodeBufer(compareBlock);
+
+       readBuffer = controller.newBuffer(size);
+
+       boolean firstInvalid = false;
+       for (int i = 0; i < numberOfLines; i++)
+       {
+          if (i % 1000 == 0)
+          {
+             log.info("line = " + i);
+          }
+
+          // One read at a time: wait for the completion before looking at the buffer
+          CountDownLatch latch = new CountDownLatch(1);
+          LocalAIO callback = new LocalAIO(latch);
+          controller.read(i * size, size, readBuffer, callback);
+
+          latch.await();
+
+          if (!compareBuffers(compareBlock, readBuffer))
+          {
+             //log.info("Invalid line at " + i);
+             firstInvalid = true;
+          }
+          else if (firstInvalid)
+          {
+             for (int line = 0; line < 10; line++) log.info("*********************************************");
+             log.warn("Valid line after an invalid line!!!");
+          }
+       }
+    }
+    finally
+    {
+       // Hand the buffer back and close the file even when a read fails
+       if (readBuffer != null)
+       {
+          try { controller.destroyBuffer(readBuffer); } catch (Exception ignored){}
+       }
+       try { controller.close(); } catch (Exception ignored){}
+    }
+ }
+
+
+ /**
+  * Compares the content of two buffers from position 0 up to their limit.
+  * Both buffers are rewound first; when the limits match, both are read up to the limit.
+  *
+  * @param buffer1 first buffer
+  * @param buffer2 second buffer
+  * @return true when both buffers have the same limit and the same bytes
+  */
+ private boolean compareBuffers(ByteBuffer buffer1, ByteBuffer buffer2)
+ {
+    buffer1.rewind();
+    buffer2.rewind();
+
+    final int length = buffer1.limit();
+    if (length != buffer2.limit())
+    {
+       return false;
+    }
+
+    final byte[] expected = new byte[length];
+    final byte[] actual = new byte[length];
+
+    buffer1.get(expected);
+    buffer2.get(actual);
+
+    return java.util.Arrays.equals(expected, actual);
+ }
+
private void asyncData(int numberOfLines, int size, int aioLimit) throws Exception
{
- final JlibAIO controller = new JlibAIO();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
controller.open(FILE_NAME, aioLimit);
try
@@ -233,7 +522,7 @@
final int SIZE = 1024;
//final int SIZE = 512;
- final JlibAIO controller = new JlibAIO();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
controller.open(FILE_NAME, 2000);
ByteBuffer block = controller.newBuffer(SIZE);
@@ -285,11 +574,11 @@
}
- private void preAlloc(JlibAIO controller, long size)
+ private void preAlloc(AsynchronousFileImpl controller, long size)
{
System.out.println("Pre allocating"); System.out.flush();
long startPreAllocate = System.currentTimeMillis();
- controller.preAllocate(1, size);
+ controller.fill(0l, 1, size, (byte)0);
long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
if (endPreAllocate != 0) System.out.println("PreAllocated the file in " + endPreAllocate + " seconds, What means " + (size/endPreAllocate) + " bytes per millisecond");
}
@@ -297,7 +586,7 @@
public void testInvalidWrite() throws Exception
{
- final JlibAIO controller = new JlibAIO();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
controller.open(FILE_NAME, 2000);
try
@@ -337,7 +626,7 @@
public void testInvalidAlloc() throws Exception
{
- JlibAIO controller = new JlibAIO();
+ AsynchronousFileImpl controller = new AsynchronousFileImpl();
try
{
// You don't need to open the file to alloc it
Added: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.timing;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ *
+ * A RealJournalImplAIOTest
+ *
+ * Timing tests for JournalImpl running on top of AIOSequentialFileFactory, using real
+ * files created under the "journal-test" directory of the user's home.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealJournalImplAIOTest extends JournalImplTestUnit
+{
+   private static final Logger log = Logger.getLogger(RealJournalImplAIOTest.class);
+
+   protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+   /**
+    * Recreates the journal directory from scratch and returns a factory for it.
+    *
+    * @return an AIO file factory pointing to the empty journal directory
+    */
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      File file = new File(journalDir);
+
+      log.info("deleting directory " + journalDir);
+
+      deleteDirectory(file);
+
+      file.mkdir();
+
+      // This is the AIO test, so the inherited tests must run on the AIO factory as well
+      return new AIOSequentialFileFactory(journalDir);
+   }
+
+   /**
+    * Appends 50000 records of 700 bytes, each one with its own callback, waits up to two
+    * seconds for the completions and fails when a callback reported an error, was called
+    * twice or was never called. The rate is logged at the end.
+    */
+   public void testSpeedNonTransactional() throws Exception
+   {
+
+      Journal journal =
+         new JournalImpl(10 * 1024 * 1024, 10, true, new AIOSequentialFileFactory(journalDir),
+               5000, "jbm-data", "jbm");
+
+      journal.start();
+
+      journal.load(new ArrayList<RecordInfo>(), null);
+
+      final int numMessages = 50000;
+
+      final CountDownLatch latch = new CountDownLatch(numMessages);
+
+
+      class LocalCallback implements IOCallback
+      {
+
+         // Index of the record this callback belongs to
+         int i=0;
+         // Set when something went wrong; null means no problem was detected
+         String message = null;
+         boolean done = false;
+         CountDownLatch latch;
+
+         public LocalCallback(int i, CountDownLatch latch)
+         {
+            this.i = i;
+            this.latch = latch;
+         }
+
+         public void done()
+         {
+            synchronized (this)
+            {
+               if (done)
+               {
+                  message = "done received in duplicate";
+               }
+               done = true;
+               this.latch.countDown();
+            }
+         }
+
+         public void onError(int errorCode, String errorMessage)
+         {
+            synchronized (this)
+            {
+               // The index is only reported; incrementing it here would corrupt the value printed later
+               System.out.println("********************** Error = " + i);
+               message = errorMessage;
+               latch.countDown();
+            }
+         }
+
+      }
+
+
+      byte[] data = new byte[700];
+
+      long start = System.currentTimeMillis();
+
+      ArrayList<LocalCallback> callbacks = new ArrayList<LocalCallback>();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         LocalCallback callback = new LocalCallback(i, latch);
+         callbacks.add(callback);
+         journal.appendAddRecord(i, data, callback);
+      }
+
+      // A timeout is not checked here: missing completions are reported one by one below
+      latch.await(2, TimeUnit.SECONDS);
+
+      long end = System.currentTimeMillis();
+
+      double rate = 1000 * (double)numMessages / (end - start);
+
+      boolean failed = false;
+
+      for (LocalCallback callback: callbacks)
+      {
+         if (callback.message != null)
+         {
+            fail(callback.message);
+         }
+
+         if (!callback.done)
+         {
+            System.out.println("callback i=" + callback.i + " was not received!");
+            failed = true;
+         }
+      }
+
+
+      // If this fails, it is probably because JournalImpl is closing the files without waiting for all the completions to arrive first
+      assertFalse(failed);
+
+
+      log.info("Rate " + rate + " records/sec");
+
+   }
+
+   /**
+    * Appends 10000 transactional records of 1024 bytes, committing after each one,
+    * and logs the resulting rate.
+    */
+   public void testSpeedTransactional() throws Exception
+   {
+      Journal journal =
+         new JournalImpl(10 * 1024 * 1024, 10, true, new AIOSequentialFileFactory(journalDir),
+               5000, "jbm-data", "jbm");
+
+      journal.start();
+
+      journal.load(new ArrayList<RecordInfo>(), null);
+
+      final int numMessages = 10000;
+
+      byte[] data = new byte[1024];
+
+      long start = System.currentTimeMillis();
+
+      int count = 0;
+      for (int i = 0; i < numMessages; i++)
+      {
+         journal.appendAddRecordTransactional(i, count++, data);
+
+         journal.appendCommitRecord(i);
+      }
+
+      long end = System.currentTimeMillis();
+
+      double rate = 1000 * (double)numMessages / (end - start);
+
+      log.info("Rate " + rate + " records/sec");
+
+   }
+}
+
Property changes on: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplTest.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplTest.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -60,16 +60,16 @@
public void testSpeedNonTransactional() throws Exception
{
Journal journal =
- new JournalImpl(10 * 1024 * 1024, 10, true, new NIOSequentialFileFactory(journalDir),
+ new JournalImpl(50 * 1024 * 1024, 2, true, new NIOSequentialFileFactory(journalDir),
5000, "jbm-data", "jbm");
journal.start();
journal.load(new ArrayList<RecordInfo>(), null);
- final int numMessages = 10000;
+ final int numMessages = 30000;
- byte[] data = new byte[1024];
+ byte[] data = new byte[700];
long start = System.currentTimeMillis();
Added: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,304 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+
+/**
+ * Tests for the files created by AIOSequentialFileFactory, using a real directory
+ * ("journal-test" under the user's home) that is recreated before every test.
+ */
+public class AIOSequentialFileFactoryTest extends FileFactoryTestBase
+{
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ /**
+ * Lets the base class create the factory, then deletes and recreates the journal directory.
+ */
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(journalDir);
+
+ System.out.println("Filling files on " + journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ /**
+ * @return the AIO factory under test, pointing to the journal directory
+ */
+ protected SequentialFileFactory createFactory()
+ {
+ return new AIOSequentialFileFactory(journalDir);
+ }
+
+ /**
+ * A buffer requested with 10 bytes is expected to come back with a limit of 512.
+ * NOTE(review): presumably the size is rounded up to the AIO alignment - confirm.
+ */
+ public void testBuffer() throws Exception
+ {
+ SequentialFile file = factory.createSequentialFile("filtetmp.log", true, false);
+ file.open();
+ ByteBuffer buff = file.newBuffer(10);
+ assertEquals(512, buff.limit());
+ //ByteBuffer buffer =
+ }
+
+ /**
+ * Creates ten ".jbm" files plus two files with other extensions and checks that
+ * listFiles returns only the files of the requested extension.
+ */
+ public void testCreateAndListFiles() throws Exception
+ {
+ List<String> expectedFiles = new ArrayList<String>();
+
+ final int numFiles = 10;
+
+ for (int i = 0; i < numFiles; i++)
+ {
+ String fileName = UUID.randomUUID().toString() + ".jbm";
+
+ expectedFiles.add(fileName);
+
+ SequentialFile sf = factory.createSequentialFile(fileName, false, false);
+
+ sf.open();
+
+ assertEquals(fileName, sf.getFileName());
+ }
+
+ //Create a couple with a different extension - they shouldn't be picked up
+
+ SequentialFile sf1 = factory.createSequentialFile("different.file", false, false);
+ sf1.open();
+
+ SequentialFile sf2 = factory.createSequentialFile("different.cheese", false, false);
+ sf2.open();
+
+ List<String> fileNames = factory.listFiles("jbm");
+
+ assertEquals(expectedFiles.size(), fileNames.size());
+
+ for (String fileName: expectedFiles)
+ {
+ assertTrue(fileNames.contains(fileName));
+ }
+
+ fileNames = factory.listFiles("file");
+
+ assertEquals(1, fileNames.size());
+
+ assertTrue(fileNames.contains("different.file"));
+
+ fileNames = factory.listFiles("cheese");
+
+ assertEquals(1, fileNames.size());
+
+ assertTrue(fileNames.contains("different.cheese"));
+ }
+
+
+ /**
+ * Fills regions of one file with different characters through checkFill, including
+ * regions that were already filled before and a larger region of 10 KiB.
+ */
+ public void testFill() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("fill.jbm", true, false);
+
+ sf.open();
+
+ checkFill(sf, 0, 512, (byte)'X');
+
+ checkFill(sf, 512, 512, (byte)'Y');
+
+ checkFill(sf, 0, 512, (byte)'Z');
+
+ checkFill(sf, 512, 512, (byte)'A');
+
+ checkFill(sf, 1024, 10*1024, (byte)'B');
+ }
+
+ /**
+ * Creates two files, deletes the first one and checks that only the second one is listed.
+ */
+ public void testDelete() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true, false);
+
+ sf.open();
+
+ SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true, false);
+
+ sf2.open();
+
+ List<String> fileNames = factory.listFiles("jbm");
+
+ assertEquals(2, fileNames.size());
+
+ assertTrue(fileNames.contains("delete-me.jbm"));
+
+ assertTrue(fileNames.contains("delete-me2.jbm"));
+
+ sf.delete();
+
+ fileNames = factory.listFiles("jbm");
+
+ assertEquals(1, fileNames.size());
+
+ assertTrue(fileNames.contains("delete-me2.jbm"));
+
+ }
+
+ /**
+ * Writes three wrapped strings one after the other, goes back to position 0 and reads
+ * them in the same order. Every write and read must report as many bytes as the limit
+ * of the buffer that was used.
+ */
+ public void testWriteandRead() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("write.jbm", true, false);
+
+ sf.open();
+
+ String s1 = "aardvark";
+ byte[] bytes1 = s1.getBytes("UTF-8");
+
+ ByteBuffer bb1 = sf.wrapBuffer(bytes1);
+
+ String s2 = "hippopotamus";
+ byte[] bytes2 = s2.getBytes("UTF-8");
+ ByteBuffer bb2 = sf.wrapBuffer(bytes2);
+
+ String s3 = "echidna";
+ byte[] bytes3 = s3.getBytes("UTF-8");
+ ByteBuffer bb3 = sf.wrapBuffer(bytes3);
+
+ int bytesWritten = sf.write(bb1, true);
+
+ assertEquals(bb1.limit(), bytesWritten);
+
+ bytesWritten = sf.write(bb2, true);
+
+ assertEquals(bb2.limit(), bytesWritten);
+
+ bytesWritten = sf.write(bb3, true);
+
+ assertEquals(bb3.limit(), bytesWritten);
+
+ sf.position(0);
+
+ byte[] rbytes1 = new byte[bytes1.length];
+
+ byte[] rbytes2 = new byte[bytes2.length];
+
+ byte[] rbytes3 = new byte[bytes3.length];
+
+ // The read buffers come from the file itself, not from ByteBuffer.wrap
+ ByteBuffer rb1 = sf.newBuffer(rbytes1.length);
+ ByteBuffer rb2 = sf.newBuffer(rbytes2.length);
+ ByteBuffer rb3 = sf.newBuffer(rbytes3.length);
+
+ int bytesRead = sf.read(rb1);
+ assertEquals(rb1.limit(), bytesRead);
+ rb1.get(rbytes1);
+ assertByteArraysEquivalent(bytes1, rbytes1);
+
+ bytesRead = sf.read(rb2);
+ assertEquals(rb2.limit(), bytesRead);
+ rb2.get(rbytes2);
+ assertByteArraysEquivalent(bytes2, rbytes2);
+
+ bytesRead = sf.read(rb3);
+ assertEquals(rb3.limit(), bytesRead);
+ rb3.get(rbytes3);
+ assertByteArraysEquivalent(bytes3, rbytes3);
+ }
+
+ /**
+ * Writes three wrapped strings and reads them back in reverse order, moving the file
+ * position explicitly before each read.
+ */
+ public void testPosition() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("position.jbm", true, false);
+
+ sf.open();
+
+ String s1 = "orange";
+ byte[] bytes1 = s1.getBytes("UTF-8");
+ ByteBuffer bb1 = sf.wrapBuffer(bytes1);
+
+ String s2 = "grapefruit";
+ byte[] bytes2 = s2.getBytes("UTF-8");
+ ByteBuffer bb2 = sf.wrapBuffer(bytes2);
+
+ String s3 = "lemon";
+ byte[] bytes3 = s3.getBytes("UTF-8");
+ ByteBuffer bb3 = sf.wrapBuffer(bytes3);
+
+ int bytesWritten = sf.write(bb1, true);
+
+ assertEquals(bb1.limit(), bytesWritten);
+
+ bytesWritten = sf.write(bb2, true);
+
+ assertEquals(bb2.limit(), bytesWritten);
+
+ bytesWritten = sf.write(bb3, true);
+
+ assertEquals(bb3.limit(), bytesWritten);
+
+ byte[] rbytes1 = new byte[bytes1.length];
+
+ byte[] rbytes2 = new byte[bytes2.length];
+
+ byte[] rbytes3 = new byte[bytes3.length];
+
+ ByteBuffer rb1 = sf.newBuffer(rbytes1.length);
+ ByteBuffer rb2 = sf.newBuffer(rbytes2.length);
+ ByteBuffer rb3 = sf.newBuffer(rbytes3.length);
+
+ // Third record: it starts right after the first two writes
+ sf.position(bb1.limit() + bb2.limit());
+
+ int bytesRead = sf.read(rb3);
+ assertEquals(rb3.limit(), bytesRead);
+ rb3.rewind();
+ rb3.get(rbytes3);
+ assertByteArraysEquivalent(bytes3, rbytes3);
+
+ // Second record: it starts after the first one
+ sf.position(rb1.limit());
+
+ bytesRead = sf.read(rb2);
+ assertEquals(rb2.limit(), bytesRead);
+ rb2.get(rbytes2);
+ assertByteArraysEquivalent(bytes2, rbytes2);
+
+ // First record
+ sf.position(0);
+
+ bytesRead = sf.read(rb1);
+ assertEquals(rb1.limit(), bytesRead);
+ rb1.get(rbytes1);
+
+ assertByteArraysEquivalent(bytes1, rbytes1);
+ }
+
+ /**
+ * A write on a closed file must throw an exception; after the file is opened again
+ * the same write must be accepted.
+ */
+ public void testOpenClose() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("openclose.jbm", true, false);
+
+ sf.open();
+
+ String s1 = "cheesecake";
+ byte[] bytes1 = s1.getBytes("UTF-8");
+ ByteBuffer bb1 = sf.wrapBuffer(bytes1);
+
+ int bytesWritten = sf.write(bb1, true);
+
+ assertEquals(bb1.limit(), bytesWritten);
+
+ sf.close();
+
+ try
+ {
+ sf.write(bb1, true);
+
+ fail("Should throw exception");
+ }
+ catch (Exception e)
+ {
+ //OK
+ }
+
+ sf.open();
+
+ sf.write(bb1, true);
+ }
+
+
+
+}
Property changes on: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/AIOSequentialFileFactoryTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Added: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.test.unit.UnitTestCase;
+
+/**
+ * Base class for the SequentialFileFactory tests. Subclasses supply the factory
+ * implementation through createFactory(); the instance is stored in "factory"
+ * before every test.
+ */
+public abstract class FileFactoryTestBase extends UnitTestCase
+{
+   /** @return the factory implementation to be tested */
+   protected abstract SequentialFileFactory createFactory();
+
+   protected SequentialFileFactory factory;
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      factory = createFactory();
+   }
+
+   // Protected ---------------------------------
+
+   /**
+    * Fills a region of the file, closes and reopens the file, reads the region back and
+    * asserts that every byte of it holds the fill character.
+    *
+    * @param file file to be filled; it is closed and reopened by this method
+    * @param pos position where the region starts
+    * @param size number of bytes to fill and to read back
+    * @param fillChar byte expected on every position of the region
+    */
+   protected void checkFill(SequentialFile file, int pos, int size, byte fillChar) throws Exception
+   {
+      file.fill(pos, size, fillChar);
+
+      // Close and reopen before reading, then go back to the start of the region
+      file.close();
+      file.open();
+      file.position(pos);
+
+      final ByteBuffer readBuffer = ByteBuffer.allocateDirect(size);
+
+      assertEquals(size, file.read(readBuffer));
+
+      readBuffer.rewind();
+
+      final byte[] content = new byte[size];
+      readBuffer.get(content);
+
+      for (byte value : content)
+      {
+         assertEquals(fillChar, value);
+      }
+   }
+}
Property changes on: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FileFactoryTestBase.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -210,8 +210,9 @@
TransactionHolder tx = getTransaction(txID);
for (int i = 0; i < arguments.length; i++)
- {
- byte[] record = generateRecord(recordLength);
+ {
+ // SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE
+ byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX );
journal.appendAddRecordTransactional(txID, arguments[i], record);
@@ -226,7 +227,7 @@
for (int i = 0; i < arguments.length; i++)
{
- byte[] updateRecord = generateRecord(recordLength);
+ byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX );
journal.appendUpdateRecordTransactional(txID, arguments[i], updateRecord);
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestUnit.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestUnit.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestUnit.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -253,6 +253,36 @@
}
}
+ /**
+  * Starts and loads an empty journal, stops it, then creates, starts and loads it again
+  * with the same settings. Both runs must list two files, and every file of the first
+  * run must still be listed on the second one.
+  */
+ public void testEmptyReopen() throws Exception
+ {
+    // First run
+    setup(2, 10 * 1024, true);
+    createJournal();
+    startJournal();
+    load();
+
+    final List<String> filesBeforeRestart = fileFactory.listFiles(fileExtension);
+    assertEquals(2, filesBeforeRestart.size());
+
+    stopJournal();
+
+    // Second run, same settings
+    setup(2, 10 * 1024, true);
+    createJournal();
+    startJournal();
+    load();
+
+    final List<String> filesAfterRestart = fileFactory.listFiles(fileExtension);
+    assertEquals(2, filesAfterRestart.size());
+
+    assertTrue(filesAfterRestart.containsAll(filesBeforeRestart));
+
+    stopJournal();
+ }
+
public void testCreateFilesOnLoad() throws Exception
{
setup(10, 10 * 1024, true);
@@ -2015,9 +2045,10 @@
loadAndCheck();
}
+ /// aki
public void testMultipleTransactionsDifferentIDs() throws Exception
{
- setup(10, 10 * 1024, true);
+ setup(5, 512 + 9 * 1024, true);
createJournal();
startJournal();
load();
@@ -2028,8 +2059,6 @@
commit(1);
addTx(2, 11, 12, 13, 14, 15, 16);
- updateTx(2, 11, 13, 15);
- deleteTx(2, 11, 12, 13, 14, 15, 16);
commit(2);
addTx(3, 21, 22, 23, 24, 25, 26);
@@ -2043,6 +2072,29 @@
loadAndCheck();
}
+ /**
+  * Commits a transaction made of adds, updates and deletes, starts a second transaction
+  * that is not committed, then restarts the journal and checks the loaded state.
+  * NOTE(review): the small second argument of setup() is presumably the journal file
+  * size, chosen so that the records spread over more than one file - confirm.
+  */
+ public void testTransactionOnDifferentFiles() throws Exception
+ {
+    final int size = 512 + 2*1024;
+
+    setup(2, size, true);
+    createJournal();
+    startJournal();
+    load();
+
+    // Transaction 1: complete and committed
+    addTx(1, 1, 2, 3, 4, 5, 6);
+    updateTx(1, 1, 3, 5);
+    deleteTx(1, 1, 2, 3, 4, 5, 6);
+    commit(1);
+
+    // Transaction 2: left without a commit
+    addTx(2, 11, 12);
+
+    // Restart and verify
+    stopJournal();
+    createJournal();
+    startJournal();
+    loadAndCheck();
+ }
+
public void testMultipleInterleavedTransactionsDifferentIDs() throws Exception
{
setup(10, 10 * 1024, true);
Added: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ *
+ * A RealAIOJournalImplTest
+ *
+ * Runs the tests inherited from JournalImplTestUnit on top of real files created by
+ * AIOSequentialFileFactory, under the "journal-test" directory of the user's home.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealAIOJournalImplTest extends JournalImplTestUnit
+{
+   private static final Logger log = Logger.getLogger(RealAIOJournalImplTest.class);
+
+   protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+   /**
+    * Recreates the journal directory from scratch and returns a factory for it.
+    *
+    * @return an AIO file factory pointing to the empty journal directory
+    */
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      log.info("deleting directory " + journalDir);
+
+      final File directory = new File(journalDir);
+      deleteDirectory(directory);
+      directory.mkdir();
+
+      return new AIOSequentialFileFactory(journalDir);
+   }
+
+}
Property changes on: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealAIOJournalImplTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Deleted: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -1,60 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.journal.impl.test.unit;
-
-import java.io.File;
-import java.util.ArrayList;
-
-import org.jboss.messaging.core.journal.Journal;
-import org.jboss.messaging.core.journal.RecordInfo;
-import org.jboss.messaging.core.journal.SequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
-import org.jboss.messaging.core.logging.Logger;
-
-/**
- *
- * A RealJournalImplTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class RealJournalImplTest extends JournalImplTestUnit
-{
- private static final Logger log = Logger.getLogger(RealJournalImplTest.class);
-
- protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- File file = new File(journalDir);
-
- log.info("deleting directory " + journalDir);
-
- deleteDirectory(file);
-
- file.mkdir();
-
- return new NIOSequentialFileFactory(journalDir);
- }
-
-}
Copied: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealNIOJournalImplTest.java (from rev 4009, trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java)
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealNIOJournalImplTest.java (rev 0)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealNIOJournalImplTest.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+import java.util.ArrayList;
+
+import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ *
+ * A RealNIOJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealNIOJournalImplTest extends JournalImplTestUnit
+{
+ private static final Logger log = Logger.getLogger(RealNIOJournalImplTest.class);
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(journalDir);
+
+ log.info("deleting directory " + journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+}
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -66,7 +66,7 @@
expectedFiles.add(fileName);
- SequentialFile sf = factory.createSequentialFile(fileName, false);
+ SequentialFile sf = factory.createSequentialFile(fileName, false, false);
sf.open();
@@ -75,10 +75,10 @@
//Create a couple with a different extension - they shouldn't be picked up
- SequentialFile sf1 = factory.createSequentialFile("different.file", false);
+ SequentialFile sf1 = factory.createSequentialFile("different.file", false, false);
sf1.open();
- SequentialFile sf2 = factory.createSequentialFile("different.cheese", false);
+ SequentialFile sf2 = factory.createSequentialFile("different.cheese", false, false);
sf2.open();
List<String> fileNames = factory.listFiles("jbm");
@@ -106,7 +106,7 @@
public void testFill() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("fill.jbm", true);
+ SequentialFile sf = factory.createSequentialFile("fill.jbm", true, false);
sf.open();
@@ -123,11 +123,11 @@
public void testDelete() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true);
+ SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true, false);
sf.open();
- SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true);
+ SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true, false);
sf2.open();
@@ -151,7 +151,7 @@
public void testWriteandRead() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("write.jbm", true);
+ SequentialFile sf = factory.createSequentialFile("write.jbm", true, false);
sf.open();
@@ -206,7 +206,7 @@
public void testPosition() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("position.jbm", true);
+ SequentialFile sf = factory.createSequentialFile("position.jbm", true, false);
sf.open();
@@ -265,7 +265,7 @@
public void testOpenClose() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("openclose.jbm", true);
+ SequentialFile sf = factory.createSequentialFile("openclose.jbm", true, false);
sf.open();
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
@@ -44,7 +45,7 @@
private Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
- public SequentialFile createSequentialFile(final String fileName, final boolean sync) throws Exception
+ public SequentialFile createSequentialFile(final String fileName, final boolean sync, boolean control) throws Exception
{
FakeSequentialFile sf = fileMap.get(fileName);
@@ -178,6 +179,11 @@
public int read(ByteBuffer bytes) throws Exception
{
+ return read(bytes, null);
+ }
+
+ public int read(ByteBuffer bytes, IOCallback callback) throws Exception
+ {
if (!open)
{
throw new IllegalStateException("Is closed");
@@ -193,6 +199,8 @@
bytes.put(bytesRead);
+ if (callback != null) callback.done();
+
return bytesRead.length;
}
@@ -208,22 +216,30 @@
data.position(pos);
}
+ public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
+ {
+ if (!open)
+ {
+ throw new IllegalStateException("Is closed");
+ }
+
+ int position = data == null ? 0 : data.position();
+
+ checkAndResize(bytes.capacity() + position);
+
+ //log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+
+ data.put(bytes);
+
+ if (callback!=null) callback.done();
+
+ return bytes.array().length;
+
+ }
+
public int write(ByteBuffer bytes, boolean sync) throws Exception
{
- if (!open)
- {
- throw new IllegalStateException("Is closed");
- }
-
- int position = data == null ? 0 : data.position();
-
- checkAndResize(bytes.capacity() + position);
-
- //log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
-
- data.put(bytes);
-
- return bytes.array().length;
+ return write(bytes, sync, null);
}
private void checkAndResize(int size)
@@ -245,7 +261,22 @@
}
}
+ public ByteBuffer newBuffer(int size)
+ {
+ return ByteBuffer.allocate(size);
+ }
+ public ByteBuffer wrapBuffer(byte[] bytes)
+ {
+ return ByteBuffer.wrap(bytes);
+ }
+
+ public int getAlignment() throws Exception
+ {
+ return 1;
+ }
+
+
}
}
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/test/messaging/JBMServerTestCase.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/test/messaging/JBMServerTestCase.java 2008-04-09 23:02:06 UTC (rev 4025)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/test/messaging/JBMServerTestCase.java 2008-04-10 01:27:55 UTC (rev 4026)
@@ -247,7 +247,7 @@
protected boolean getClearDatabase()
{
- return true;
+ return false;
}
protected HashMap<String, Object> getConfiguration()
More information about the jboss-cvs-commits
mailing list