[jboss-cvs] JBoss Messaging SVN: r3731 - projects/jaio/trunk/jaio/native/src.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 18 10:20:49 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-02-18 10:20:49 -0500 (Mon, 18 Feb 2008)
New Revision: 3731
Modified:
projects/jaio/trunk/jaio/native/src/AIOController.h
projects/jaio/trunk/jaio/native/src/BufferAdapter.h
projects/jaio/trunk/jaio/native/src/FileOutput.cpp
projects/jaio/trunk/jaio/native/src/FileOutput.h
projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.cpp
projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h
projects/jaio/trunk/jaio/native/src/LibAIOController.cpp
projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h
Log:
event handling
Modified: projects/jaio/trunk/jaio/native/src/AIOController.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/AIOController.h 2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/AIOController.h 2008-02-18 15:20:49 UTC (rev 3731)
@@ -13,6 +13,8 @@
jmethodID encode;
jmethodID decode;
jmethodID done;
+ jmethodID error;
+
int fileHandle;
JNIEnv * env;
FileOutput fileOutput;
Modified: projects/jaio/trunk/jaio/native/src/BufferAdapter.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/BufferAdapter.h 2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/BufferAdapter.h 2008-02-18 15:20:49 UTC (rev 3731)
@@ -43,5 +43,6 @@
virtual void encode(const int& size, void *) = 0;
virtual void done() = 0;
virtual int blockSize() = 0;
+ virtual void onError(long error, std::string error)=0; // Error-notification hook for implementers. NOTE(review): both parameters are named `error`; duplicate parameter names in one declaration are normally rejected by C++ compilers — the definition in JNIBufferAdapter.cpp uses (errorCode, error); confirm and align.
};
#endif /*BUFFERADAPTER_H_*/
Modified: projects/jaio/trunk/jaio/native/src/FileOutput.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/FileOutput.cpp 2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/FileOutput.cpp 2008-02-18 15:20:49 UTC (rev 3731)
@@ -8,9 +8,13 @@
#include <fcntl.h>
#include "FileOutput.h"
#include "AIOException.h"
+#include "pthread.h"
+#include "LockClass.h"
//#define SYNC
+//#define CALLBACK_THREAD
+
std::string io_error(int rc)
{
std::stringstream buffer;
@@ -24,8 +28,9 @@
}
-FileOutput::FileOutput(std::string & _fileName) : aioContext(0), filePointer(0)
+FileOutput::FileOutput(std::string & _fileName) : aioContext(0), filePointer(0), events(0)
{
+ ::pthread_mutex_init(&fileMutex,0);
fileName = _fileName;
std::cout << "Initializing FileOutput " << aioContext << "\n";
if (io_queue_init(MAX_IO, &aioContext))
@@ -38,6 +43,7 @@
fileHandle = open(fileName.data(), O_WRONLY | O_CREAT, 0666);
#else
fileHandle = open(fileName.data(), O_WRONLY | O_CREAT | O_DIRECT, 0666);
+ events = (struct io_event *)malloc (MAX_IO * sizeof (struct io_event));
#endif
if (fileHandle < 0)
{
@@ -48,6 +54,8 @@
FileOutput::~FileOutput()
{
+ ::pthread_mutex_destroy(&fileMutex);
+ free(events);
if (io_queue_release(aioContext))
{
throw AIOException(2,"Can't release aio");
@@ -93,10 +101,68 @@
}
+void FileOutput::pollEvents() // Collects finished AIO requests via io_getevents and notifies every BufferAdapter attached to each one; the whole body is compiled out when SYNC is defined.
+{
+#ifndef SYNC
+ LockClass lock(&fileMutex); // presumably a scoped lock on fileMutex (LockClass.h is not shown here) — TODO confirm
+ int result = io_getevents(this->aioContext,0, MAX_IO, events, 0); // min_nr = 0, nr = MAX_IO, null timeout; completed events are written into the `events` array
+ if (result)
+ {
+ std::cout << "poll being caleld result is " << result << "\n"; // debug trace; printed for any non-zero result, negative ones included
+ std::cout.flush();
+ }
+
+ if (result < 0)
+ {
+ // no results
+ if (result == EINTR) // NOTE(review): result is negative in this branch while errno constants such as EINTR are positive, so this test cannot succeed as written — presumably -EINTR was intended; confirm
+ {
+ std::cout << "Events are empty\n";
+ return;
+ }
+
+ throw AIOException(55, io_error(result)); // any negative result not returned above is reported as AIOException code 55
+ }
+
+ for (int i=0; i<result; i++) // the loop bound is the outer `result` (number of events returned)
+ {
+
+ struct iocb * iocbp = events[i].obj; // control block associated with this completed event
+
+ std::list<BufferAdapter *> * list = (std::list<BufferAdapter *> *)(iocbp->data); // addData stores a heap-allocated adapter list in iocb->data before submitting
+
+ std::cout << "Recovered data = " << ((long)iocbp->data) << "\n"; // debug trace of the pointer value, pairs with the "Set data" trace in addData
+ std::cout.flush();
+ long result = events[i].res; // shadows the outer `result` for the rest of the loop body; a negative value is treated below as a failure
+ if (result < 0)
+ {
+ std::string strerror = io_error(result); // NOTE(review): this local name hides the C library function strerror inside this scope
+ for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+ {
+ (*iter)->onError(result, strerror); // report the failure to every adapter attached to this request
+ }
+ }
+ else
+ {
+ for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+ {
+ (*iter)->completeBlock(); // completeBlock() is declared outside the visible hunks — presumably the success notification; TODO confirm
+ }
+ }
+
+ free(iocbp->u.c.buf); // presumably the posix_memalign buffer that addData passed to io_prep_pwrite — verify
+ delete list; // releases the adapter list created in addData. NOTE(review): iocbp itself (allocated with new in addData) is not released in this function — verify it is freed elsewhere
+ }
+
+#endif
+}
+
// Used only when Paging is disabled (as the user have this option to opt out for Paging)
void FileOutput::addData(BufferAdapter * adapter)
{
+ LockClass lock(&fileMutex);
+
int size = adapter->blockSize();
#ifdef SYNC
void * buffer = malloc(size + sizeof (BufferAdapter *));
@@ -116,18 +182,23 @@
adapter->done();
#else
void * buffer;
- if (posix_memalign(&buffer, 512, size + 512))
+ if (posix_memalign(&buffer, 512, size))
{
throw AIOException(10, "Error on posix_memalign");
}
+ std::list<BufferAdapter *> * list = new std::list<BufferAdapter *>();
+ list->push_back(adapter);
+
//void * buffer = malloc(size + sizeof (BufferAdapter *));
adapter->encode(size, buffer);
memcpy (((char *)buffer) + size, &adapter, sizeof (adapter));
struct iocb * iocb = new struct iocb();
::io_prep_pwrite(iocb, fileHandle, buffer, size, filePointer);
- ::io_set_callback(iocb, &callbackDirect);
+ iocb->data = (void *) list;
+ std::cout << "Set data = " << ((long)iocb->data) << "\n";
+
filePointer+=size;
int result = ::io_submit(aioContext, 1, &iocb);
Modified: projects/jaio/trunk/jaio/native/src/FileOutput.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/FileOutput.h 2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/FileOutput.h 2008-02-18 15:20:49 UTC (rev 3731)
@@ -5,16 +5,22 @@
#include "DataManager.h"
#include <string>
#include <libaio.h>
+#include <pthread.h>
-#define MAX_IO 3000
+#define MAX_IO 300
class FileOutput : public PageObserver, DataManager
{
private:
+ struct io_event *events;
int fileHandle;
std::string fileName;
io_context_t aioContext;
off_t filePointer;
+
+ pthread_mutex_t fileMutex;
+
+
public:
FileOutput(std::string & _fileName);
virtual ~FileOutput();
@@ -27,10 +33,7 @@
}
// Nothing to be done on FileOutput
- void pollEvents()
- {
- io_queue_run(aioContext);
- }
+ void pollEvents();
void validateLowRate()
{
Modified: projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.cpp 2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.cpp 2008-02-18 15:20:49 UTC (rev 3731)
@@ -5,27 +5,37 @@
JNIBufferAdapter::JNIBufferAdapter(AIOController * _controller, jobject _obj) : BufferAdapter(), refs(1)
{
controller = _controller;
- obj = controller->env->NewGlobalRef(_obj);
+ obj = controller->env->NewGlobalRef(_obj); // restore me
+ //obj = _obj; // delete me
}
JNIBufferAdapter::~JNIBufferAdapter()
{
- controller->env->DeleteGlobalRef(obj);
+ std::cout << " Deleting globalRef\n";
+ controller->env->DeleteGlobalRef(obj); // restore me
}
-
void JNIBufferAdapter::encode(const int& size, void * bufferAddress)
{
jobject buffer = controller->env->NewDirectByteBuffer(bufferAddress, size);
controller->env->CallVoidMethod(obj, controller->encode, buffer);
}
+
void JNIBufferAdapter::done()
{
+ std::cout << "Done called internally\n";
controller->env->CallVoidMethod(obj,controller->done);
return;
}
+
int JNIBufferAdapter::blockSize()
{
return controller->env->CallIntMethod(obj,controller->encodeSize);
}
+void JNIBufferAdapter::onError(long errorCode, std::string error) // Forwards an I/O failure to the Java object `obj` by invoking the method ID stored in controller->error with the code and message.
+{
+ std::cout<< "Calling onError with errorCode " << errorCode << "\n";
+ jstring strError = controller->env->NewStringUTF(error.data()); // NOTE(review): NewStringUTF reads a NUL-terminated string; c_str() guarantees termination, data() only does so from C++11 on — confirm the language level or switch to c_str()
+ controller->env->CallVoidMethod(obj, controller->error, (jint)errorCode, strError); // errorCode is cast from long to jint to match the "(ILjava/lang/String;)V" signature looked up in LibAIOController.cpp
+}
Modified: projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h 2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h 2008-02-18 15:20:49 UTC (rev 3731)
@@ -17,6 +17,7 @@
void encode(const int& size, void *);
void done();
int blockSize();
+ void onError(long error, std::string error); // Declaration of the error-notification method defined in JNIBufferAdapter.cpp. NOTE(review): both parameters are named `error`, which C++ compilers normally reject; the definition uses (errorCode, error) — confirm and align.
void addref()
{
refs++;
Modified: projects/jaio/trunk/jaio/native/src/LibAIOController.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/LibAIOController.cpp 2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/LibAIOController.cpp 2008-02-18 15:20:49 UTC (rev 3731)
@@ -33,15 +33,18 @@
if (!controller->done) return 0;
controller->encode = env->GetMethodID(clazz,"encode","(Ljava/nio/ByteBuffer;)V");
- if (!controller->done) return 0;
+ if (!controller->encode) return 0;
controller->decode = env->GetMethodID(clazz,"decode","(ILjava/nio/ByteBuffer;)V");
- if (!controller->done) return 0;
+ if (!controller->decode) return 0;
controller->encodeSize = env->GetMethodID(clazz,"encodeSize","()I");
- if (!controller->done) return 0;
+ if (!controller->encodeSize) return 0;
+ controller->error = env->GetMethodID(clazz, "onError", "(ILjava/lang/String;)V");
+ if (!controller->error) return 0;
+
controller->env = env;
return (jlong)controller;
@@ -77,7 +80,9 @@
controller->encodeSize = env->GetMethodID(clazz,"encodeSize","()I");
if (!controller->done) return 0;
+ controller->error = env->GetMethodID(clazz,"onError","(ILjava/lang/String;)V");
+
controller->env = env;
return (jlong)controller;
@@ -107,7 +112,7 @@
-JNIEXPORT void Java_org_jboss_jaio_libaioimpl_LibAIOController_pollEvents
+JNIEXPORT void Java_org_jboss_jaio_libaioimpl_LibAIOController_internalPollEvents
(JNIEnv *env, jclass, jlong controllerAddress)
{
try
Modified: projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h 2008-02-18 14:21:09 UTC (rev 3730)
+++ projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h 2008-02-18 15:20:49 UTC (rev 3731)
@@ -41,10 +41,10 @@
/*
* Class: org_jboss_jaio_libaioimpl_LibAIOController
- * Method: pollEvents
+ * Method: internalPollEvents
* Signature: (J)V
*/
-JNIEXPORT void JNICALL Java_org_jboss_jaio_libaioimpl_LibAIOController_pollEvents
+JNIEXPORT void JNICALL Java_org_jboss_jaio_libaioimpl_LibAIOController_internalPollEvents
(JNIEnv *, jclass, jlong);
/*
More information about the jboss-cvs-commits
mailing list