[jboss-cvs] JBoss Messaging SVN: r3731 - projects/jaio/trunk/jaio/native/src.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 18 10:20:49 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-02-18 10:20:49 -0500 (Mon, 18 Feb 2008)
New Revision: 3731

Modified:
   projects/jaio/trunk/jaio/native/src/AIOController.h
   projects/jaio/trunk/jaio/native/src/BufferAdapter.h
   projects/jaio/trunk/jaio/native/src/FileOutput.cpp
   projects/jaio/trunk/jaio/native/src/FileOutput.h
   projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.cpp
   projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h
   projects/jaio/trunk/jaio/native/src/LibAIOController.cpp
   projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h
Log:
event handling

Modified: projects/jaio/trunk/jaio/native/src/AIOController.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/AIOController.h	2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/AIOController.h	2008-02-18 15:20:49 UTC (rev 3731)
@@ -13,6 +13,8 @@
 	jmethodID encode;
 	jmethodID decode;
 	jmethodID done;
+	jmethodID error;
+	
 	int fileHandle;
 	JNIEnv * env;
 	FileOutput fileOutput;

Modified: projects/jaio/trunk/jaio/native/src/BufferAdapter.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/BufferAdapter.h	2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/BufferAdapter.h	2008-02-18 15:20:49 UTC (rev 3731)
@@ -43,5 +43,6 @@
 	virtual void encode(const int& size, void *) = 0;
 	virtual void done() = 0;
 	virtual int blockSize() = 0;
+	// Invoked when the write submitted for this buffer completes with a negative result; receives that result code and a readable message.
+	virtual void onError(long errorCode, std::string error)=0;
 };
 #endif /*BUFFERADAPTER_H_*/

Modified: projects/jaio/trunk/jaio/native/src/FileOutput.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/FileOutput.cpp	2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/FileOutput.cpp	2008-02-18 15:20:49 UTC (rev 3731)
@@ -8,9 +8,13 @@
 #include <fcntl.h>
 #include "FileOutput.h"
 #include "AIOException.h"
+#include "pthread.h"
+#include "LockClass.h"
 
 //#define SYNC
+//#define CALLBACK_THREAD
 
+
 std::string io_error(int rc)
 {
 	std::stringstream buffer;
@@ -24,8 +28,9 @@
 }
 
 
