[jboss-cvs] JBoss Messaging SVN: r7202 - in trunk: native/bin and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 4 16:08:33 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-04 16:08:33 -0400 (Thu, 04 Jun 2009)
New Revision: 7202
Added:
trunk/native/src/JNI_AsynchronousFileImpl.cpp
Removed:
trunk/native/src/LibAIOController.cpp
Modified:
trunk/native/bin/libJBMLibAIO64.so
trunk/native/configure.ac
trunk/native/src/Makefile.am
trunk/native/src/Version.h
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
Log:
nanoSleep changes
Modified: trunk/native/bin/libJBMLibAIO64.so
===================================================================
(Binary files differ)
Modified: trunk/native/configure.ac
===================================================================
--- trunk/native/configure.ac 2009-06-04 19:49:50 UTC (rev 7201)
+++ trunk/native/configure.ac 2009-06-04 20:08:33 UTC (rev 7202)
@@ -11,7 +11,7 @@
AC_JNI_INCLUDE_DIR
AC_CONFIG_HEADERS([config.h:config.in])
-AC_CONFIG_SRCDIR([src/LibAIOController.cpp])
+AC_CONFIG_SRCDIR([src/JNI_AsynchronousFileImpl.cpp])
# Check for libaio and libaio-devel
libaio_fail=0
Copied: trunk/native/src/JNI_AsynchronousFileImpl.cpp (from rev 7196, trunk/native/src/LibAIOController.cpp)
===================================================================
--- trunk/native/src/JNI_AsynchronousFileImpl.cpp (rev 0)
+++ trunk/native/src/JNI_AsynchronousFileImpl.cpp 2009-06-04 20:08:33 UTC (rev 7202)
@@ -0,0 +1,323 @@
+/*
+ Copyright (C) 2008 Red Hat Software - JBoss Middleware Division
+
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
+ USA
+
+ The GNU Lesser General Public License is available in the file COPYING.
+
+ Software written by Clebert Suconic (csuconic at redhat dot com)
+*/
+
+#include <jni.h>
+#include <stdlib.h>
+#include <iostream>
+#include <stdio.h>
+#include <fcntl.h>
+#include <string>
+#include <time.h>
+
+
+#include "org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h"
+
+
+#include "JavaUtilities.h"
+#include "AIOController.h"
+#include "JNICallbackAdapter.h"
+#include "AIOException.h"
+#include "Version.h"
+
+
+// This value is set here globally, to avoid passing stuff on stack between java and the native layer on every sleep call
+struct timespec nanoTime;
+
+
+/*
+ * Class: org_jboss_jaio_libaioimpl_LibAIOController
+ * Method: init
+ * Signature: (Ljava/lang/String;Ljava/lang/Class;)J
+ */
+JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_init
+ (JNIEnv * env, jclass clazz, jstring jstrFileName, jint maxIO, jobject logger)
+{
+ AIOController * controller = 0;
+ try
+ {
+ std::string fileName = convertJavaString(env, jstrFileName);
+
+ controller = new AIOController(fileName, (int) maxIO);
+ controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
+ if (!controller->done) return 0;
+
+ controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;ILjava/lang/String;)V");
+ if (!controller->error) return 0;
+
+ jclass loggerClass = env->GetObjectClass(logger);
+
+ if (!(controller->loggerDebug = env->GetMethodID(loggerClass, "debug", "(Ljava/lang/Object;)V"))) return 0;
+ if (!(controller->loggerWarn = env->GetMethodID(loggerClass, "warn", "(Ljava/lang/Object;)V"))) return 0;
+ if (!(controller->loggerInfo = env->GetMethodID(loggerClass, "info", "(Ljava/lang/Object;)V"))) return 0;
+ if (!(controller->loggerError = env->GetMethodID(loggerClass, "error", "(Ljava/lang/Object;)V"))) return 0;
+
+ controller->logger = env->NewGlobalRef(logger);
+
+// controller->log(env,4, "Controller initialized");
+
+ return (jlong)controller;
+ }
+ catch (AIOException& e){
+ if (controller != 0)
+ {
+ delete controller;
+ }
+ throwException(env, e.getErrorCode(), e.what());
+ return 0;
+ }
+}
+
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_read
+ (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
+{
+ try
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+
+ if (buffer == 0)
+ {
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+ return;
+ }
+
+ if (((long)buffer) % 512)
+ {
+ throwException(env, NATIVE_ERROR_NOT_ALIGNED, "Buffer not aligned for use with DMA");
+ return;
+ }
+
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
+
+ controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ }
+}
+
+
+// Fast memset on buffer
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
+ (JNIEnv *env, jclass, jobject jbuffer, jint size)
+{
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+
+ if (buffer == 0)
+ {
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+ return;
+ }
+
+ memset(buffer, 0, (size_t)size);
+
+}
+
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
+ (JNIEnv * env, jclass, jobject jbuffer)
+{
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+ free(buffer);
+}
+
+JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newNativeBuffer
+ (JNIEnv * env, jclass, jlong size)
+{
+ try
+ {
+
+ if (size % ALIGNMENT)
+ {
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512");
+ return 0;
+ }
+
+
+ // This will allocate a buffer, aligned by 512.
+ // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
+ void * buffer = 0;
+ if (::posix_memalign(&buffer, 512, size))
+ {
+ throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign");
+ return 0;
+ }
+
+ memset(buffer, 0, (size_t)size);
+
+ jobject jbuffer = env->NewDirectByteBuffer(buffer, size);
+ return jbuffer;
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ return 0;
+ }
+}
+
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
+ (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
+{
+ try
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+
+ if (buffer == 0)
+ {
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+ return;
+ }
+
+
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
+
+ controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ }
+}
+
+
+
+JNIEXPORT void Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
+ (JNIEnv *env, jclass, jlong controllerAddress)
+{
+ try
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+ controller->fileOutput.pollEvents(env);
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ }
+}
+
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_stopPoller
+ (JNIEnv *env, jclass, jlong controllerAddress)
+{
+ try
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+ controller->fileOutput.stopPoller(env);
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ }
+}
+
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_closeInternal
+ (JNIEnv *env, jclass, jlong controllerAddress)
+{
+ try
+ {
+ if (controllerAddress != 0)
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+ controller->destroy(env);
+ delete controller;
+ }
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ }
+}
+
+
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_fill
+ (JNIEnv * env, jclass, jlong controllerAddress, jlong position, jint blocks, jlong size, jbyte fillChar)
+{
+ try
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+
+ controller->fileOutput.preAllocate(env, position, blocks, size, fillChar);
+
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ }
+}
+
+
+
+/** It does nothing... just return true to make sure it has all the binary dependencies */
+JNIEXPORT jint JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_getNativeVersion
+ (JNIEnv *, jclass)
+
+{
+ return _VERSION_NATIVE_AIO;
+}
+
+
+JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_size0
+ (JNIEnv * env, jobject, jlong controllerAddress)
+{
+ try
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+
+ long size = controller->fileOutput.getSize();
+ if (size < 0)
+ {
+ throwException(env, NATIVE_ERROR_INTERNAL, "InternalError on Native Layer: method size failed");
+ return -1l;
+ }
+ return size;
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ return -1l;
+ }
+
+}
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: setNanoSleepInterval
+ * Signature: (I)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_setNanoSleepInterval
+ (JNIEnv *, jclass, jint nanotime)
+{
+ nanoTime.tv_sec = 0;
+ nanoTime.tv_nsec = (long)nanotime;
+}
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: nanoSleep
+ * Signature: ()V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_nanoSleep
+ (JNIEnv *, jclass)
+{
+ if (nanoTime.tv_nsec != 0)
+ {
+ nanosleep(&nanoTime, 0);
+ }
+}
+
Deleted: trunk/native/src/LibAIOController.cpp
===================================================================
--- trunk/native/src/LibAIOController.cpp 2009-06-04 19:49:50 UTC (rev 7201)
+++ trunk/native/src/LibAIOController.cpp 2009-06-04 20:08:33 UTC (rev 7202)
@@ -1,292 +0,0 @@
-/*
- Copyright (C) 2008 Red Hat Software - JBoss Middleware Division
-
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
- USA
-
- The GNU Lesser General Public License is available in the file COPYING.
-
- Software written by Clebert Suconic (csuconic at redhat dot com)
-*/
-
-#include <jni.h>
-#include <stdlib.h>
-#include <iostream>
-#include <stdio.h>
-#include <fcntl.h>
-#include <string>
-
-
-#include "org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h"
-
-
-#include "JavaUtilities.h"
-#include "AIOController.h"
-#include "JNICallbackAdapter.h"
-#include "AIOException.h"
-#include "Version.h"
-
-
-/*
- * Class: org_jboss_jaio_libaioimpl_LibAIOController
- * Method: init
- * Signature: (Ljava/lang/String;Ljava/lang/Class;)J
- */
-JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_init
- (JNIEnv * env, jclass clazz, jstring jstrFileName, jint maxIO, jobject logger)
-{
- AIOController * controller = 0;
- try
- {
- std::string fileName = convertJavaString(env, jstrFileName);
-
- controller = new AIOController(fileName, (int) maxIO);
- controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
- if (!controller->done) return 0;
-
- controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;ILjava/lang/String;)V");
- if (!controller->error) return 0;
-
- jclass loggerClass = env->GetObjectClass(logger);
-
- if (!(controller->loggerDebug = env->GetMethodID(loggerClass, "debug", "(Ljava/lang/Object;)V"))) return 0;
- if (!(controller->loggerWarn = env->GetMethodID(loggerClass, "warn", "(Ljava/lang/Object;)V"))) return 0;
- if (!(controller->loggerInfo = env->GetMethodID(loggerClass, "info", "(Ljava/lang/Object;)V"))) return 0;
- if (!(controller->loggerError = env->GetMethodID(loggerClass, "error", "(Ljava/lang/Object;)V"))) return 0;
-
- controller->logger = env->NewGlobalRef(logger);
-
-// controller->log(env,4, "Controller initialized");
-
- return (jlong)controller;
- }
- catch (AIOException& e){
- if (controller != 0)
- {
- delete controller;
- }
- throwException(env, e.getErrorCode(), e.what());
- return 0;
- }
-}
-
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_read
- (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
-{
- try
- {
- AIOController * controller = (AIOController *) controllerAddress;
- void * buffer = env->GetDirectBufferAddress(jbuffer);
-
- if (buffer == 0)
- {
- throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
- return;
- }
-
- if (((long)buffer) % 512)
- {
- throwException(env, NATIVE_ERROR_NOT_ALIGNED, "Buffer not aligned for use with DMA");
- return;
- }
-
- CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
-
- controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
- }
- catch (AIOException& e)
- {
- throwException(env, e.getErrorCode(), e.what());
- }
-}
-
-
-// Fast memset on buffer
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
- (JNIEnv *env, jclass, jobject jbuffer, jint size)
-{
- void * buffer = env->GetDirectBufferAddress(jbuffer);
-
- if (buffer == 0)
- {
- throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
- return;
- }
-
- memset(buffer, 0, (size_t)size);
-
-}
-
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
- (JNIEnv * env, jclass, jobject jbuffer)
-{
- void * buffer = env->GetDirectBufferAddress(jbuffer);
- free(buffer);
-}
-
-JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newNativeBuffer
- (JNIEnv * env, jclass, jlong size)
-{
- try
- {
-
- if (size % ALIGNMENT)
- {
- throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512");
- return 0;
- }
-
-
- // This will allocate a buffer, aligned by 512.
- // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
- void * buffer = 0;
- if (::posix_memalign(&buffer, 512, size))
- {
- throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign");
- return 0;
- }
-
- memset(buffer, 0, (size_t)size);
-
- jobject jbuffer = env->NewDirectByteBuffer(buffer, size);
- return jbuffer;
- }
- catch (AIOException& e)
- {
- throwException(env, e.getErrorCode(), e.what());
- return 0;
- }
-}
-
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
- (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
-{
- try
- {
- AIOController * controller = (AIOController *) controllerAddress;
- void * buffer = env->GetDirectBufferAddress(jbuffer);
-
- if (buffer == 0)
- {
- throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
- return;
- }
-
-
- CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
-
- controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
- }
- catch (AIOException& e)
- {
- throwException(env, e.getErrorCode(), e.what());
- }
-}
-
-
-
-JNIEXPORT void Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
- (JNIEnv *env, jclass, jlong controllerAddress)
-{
- try
- {
- AIOController * controller = (AIOController *) controllerAddress;
- controller->fileOutput.pollEvents(env);
- }
- catch (AIOException& e)
- {
- throwException(env, e.getErrorCode(), e.what());
- }
-}
-
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_stopPoller
- (JNIEnv *env, jclass, jlong controllerAddress)
-{
- try
- {
- AIOController * controller = (AIOController *) controllerAddress;
- controller->fileOutput.stopPoller(env);
- }
- catch (AIOException& e)
- {
- throwException(env, e.getErrorCode(), e.what());
- }
-}
-
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_closeInternal
- (JNIEnv *env, jclass, jlong controllerAddress)
-{
- try
- {
- if (controllerAddress != 0)
- {
- AIOController * controller = (AIOController *) controllerAddress;
- controller->destroy(env);
- delete controller;
- }
- }
- catch (AIOException& e)
- {
- throwException(env, e.getErrorCode(), e.what());
- }
-}
-
-
-JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_fill
- (JNIEnv * env, jclass, jlong controllerAddress, jlong position, jint blocks, jlong size, jbyte fillChar)
-{
- try
- {
- AIOController * controller = (AIOController *) controllerAddress;
-
- controller->fileOutput.preAllocate(env, position, blocks, size, fillChar);
-
- }
- catch (AIOException& e)
- {
- throwException(env, e.getErrorCode(), e.what());
- }
-}
-
-
-
-/** It does nothing... just return true to make sure it has all the binary dependencies */
-JNIEXPORT jint JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_getNativeVersion
- (JNIEnv *, jclass)
-
-{
- return _VERSION_NATIVE_AIO;
-}
-
-
-JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_size0
- (JNIEnv * env, jobject, jlong controllerAddress)
-{
- try
- {
- AIOController * controller = (AIOController *) controllerAddress;
-
- long size = controller->fileOutput.getSize();
- if (size < 0)
- {
- throwException(env, NATIVE_ERROR_INTERNAL, "InternalError on Native Layer: method size failed");
- return -1l;
- }
- return size;
- }
- catch (AIOException& e)
- {
- throwException(env, e.getErrorCode(), e.what());
- return -1l;
- }
-
-}
-
Modified: trunk/native/src/Makefile.am
===================================================================
--- trunk/native/src/Makefile.am 2009-06-04 19:49:50 UTC (rev 7201)
+++ trunk/native/src/Makefile.am 2009-06-04 20:08:33 UTC (rev 7202)
@@ -2,9 +2,9 @@
lib_LTLIBRARIES = libJBMLibAIO.la
bin_PROGRAMS = disktest
-libJBMLibAIO_la_SOURCES = AIOController.cpp AIOController.h AIOException.h AsyncFile.cpp \
+libJBMLibAIO_la_SOURCES = JNI_AsynchronousFileImpl.cpp AIOController.cpp AIOController.h AIOException.h AsyncFile.cpp \
AsyncFile.h CallbackAdapter.h JAIODatatypes.h JavaUtilities.cpp \
- JavaUtilities.h JNICallbackAdapter.cpp JNICallbackAdapter.h LibAIOController.cpp \
+ JavaUtilities.h JNICallbackAdapter.cpp JNICallbackAdapter.h \
LockClass.h org_jboss_messaging_core_persistence_impl_libaio_jni_impl_AsynchronousFileImpl.h \
Version.h
Modified: trunk/native/src/Version.h
===================================================================
--- trunk/native/src/Version.h 2009-06-04 19:49:50 UTC (rev 7201)
+++ trunk/native/src/Version.h 2009-06-04 20:08:33 UTC (rev 7202)
@@ -1,5 +1,5 @@
#ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 20
+#define _VERSION_NATIVE_AIO 21
#endif
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2009-06-04 19:49:50 UTC (rev 7201)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2009-06-04 20:08:33 UTC (rev 7202)
@@ -55,7 +55,7 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 20;
+ private static int EXPECTED_NATIVE_VERSION = 21;
public static void addMax(final int io)
{
@@ -86,6 +86,8 @@
}
else
{
+ // Initializing nanosleep
+ setNanoSleepInterval(1);
return true;
}
}
@@ -493,6 +495,11 @@
private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
public static native void destroyBuffer(ByteBuffer buffer);
+
+ /** Instead of passing the nanoSeconds through the stack call every time, we set it statically inside the native method */
+ public static native void setNanoSleepInterval(int nanoseconds);
+
+ public static native void nanoSleep();
private static native ByteBuffer newNativeBuffer(long size);
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-04 19:49:50 UTC (rev 7201)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-04 20:08:33 UTC (rev 7202)
@@ -48,16 +48,13 @@
// Attributes ----------------------------------------------------
private TimedBufferObserver bufferObserver;
-
-
+
// This is used to pause and resume the timer
// This is a reusable Latch, that uses java.util.concurrent base classes
private final VariableLatch latchTimer = new VariableLatch();
private CheckTimer timerRunnable = new CheckTimer();
- private final long timeout;
-
private final int bufferSize;
private final ByteBuffer currentBuffer;
@@ -67,15 +64,15 @@
private final Lock lock = new ReentrantReadWriteLock().writeLock();
// used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
- private volatile long timeLastAdd = 0;
+ private volatile boolean active = false;
// used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
- private volatile long timeLastSync = 0;
+ private volatile boolean pendingSync = false;
private Thread timerThread;
-
+
private volatile boolean started;
-
+
private final boolean flushOnSync;
// Static --------------------------------------------------------
@@ -87,27 +84,28 @@
public TimedBuffer(final int size, final long timeout, final boolean flushOnSync)
{
bufferSize = size;
- this.timeout = timeout;
+ // Setting the interval for nano-sleeps
+ AsynchronousFileImpl.setNanoSleepInterval((int)timeout);
currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
currentBuffer.limit(0);
callbacks = new ArrayList<AIOCallback>();
this.flushOnSync = flushOnSync;
latchTimer.up();
}
-
+
public synchronized void start()
{
if (started)
{
return;
}
-
+
timerRunnable = new CheckTimer();
-
+
timerThread = new Thread(timerRunnable, "jbm-aio-timer");
timerThread.start();
-
+
started = true;
}
@@ -117,9 +115,9 @@
{
return;
}
-
+
latchTimer.down();
-
+
timerRunnable.close();
while (timerThread.isAlive())
@@ -132,7 +130,7 @@
{
}
}
-
+
started = false;
}
@@ -142,7 +140,7 @@
{
flush();
}
-
+
this.bufferObserver = observer;
}
@@ -194,14 +192,16 @@
public synchronized void addBytes(final ByteBuffer bytes, final boolean sync, final AIOCallback callback)
{
- long now = System.nanoTime();
+ if (currentBuffer.position() == 0)
+ {
+ // Resume latch
+ latchTimer.down();
+ }
currentBuffer.put(bytes);
callbacks.add(callback);
- timeLastAdd = now;
-
- latchTimer.down();
+ active = true;
if (sync)
{
@@ -212,15 +212,16 @@
else
{
// We should flush on the next timeout, no matter what other activity happens on the buffer
- if (timeLastSync == 0)
+ if (!pendingSync)
{
- timeLastSync = now;
+ pendingSync = true;
}
}
}
-
+
if (currentBuffer.position() == currentBuffer.capacity())
{
+ System.out.println("Flush by size");
flush();
}
}
@@ -230,7 +231,7 @@
if (currentBuffer.limit() > 0)
{
latchTimer.up();
-
+
ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, currentBuffer.position());
// Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
@@ -241,8 +242,8 @@
callbacks = new ArrayList<AIOCallback>();
- timeLastAdd = 0;
- timeLastSync = 0;
+ active = false;
+ pendingSync = false;
currentBuffer.limit(0);
}
@@ -256,15 +257,13 @@
private void checkTimer()
{
- final long now = System.nanoTime();
-
// if inactive for more than the timeout
// of if a sync happened at more than the the timeout ago
- if (timeLastAdd != 0 && now - timeLastAdd >= timeout || timeLastSync != 0 && now - timeLastSync >= timeout)
+ if (!active || pendingSync)
{
lock.lock();
try
- {
+ {
if (bufferObserver != null)
{
flush();
@@ -275,6 +274,8 @@
lock.unlock();
}
}
+
+ active = false;
}
// Inner classes -------------------------------------------------
@@ -282,9 +283,10 @@
private class CheckTimer implements Runnable
{
private volatile boolean closed = false;
-
+
public void run()
{
+ Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
while (!closed)
{
try
@@ -297,9 +299,9 @@
checkTimer();
- //TODO - this yield is temporary
-
- Thread.yield();
+ // The time is passed on the constructor.
+ // I'm avoiding the the long on the calling stack, to avoid performance hits here
+ AsynchronousFileImpl.nanoSleep();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-06-04 19:49:50 UTC (rev 7201)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-06-04 20:08:33 UTC (rev 7202)
@@ -97,7 +97,7 @@
public static final boolean DEFAULT_JOURNAL_AIO_FLUSH_SYNC = false;
- public static final int DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 1;
+ public static final int DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 500000;
public static final int DEFAULT_JOURNAL_AIO_BUFFER_SIZE = 128 * 1024;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-06-04 19:49:50 UTC (rev 7201)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-06-04 20:08:33 UTC (rev 7202)
@@ -951,7 +951,37 @@
controller.close();
}
+
+ public void testNanoSleep() throws Exception
+ {
+ AsynchronousFileImpl.setNanoSleepInterval(1);
+ AsynchronousFileImpl.nanoSleep();
+
+ Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
+
+ long timeInterval = 1000000;
+ long nloops = 1000;
+
+ AsynchronousFileImpl.setNanoSleepInterval((int)timeInterval);
+ long time = System.currentTimeMillis();
+
+ for (long i = 0 ; i < nloops; i++)
+ {
+ Thread.sleep(1);
+ //AsynchronousFileImpl.nanoSleep();
+ }
+
+ long end = System.currentTimeMillis();
+
+ long expectedTime = (timeInterval * nloops / 1000000l);
+
+ System.out.println("TotalTime = " + (end - time) + " expected = " + expectedTime);
+
+ assertTrue((end - time) >= expectedTime);
+
+ }
+
private void addString(final String str, final ByteBuffer buffer)
{
CharBuffer charBuffer = CharBuffer.wrap(str);
More information about the jboss-cvs-commits
mailing list