[jboss-cvs] JBoss Messaging SVN: r4070 - in branches/trunk_tmp_aio: native/src and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Apr 16 19:49:50 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-04-16 19:49:50 -0400 (Wed, 16 Apr 2008)
New Revision: 4070
Modified:
branches/trunk_tmp_aio/
branches/trunk_tmp_aio/.classpath
branches/trunk_tmp_aio/build-messaging.xml
branches/trunk_tmp_aio/native/src/AsyncFile.cpp
branches/trunk_tmp_aio/native/src/JNICallbackAdapter.cpp
branches/trunk_tmp_aio/native/src/JNICallbackAdapter.h
branches/trunk_tmp_aio/native/src/LibAIOController.cpp
branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
Log:
Implementing read/writeLock on closes
Property changes on: branches/trunk_tmp_aio
___________________________________________________________________
Name: svn:ignore
- output
thirdparty
messaging.iws
bin
ObjectStore
+ output
thirdparty
messaging.iws
bin
ObjectStore
build
Modified: branches/trunk_tmp_aio/.classpath
===================================================================
--- branches/trunk_tmp_aio/.classpath 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/.classpath 2008-04-16 23:49:50 UTC (rev 4070)
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
- <classpathentry kind="src" path="output/gen-parsers"/>
+ <classpathentry kind="src" path="build/src"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
<classpathentry kind="src" path="tests/etc/ide"/>
<classpathentry excluding="ide/" kind="src" path="tests/etc"/>
@@ -56,9 +56,7 @@
<classpathentry kind="lib" path="thirdparty/jboss/jboss-javaee/lib/jboss-javaee.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/jbosssx-client/lib/jbosssx-client.jar"/>
<classpathentry kind="lib" path="tests/lib/easymock.jar"/>
- <classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M1.jar" sourcepath="thirdparty/apache-mina/lib/mina-core-2.0.0-M1-sources.jar"/>
<classpathentry kind="lib" path="thirdparty/slf4j/log4j/lib/slf4j-log4j12.jar"/>
- <classpathentry kind="lib" path="src/etc/server/default/config"/>
- <classpathentry kind="lib" path="src/etc/server/default/deploy"/>
+ <classpathentry kind="lib" path="thirdparty/apache-mina/lib/mina-core-2.0.0-M2-20080411.122259-16.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: branches/trunk_tmp_aio/build-messaging.xml
===================================================================
--- branches/trunk_tmp_aio/build-messaging.xml 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/build-messaging.xml 2008-04-16 23:49:50 UTC (rev 4070)
@@ -188,6 +188,11 @@
<path refid="jboss.dependencies.classpath"/>
</path>
+ <path id="javah.compilation.classpath">
+ <path refid="compilation.classpath"/>
+ <path location="./build/classes"/>
+ </path>
+
<path id="test.compilation.classpath">
<path refid="compilation.classpath"/>
<path location="${build.jars.dir}/jboss-${module.name}.jar"/>
@@ -310,6 +315,8 @@
<include name="**/*.java"/>
<classpath refid="compilation.classpath"/>
</javac>
+ <javah class="org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl"
+ classpathref="javah.compilation.classpath" destdir="./native/src"/>
</target>
<!-- ======================================================================================== -->
@@ -619,4 +626,4 @@
</junitreport>
</target>
-</project>
\ No newline at end of file
+</project>
Modified: branches/trunk_tmp_aio/native/src/AsyncFile.cpp
===================================================================
--- branches/trunk_tmp_aio/native/src/AsyncFile.cpp 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/native/src/AsyncFile.cpp 2008-04-16 23:49:50 UTC (rev 4070)
@@ -99,6 +99,11 @@
}
}
+int isException (THREAD_CONTEXT threadContext)
+{
+ return JNI_ENV(threadContext)->ExceptionOccurred() != 0;
+}
+
void AsyncFile::pollEvents(THREAD_CONTEXT threadContext)
{
@@ -114,8 +119,13 @@
while (pollerRunning)
{
+ if (isException(threadContext))
+ {
+ return;
+ }
int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
+
#ifdef DEBUG
fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
#endif
Modified: branches/trunk_tmp_aio/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/trunk_tmp_aio/native/src/JNICallbackAdapter.cpp 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/native/src/JNICallbackAdapter.cpp 2008-04-16 23:49:50 UTC (rev 4070)
@@ -22,10 +22,11 @@
#include <iostream>
#include "JavaUtilities.h"
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _obj) : CallbackAdapter(), refs(1)
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController) : CallbackAdapter(), refs(1)
{
controller = _controller;
- obj = _obj;
+ callback = _callback;
+ fileController = _fileController;
}
JNICallbackAdapter::~JNICallbackAdapter()
@@ -34,7 +35,7 @@
void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
{
- JNI_ENV(threadContext)->CallVoidMethod(obj,controller->done);
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback);
return;
}
@@ -42,10 +43,11 @@
{
controller->log(threadContext, 0, "Libaio event generated errors, callback object was informed about it");
jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data());
- JNI_ENV(threadContext)->CallVoidMethod(obj, controller->error, (jint)errorCode, strError);
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, (jint)errorCode, strError);
}
void JNICallbackAdapter::destroy(THREAD_CONTEXT threadContext)
{
- JNI_ENV(threadContext)->DeleteGlobalRef(obj);
+ JNI_ENV(threadContext)->DeleteGlobalRef(callback);
+ JNI_ENV(threadContext)->DeleteGlobalRef(fileController);
}
Modified: branches/trunk_tmp_aio/native/src/JNICallbackAdapter.h
===================================================================
--- branches/trunk_tmp_aio/native/src/JNICallbackAdapter.h 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/native/src/JNICallbackAdapter.h 2008-04-16 23:49:50 UTC (rev 4070)
@@ -31,13 +31,14 @@
{
private:
AIOController * controller;
- jobject obj;
+ jobject callback;
+ jobject fileController;
int refs;
void destroy(THREAD_CONTEXT threadContext);
public:
// _ob must be a global Reference (use createGloblReferente before calling the constructor)
- JNICallbackAdapter(AIOController * _controller, jobject _ob);
+ JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController);
virtual ~JNICallbackAdapter();
void done(THREAD_CONTEXT threadContext);
void onError(THREAD_CONTEXT threadContext, long error, std::string error);
Modified: branches/trunk_tmp_aio/native/src/LibAIOController.cpp
===================================================================
--- branches/trunk_tmp_aio/native/src/LibAIOController.cpp 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/native/src/LibAIOController.cpp 2008-04-16 23:49:50 UTC (rev 4070)
@@ -42,17 +42,17 @@
* Signature: (Ljava/lang/String;Ljava/lang/Class;)J
*/
JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_init
- (JNIEnv * env, jclass, jstring jstrFileName, jclass callbackClass, jint maxIO, jobject logger)
+ (JNIEnv * env, jclass clazz, jstring jstrFileName, jint maxIO, jobject logger)
{
try
{
std::string fileName = convertJavaString(env, jstrFileName);
AIOController * controller = new AIOController(fileName, (int) maxIO);
- controller->done = env->GetMethodID(callbackClass,"done","()V");
+ controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;)V");
if (!controller->done) return 0;
- controller->error = env->GetMethodID(callbackClass, "onError", "(ILjava/lang/String;)V");
+ controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;ILjava/lang/String;)V");
if (!controller->error) return 0;
jclass loggerClass = env->GetObjectClass(logger);
@@ -75,7 +75,7 @@
}
JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_read
- (JNIEnv *env, jclass, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
+ (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
{
try
{
@@ -94,7 +94,7 @@
return;
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback));
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis));
controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
}
@@ -105,7 +105,7 @@
}
JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
- (JNIEnv *env, jclass, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
+ (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
{
try
{
@@ -117,7 +117,7 @@
return;
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback));
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis));
controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
}
Modified: branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-04-16 23:49:50 UTC (rev 4070)
@@ -12,10 +12,10 @@
/*
* Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
* Method: init
- * Signature: (Ljava/lang/String;Ljava/lang/Class;ILorg/jboss/messaging/core/logging/Logger;)J
+ * Signature: (Ljava/lang/String;ILorg/jboss/messaging/core/logging/Logger;)J
*/
JNIEXPORT jlong JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_init
- (JNIEnv *, jclass, jstring, jclass, jint, jobject);
+ (JNIEnv *, jclass, jstring, jint, jobject);
/*
* Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
@@ -23,7 +23,7 @@
* Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
*/
JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
- (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
+ (JNIEnv *, jobject, jlong, jlong, jlong, jobject, jobject);
/*
* Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
@@ -31,7 +31,7 @@
* Signature: (JJJLjava/nio/ByteBuffer;Lorg/jboss/messaging/core/asyncio/AIOCallback;)V
*/
JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_read
- (JNIEnv *, jclass, jlong, jlong, jlong, jobject, jobject);
+ (JNIEnv *, jobject, jlong, jlong, jlong, jobject, jobject);
/*
* Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-04-16 23:49:50 UTC (rev 4070)
@@ -8,6 +8,9 @@
package org.jboss.messaging.core.asyncio.impl;
import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.asyncio.AsynchronousFile;
@@ -27,6 +30,10 @@
private Thread poller;
private static boolean loaded = true;
+ ReadWriteLock lock = new ReentrantReadWriteLock();
+ Lock writeLock = lock.writeLock();
+ Lock readLock = lock.readLock();
+
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
*/
@@ -56,10 +63,18 @@
public void open(String fileName, int maxIO)
{
- opened = true;
- this.fileName=fileName;
- handler = init (fileName, AIOCallback.class, maxIO, log);
- startPoller();
+ writeLock.lock();
+ try
+ {
+ opened = true;
+ this.fileName=fileName;
+ handler = init (fileName, maxIO, log);
+ startPoller();
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
}
class PollerThread extends Thread
@@ -74,33 +89,52 @@
}
}
- private synchronized void startPoller()
- {
- checkOpened();
- poller = new PollerThread();
- poller.start();
- }
-
public synchronized void close()
{
+ writeLock.lock();
+ try
+ {
checkOpened();
closeInternal(handler);
opened = false;
handler = 0;
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
}
public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
{
checkOpened();
- write (handler, position, size, directByteBuffer, aioPackage);
+ readLock.lock();
+ try
+ {
+ write (handler, position, size, directByteBuffer, aioPackage);
+ }
+ catch (RuntimeException e)
+ {
+ readLock.unlock();
+ throw e;
+ }
}
public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
{
checkOpened();
- read (handler, position, size, directByteBuffer, aioPackage);
+ readLock.lock();
+ try
+ {
+ read (handler, position, size, directByteBuffer, aioPackage);
+ }
+ catch (RuntimeException e)
+ {
+ readLock.unlock();
+ throw e;
+ }
}
@@ -123,6 +157,21 @@
}
+ /** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
+ @SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
+ private void callbackDone(AIOCallback callback)
+ {
+ readLock.unlock();
+ callback.done();
+ }
+
+ @SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
+ private void callbackError(AIOCallback callback, int errorCode, String errorMessage)
+ {
+ readLock.unlock();
+ callback.onError(errorCode, errorMessage);
+ }
+
private void pollEvents()
{
if (!opened)
@@ -132,6 +181,22 @@
internalPollEvents(handler);
}
+ private synchronized void startPoller()
+ {
+ checkOpened();
+ poller = new PollerThread();
+ try
+ {
+ poller.start();
+ }
+ catch (Exception ex)
+ {
+ log.error(ex.getMessage(), ex);
+ }
+ }
+
+
+
private void checkOpened()
{
if (!opened)
@@ -144,11 +209,11 @@
* I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
*/
@SuppressWarnings("unchecked")
- private static native long init(String fileName, Class aioPackageClazz, int maxIO, Logger logger);
+ private static native long init(String fileName, int maxIO, Logger logger);
- private static native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+ private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
- private static native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+ private native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
private static native void fill(long handle, long position, int blocks, long size, byte fillChar);
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java 2008-04-16 22:18:37 UTC (rev 4069)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java 2008-04-16 23:49:50 UTC (rev 4070)
@@ -23,7 +23,7 @@
import junit.framework.TestCase;
-//you need to define java.library.path=${project-root}/native/src/.libs
+//you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
public class SingleThreadWriteNativeTest extends TestCase
{
private static final Logger log = Logger.getLogger(SingleThreadWriteNativeTest.class);
@@ -277,14 +277,19 @@
try
{
- final int NUMBER_LINES = 300;
+ final int NUMBER_LINES = 1000;
final int SIZE = 1024;
controller.open(FILE_NAME, 10);
+
+ log.info("Filling file");
+ controller.fill(0,1, NUMBER_LINES * SIZE, (byte)'j');
+
ByteBuffer buffer = controller.newBuffer(SIZE);
-
+ log.info("Writing file");
+
for (int i=0; i<NUMBER_LINES; i++)
{
buffer.clear();
@@ -304,6 +309,13 @@
assertTrue(aio.doneCalled);
}
+
+ // If you call close you're supposed to wait events to finish before closing it
+ log.info("Closing file");
+ controller.close();
+ log.info("Reading file");
+ controller.open(FILE_NAME, 10);
+
ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
for (int i=0; i<NUMBER_LINES; i++)
@@ -354,6 +366,111 @@
}
+
+
+ public void testConcurrentClose() throws Exception
+ {
+ // The test might eventually pass if broken
+ for (int i=0; i<10; i++)
+ internalConcurrentClose();
+ }
+
+ public void internalConcurrentClose() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+ final int NUMBER_LINES = 1000;
+ CountDownLatch readLatch = new CountDownLatch (NUMBER_LINES);
+ final int SIZE = 1024;
+
+ controller.open(FILE_NAME, 10000);
+
+ log.info("Filling file");
+
+ controller.fill(0,1, NUMBER_LINES * SIZE, (byte)'j');
+
+ log.info("Writing file");
+
+ for (int i=0; i<NUMBER_LINES; i++)
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ buffer.clear();
+ addString ("Str value " + i + "\n", buffer);
+ for (int j=buffer.position(); j<buffer.capacity()-1;j++)
+ {
+ buffer.put((byte)' ');
+ }
+ buffer.put((byte)'\n');
+
+
+ LocalAIO aio = new LocalAIO(readLatch);
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ }
+
+
+ long counter = readLatch.getCount();
+ // If you call close you're supposed to wait events to finish before closing it
+ controller.close();
+ log.info("Closed file with counter = " + counter);
+ assertEquals(0, readLatch.getCount());
+ readLatch.await();
+ log.info("Reading file");
+ controller.open(FILE_NAME, 10);
+
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ for (int i=0; i<NUMBER_LINES; i++)
+ {
+ newBuffer.clear();
+ addString ("Str value " + i + "\n", newBuffer);
+ for (int j=newBuffer.position(); j<newBuffer.capacity()-1;j++)
+ {
+ newBuffer.put((byte)' ');
+ }
+ newBuffer.put((byte)'\n');
+
+
+ CountDownLatch latch = new CountDownLatch(1);
+ LocalAIO aio = new LocalAIO(latch);
+ controller.read(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+
+ byte bytesRead[] = new byte[SIZE];
+ byte bytesCompare[] = new byte[SIZE];
+
+ newBuffer.rewind();
+ newBuffer.get(bytesCompare);
+ buffer.rewind();
+ buffer.get(bytesRead);
+
+ for (int count=0;count<SIZE;count++)
+ {
+ assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+ }
+
+
+ //byte[] byteCompare = new byte[SIZE];
+ //byte[] byteRead = new byte[SIZE];
+
+ assertTrue(buffer.equals(newBuffer));
+ }
+
+ }
+ finally
+ {
+ try { controller.close(); } catch (Throwable ignored){}
+
+ }
+
+ }
+
/**
* This method is not used unless you uncomment testValidateData
* The purpose of this method is to verify if the information generated by one of the write methods is correct
More information about the jboss-cvs-commits
mailing list