-FileOutput::FileOutput(std::string & _fileName) : aioContext(0), filePointer(0)
+FileOutput::FileOutput(std::string & _fileName) : aioContext(0), filePointer(0), events(0)
 {
+	::pthread_mutex_init(&fileMutex,0);
 	fileName = _fileName;
 	std::cout << "Initializing FileOutput " << aioContext << "\n";
 	if (io_queue_init(MAX_IO, &aioContext))
@@ -38,6 +43,7 @@
 	fileHandle = open(fileName.data(),  O_WRONLY | O_CREAT, 0666);
 #else
 	fileHandle = open(fileName.data(),  O_WRONLY | O_CREAT | O_DIRECT, 0666);
+	events = (struct io_event *)malloc (MAX_IO * sizeof (struct io_event));
 #endif
 	if (fileHandle < 0)
 	{
@@ -48,6 +54,8 @@
 
 FileOutput::~FileOutput()
 {
+	::pthread_mutex_destroy(&fileMutex);
+	free(events);
 	if (io_queue_release(aioContext))
 	{
 		throw AIOException(2,"Can't release aio");
@@ -93,10 +101,68 @@
 	
 }
 
+/*
+ * Collects the completion events currently available on aioContext (min_nr is 0,
+ * so the call is not asked to wait for any event) and notifies every
+ * BufferAdapter attached to each completed request: onError() when the event
+ * carries a negative result, completeBlock() otherwise. The write buffer and
+ * the adapter list of each completed request are released here.
+ *
+ * Throws AIOException(55) when io_getevents itself fails with anything other
+ * than an interrupted call. Compiles to an empty body when SYNC is defined.
+ */
+void FileOutput::pollEvents()
+{
+#ifndef SYNC
+	// Serializes against addData(), which takes the same mutex
+	LockClass lock(&fileMutex);
+	int result = io_getevents(this->aioContext,0, MAX_IO, events, 0);

+	if (result) 
+	{
+		std::cout << "poll being caleld result is " << result << "\n";
+		std::cout.flush();
+	}
+	
+	if (result < 0)
+	{
+		// libaio reports failures as negative errno values, so an interrupted
+		// call shows up as -EINTR (a positive EINTR can never match in this branch)
+		if (result == -EINTR)
+		{
+			std::cout << "Events are empty\n";
+			return;
+		}
+		
+		throw AIOException(55, io_error(result));
+	}
+	
+	for (int i=0; i<result; i++)
+	{
+		
+		struct iocb * iocbp = events[i].obj;
+
+		// addData() stored the list of interested adapters in iocb->data
+		std::list<BufferAdapter *> * list = (std::list<BufferAdapter *> *)(iocbp->data); 
+		
+		std::cout << "Recovered data = " << ((long)iocbp->data) << "\n";
+		std::cout.flush();
+		// Per-request outcome; named differently from the event count above to avoid shadowing it
+		long status = events[i].res;
+		if (status < 0)
+		{
+			std::string strerror = io_error(status);
+			for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+			{
+				(*iter)->onError(status, strerror);
+			}
+		}
+		else
+		{
+			for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+			{
+				(*iter)->completeBlock();
+			}
+		}
+		
+		// NOTE(review): the iocb itself is allocated with new in addData() and is not
+		// released here - confirm it is freed elsewhere, otherwise it leaks per request.
+		free(iocbp->u.c.buf);
+		delete list;
+	}
+	
+#endif
+}
+
 // Used only when paging is disabled (the user has the option to opt out of paging)
 void FileOutput::addData(BufferAdapter * adapter)
 {
+	LockClass lock(&fileMutex);
+
 	int size = adapter->blockSize();
 #ifdef SYNC
 	void * buffer = malloc(size + sizeof (BufferAdapter *));
@@ -116,18 +182,23 @@
 	adapter->done();
 #else
 	void * buffer;
-	if (posix_memalign(&buffer, 512, size + 512))
+	if (posix_memalign(&buffer, 512, size))
 	{
 		throw AIOException(10, "Error on posix_memalign");
 	}
 	
+	std::list<BufferAdapter *> * list = new std::list<BufferAdapter *>();
+	list->push_back(adapter);
+	
 	//void * buffer = malloc(size + sizeof (BufferAdapter *));
 	adapter->encode(size, buffer);
 	memcpy (((char *)buffer) + size, &adapter, sizeof (adapter));
 	
 	struct iocb * iocb = new struct iocb();
 	::io_prep_pwrite(iocb, fileHandle, buffer, size, filePointer);
-	::io_set_callback(iocb, &callbackDirect);
+	iocb->data = (void *) list;
+	std::cout << "Set data = " << ((long)iocb->data) << "\n";
+
 	filePointer+=size;
 	
 	int result = ::io_submit(aioContext, 1, &iocb);

Modified: projects/jaio/trunk/jaio/native/src/FileOutput.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/FileOutput.h	2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/FileOutput.h	2008-02-18 15:20:49 UTC (rev 3731)
@@ -5,16 +5,22 @@
 #include "DataManager.h"
 #include <string>
 #include <libaio.h>
+#include <pthread.h>
 
-#define MAX_IO 3000
+#define MAX_IO 300
 
 class FileOutput : public PageObserver, DataManager
 {
 private:
+	struct io_event *events; 
 	int fileHandle;
 	std::string fileName;
 	io_context_t aioContext;
 	off_t filePointer;
+	
+	pthread_mutex_t fileMutex;
+	
+	
 public:
 	FileOutput(std::string & _fileName);
 	virtual ~FileOutput();
@@ -27,10 +33,7 @@
 	}
 	
 	// Nothing to be done on FileOutput 
-	void pollEvents()
-	{
-		io_queue_run(aioContext);
-	}
+	void pollEvents();
 	
 	void validateLowRate()
 	{

Modified: projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.cpp	2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.cpp	2008-02-18 15:20:49 UTC (rev 3731)
@@ -5,27 +5,37 @@
 JNIBufferAdapter::JNIBufferAdapter(AIOController * _controller, jobject _obj) : BufferAdapter(), refs(1)
 {
 	controller = _controller;
-	obj = controller->env->NewGlobalRef(_obj);
+	obj = controller->env->NewGlobalRef(_obj); // restore me
+	//obj = _obj; // delete me
 }
 
 JNIBufferAdapter::~JNIBufferAdapter()
 {
-	controller->env->DeleteGlobalRef(obj);
+	std::cout << " Deleting globalRef\n";
+	controller->env->DeleteGlobalRef(obj); // restore me
 }
 
-
 void JNIBufferAdapter::encode(const int& size, void * bufferAddress)
 {
 	jobject buffer = controller->env->NewDirectByteBuffer(bufferAddress, size);
 	controller->env->CallVoidMethod(obj, controller->encode, buffer);
 }
+
 void JNIBufferAdapter::done()
 {
+	std::cout << "Done called internally\n";
 	controller->env->CallVoidMethod(obj,controller->done); 
 	return;
 }
+
 int JNIBufferAdapter::blockSize()
 {
 	return controller->env->CallIntMethod(obj,controller->encodeSize); 
 }
 
+// Forwards an I/O failure to the Java object: invokes the method cached in
+// controller->error with the error code (narrowed to jint) and the message
+// converted to a Java string.
+void JNIBufferAdapter::onError(long errorCode, std::string error)
+{
+	std::cout<< "Calling onError with errorCode " << errorCode << "\n";
+	// c_str() guarantees the NUL terminator that NewStringUTF requires
+	jstring strError = controller->env->NewStringUTF(error.c_str());
+	if (!strError)
+	{
+		// Allocation failed and a Java exception is pending; calling into Java now is not allowed
+		return;
+	}
+	controller->env->CallVoidMethod(obj, controller->error, (jint)errorCode, strError);
+	// pollEvents may report many failures within one native call, so release the local reference right away
+	controller->env->DeleteLocalRef(strError);
+}

Modified: projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h	2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h	2008-02-18 15:20:49 UTC (rev 3731)
@@ -17,6 +17,7 @@
 	void encode(const int& size, void *);
 	void done();
 	int blockSize();
+	// Reports a failed write to the Java side: numeric result code plus a readable message.
+	void onError(long errorCode, std::string error);
 	void addref()
 	{
 		refs++;

Modified: projects/jaio/trunk/jaio/native/src/LibAIOController.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/LibAIOController.cpp	2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/LibAIOController.cpp	2008-02-18 15:20:49 UTC (rev 3731)
@@ -33,15 +33,18 @@
 		if (!controller->done) return 0;
 		
 		controller->encode = env->GetMethodID(clazz,"encode","(Ljava/nio/ByteBuffer;)V");
-		if (!controller->done) return 0;
+		if (!controller->encode) return 0;
 		
 		controller->decode = env->GetMethodID(clazz,"decode","(ILjava/nio/ByteBuffer;)V");
-		if (!controller->done) return 0;
+		if (!controller->decode) return 0;
 		
 		controller->encodeSize = env->GetMethodID(clazz,"encodeSize","()I");
-		if (!controller->done) return 0;
+		if (!controller->encodeSize) return 0;
 		
+		controller->error = env->GetMethodID(clazz, "onError", "(ILjava/lang/String;)V");
+        if (!controller->error) return 0;
 		
+		
 	    controller->env = env;	
 		
 	    return (jlong)controller;
@@ -77,7 +80,9 @@
 		controller->encodeSize = env->GetMethodID(clazz,"encodeSize","()I");
 		if (!controller->done) return 0;
 		
+		controller->error = env->GetMethodID(clazz,"onError","(ILjava/lang/String;)V");
 		
+		
 	    controller->env = env;	
 		
 	    return (jlong)controller;
@@ -107,7 +112,7 @@
 
 
 
-JNIEXPORT void Java_org_jboss_jaio_libaioimpl_LibAIOController_pollEvents
+JNIEXPORT void Java_org_jboss_jaio_libaioimpl_LibAIOController_internalPollEvents
   (JNIEnv *env, jclass, jlong controllerAddress)
 {
 	try

Modified: projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h	2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h	2008-02-18 15:20:49 UTC (rev 3731)
@@ -41,10 +41,10 @@
 
 /*
  * Class:     org_jboss_jaio_libaioimpl_LibAIOController
- * Method:    pollEvents
+ * Method:    internalPollEvents
  * Signature: (J)V
  */
-JNIEXPORT void JNICALL Java_org_jboss_jaio_libaioimpl_LibAIOController_pollEvents
+JNIEXPORT void JNICALL Java_org_jboss_jaio_libaioimpl_LibAIOController_internalPollEvents
   (JNIEnv *, jclass, jlong);
 
 /*




More information about the jboss-cvs-commits mailing list