[jboss-cvs] JBoss Messaging SVN: r3735 - in projects/jaio/trunk/jaio/native: src and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Feb 19 20:59:00 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-02-19 20:59:00 -0500 (Tue, 19 Feb 2008)
New Revision: 3735
Modified:
projects/jaio/trunk/jaio/native/acinclude.m4
projects/jaio/trunk/jaio/native/configure.ac
projects/jaio/trunk/jaio/native/src/FileOutput.cpp
projects/jaio/trunk/jaio/native/src/FileOutput.h
projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h
projects/jaio/trunk/jaio/native/src/LibAIOController.cpp
projects/jaio/trunk/jaio/native/src/PageManager.cpp
projects/jaio/trunk/jaio/native/src/PageManager.h
projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h
Log:
Fixing async issues with pre-allocating and other tweaks
Modified: projects/jaio/trunk/jaio/native/acinclude.m4
===================================================================
--- projects/jaio/trunk/jaio/native/acinclude.m4 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/acinclude.m4 2008-02-20 01:59:00 UTC (rev 3735)
@@ -1,3 +1,28 @@
+# serial 3
+# Find valid warning flags for the C Compiler. -*-Autoconf-*-
+dnl Copyright (C) 2001, 2002, 2006 Free Software Foundation, Inc.
+dnl This file is free software; the Free Software Foundation
+dnl gives unlimited permission to copy and/or distribute it,
+dnl with or without modifications, as long as this notice is preserved.
+dnl Written by Jesse Thilo.
+
+AC_DEFUN([gl_COMPILER_FLAGS],
+ [AC_MSG_CHECKING(whether compiler accepts $1)
+ AC_SUBST(COMPILER_FLAGS)
+ ac_save_CFLAGS="$CFLAGS"
+ CFLAGS="$CFLAGS $1"
+ ac_save_CXXFLAGS="$CXXFLAGS"
+ CXXFLAGS="$CXXFLAGS $1"
+ AC_TRY_COMPILE(,
+ [int x;],
+ COMPILER_FLAGS="$COMPILER_FLAGS $1"
+ AC_MSG_RESULT(yes),
+ AC_MSG_RESULT(no))
+ CFLAGS="$ac_save_CFLAGS"
+ CXXFLAGS="$ac_save_CXXFLAGS"
+ ])
+
+
AC_DEFUN([AC_JNI_INCLUDE_DIR],
[
JNI_INCLUDE_DIRS=""
Modified: projects/jaio/trunk/jaio/native/configure.ac
===================================================================
--- projects/jaio/trunk/jaio/native/configure.ac 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/configure.ac 2008-02-20 01:59:00 UTC (rev 3735)
@@ -22,6 +22,21 @@
AC_MSG_ERROR([libaio-devel package missing. Please ensure both libaio and libaio-devel are installed. (hint: yum install libaio-devel should do it...)])
+gl_COMPILER_FLAGS(-Werror)
+gl_COMPILER_FLAGS(-pedantic)
+gl_COMPILER_FLAGS(-Wall)
+gl_COMPILER_FLAGS(-Wextra)
+gl_COMPILER_FLAGS(-Wno-shadow)
+gl_COMPILER_FLAGS(-Wpointer-arith)
+gl_COMPILER_FLAGS(-Wcast-qual)
+gl_COMPILER_FLAGS(-Wcast-align)
+gl_COMPILER_FLAGS(-Wno-long-long)
+gl_COMPILER_FLAGS(-Wvolatile-register-var)
+gl_COMPILER_FLAGS(-Winvalid-pch)
+gl_COMPILER_FLAGS(-Wno-system-headers)
+
+CPPFLAGS="$CPPFLAGS $COMPILER_FLAGS"
+
# Checks for programs.
AC_PROG_CXX
AC_PROG_CC
Modified: projects/jaio/trunk/jaio/native/src/FileOutput.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/FileOutput.cpp 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/FileOutput.cpp 2008-02-20 01:59:00 UTC (rev 3735)
@@ -1,3 +1,9 @@
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+
#include <stdlib.h>
#include <list>
#include <iostream>
@@ -28,9 +34,10 @@
}
-FileOutput::FileOutput(std::string & _fileName) : aioContext(0), filePointer(0), events(0), added(0), received(0)
+FileOutput::FileOutput(std::string & _fileName) : aioContext(0), filePointer(0), events(0), added(0), received(0), pollerRunning(0)
{
::pthread_mutex_init(&fileMutex,0);
+ ::pthread_mutex_init(&pollerMutex,0);
fileName = _fileName;
if (io_queue_init(MAX_IO, &aioContext))
{
@@ -40,7 +47,8 @@
#ifdef SYNC
fileHandle = open(fileName.data(), O_WRONLY | O_CREAT, 0666);
#else
- fileHandle = open(fileName.data(), O_WRONLY | O_CREAT | O_DIRECT, 0666);
+ //fileHandle = open(fileName.data(), O_WRONLY | O_CREAT, 0666);
+ fileHandle = ::open(fileName.data(), O_WRONLY | O_CREAT | O_DIRECT, 0666);
events = (struct io_event *)malloc (MAX_IO * sizeof (struct io_event));
#endif
if (fileHandle < 0)
@@ -53,12 +61,13 @@
FileOutput::~FileOutput()
{
::pthread_mutex_destroy(&fileMutex);
+ ::pthread_mutex_destroy(&pollerMutex);
free(events);
if (io_queue_release(aioContext))
{
throw AIOException(2,"Can't release aio");
}
- if (close(fileHandle))
+ if (::close(fileHandle))
{
throw AIOException(2,"Can't close file");
}
@@ -99,89 +108,153 @@
}*/
+
+void FileOutput::stopPoller()
+{
+ pollerRunning = 0;
+ fprintf (stderr,"Setting poller to stop\n"); fflush(stderr);
+ // Wait for the poller to give up its lock
+ LockClass lock(&fileMutex);
+}
+
+
void FileOutput::pollEvents(THREAD_CONTEXT threadContext)
{
-#ifndef SYNC
+
LockClass lock(&fileMutex);
- int result = io_getevents(this->aioContext,0, MAX_IO, events, 0);
+ pollerRunning=1;
- if (result)
- {
- received += result;
- std::cout << "\nAdded=" << added << " received=" << received << "\n";
- std::cout.flush();
- }
-
- if (result < 0)
- {
- // no results
- if (result == EINTR)
- {
- std::cout << "Events are empty\n";
- return;
- }
-
- throw AIOException(55, io_error(result));
- }
+ struct timespec oneSecond;
+ oneSecond.tv_sec = 1;
+ oneSecond.tv_nsec = 0;
- for (int i=0; i<result; i++)
+
+ while (pollerRunning)
{
+ int result = io_getevents(this->aioContext, 1, MAX_IO, events, &oneSecond);
- struct iocb * iocbp = events[i].obj;
-
- std::list<BufferAdapter *> * list = (std::list<BufferAdapter *> *)(iocbp->data);
+#ifdef DEBUG
+ fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
+#endif
- long result = events[i].res;
- if (result < 0)
+ if (result > 0)
{
- std::string strerror = io_error(result);
- for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
- {
- (*iter)->onError(threadContext, result, strerror);
- }
+ received += result;
+
+#ifdef DEBUG
+ fprintf (stdout, "Received %d events\n", result);
+ fflush(stdout);
+#endif
}
- else
+
+ for (int i=0; i<result; i++)
{
- for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+
+ struct iocb * iocbp = events[i].obj;
+
+ std::list<BufferAdapter *> * list = (std::list<BufferAdapter *> *)(iocbp->data);
+
+ long result = events[i].res;
+ if (result < 0)
{
- (*iter)->completeBlock(threadContext);
- (*iter)->deleteRef(threadContext);
+ std::string strerror = io_error(result);
+ for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+ {
+ (*iter)->onError(threadContext, result, strerror);
+ }
}
+ else
+ {
+ for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+ {
+ (*iter)->completeBlock(threadContext);
+ (*iter)->deleteRef(threadContext);
+ }
+ }
+
+ free(iocbp->u.c.buf);
+ delete list;
+ free(iocbp);
}
-
- free(iocbp->u.c.buf);
- delete list;
}
-#endif
+ fprintf (stdout, "Poller finished execution\n");
+ fflush(stdout);
+
}
+
+void FileOutput::preAllocate(int blocks, size_t size)
+{
+ size_t currentSize = lseek (fileHandle, 0, SEEK_END);
+
+ std::cout << "currentSize := " << currentSize << "\n";
+ if (currentSize >= blocks * size)
+ {
+ std::cout << "File being reused as requested size=" << size*blocks << " is equals or lower than current file Size=" << currentSize << "\n";
+ std::cout.flush();
+ filePointer=0;
+ return;
+ }
+
+ if (size % 512 != 0)
+ {
+ throw AIOException (101, "You can only pre allocate files in multiples of 512");
+ }
+
+ void * preAllocBuffer = 0;
+ if (posix_memalign(&preAllocBuffer, 512, size))
+ {
+ throw AIOException(10, "Error on posix_memalign");
+ }
+
+ memset(preAllocBuffer, 0, size);
+
+
+ if (::lseek (fileHandle, 0, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+
+ for (int i=0; i<blocks; i++)
+ {
+ if (::write(fileHandle, preAllocBuffer, size)<0)
+ {
+ throw AIOException (12, "Error pre allocating the file");
+ }
+ }
+
+ if (::lseek (fileHandle, 0, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+
+ free (preAllocBuffer);
+
+ filePointer = 0;
+}
+
// Used only when Paging is disabled (as the user have this option to opt out for Paging)
void FileOutput::addData(THREAD_CONTEXT threadContext, BufferAdapter * adapter)
{
int size = adapter->blockSize(threadContext);
- LockClass lock(&fileMutex);
+ //LockClass lock (&fileMutex);
+
added++;
-#ifdef SYNC
- void * buffer = malloc(size);
- adapter->encode(threadContext, size, buffer);
- int error=::write(fileHandle, buffer, size);
-
- if (error<0)
- {
- throw AIOException (21, io_error(error));
- }
- free(buffer);
- error = ::fsync(fileHandle);
- if (error)
- {
- throw AIOException(20, io_error(error));
- }
- adapter->done();
-#else
+//#ifdef SYNC
+// void * buffer = malloc(size);
+// adapter->encode(threadContext, size, buffer);
+// int error=::write(fileHandle, buffer, size);
+//
+// if (error<0)
+// {
+// throw AIOException (21, io_error(error));
+// }
+// free(buffer);
+// error = ::fsync(fileHandle);
+// if (error)
+// {
+// throw AIOException(20, io_error(error));
+// }
+// adapter->done(threadContext);
+//#else
void * buffer = 0;
//void * buffer = malloc(size + sizeof (BufferAdapter *));
if (posix_memalign(&buffer, 512, size))
@@ -197,16 +270,21 @@
struct iocb * iocb = new struct iocb();
::io_prep_pwrite(iocb, fileHandle, buffer, size, filePointer);
iocb->data = (void *) list;
-
filePointer+=size;
-
+ if (size > (1024*1024)) // deleteme
+ {
+ std::cout << "submit....."; std::cout.flush();
+ }
int result = ::io_submit(aioContext, 1, &iocb);
+ if (size > (1024*1024)) // deleteme
+ {
+ std::cout << "done\n"; std::cout.flush();
+ }
if (result<0)
{
std::stringstream str;
str<< "Problem on submit block, errorCode=" << result;
throw AIOException (6, str.str());
}
-#endif
}
Modified: projects/jaio/trunk/jaio/native/src/FileOutput.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/FileOutput.h 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/FileOutput.h 2008-02-20 01:59:00 UTC (rev 3735)
@@ -9,23 +9,26 @@
#include "DataManager.h"
#include "JAIODatatypes.h"
-#define MAX_IO 3000
+#define MAX_IO 20000
class FileOutput : public PageObserver, DataManager
{
private:
+ io_context_t aioContext;
+ off_t filePointer;
struct io_event *events;
int fileHandle;
std::string fileName;
- io_context_t aioContext;
- off_t filePointer;
pthread_mutex_t fileMutex;
+ pthread_mutex_t pollerMutex;
int added;
int received;
+ bool pollerRunning;
+
public:
FileOutput(std::string & _fileName);
virtual ~FileOutput();
@@ -37,10 +40,16 @@
return fileHandle;
}
+
+ // Finishes the polling thread (if any) and returns
+ void stopPoller();
+
+ void preAllocate(int numberOfBlocks, size_t size);
+
// Nothing to be done on FileOutput
void pollEvents(THREAD_CONTEXT threadContext);
- void validateLowRate(THREAD_CONTEXT threadContext)
+ void validateLowRate(THREAD_CONTEXT )
{
}
Modified: projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h 2008-02-20 01:59:00 UTC (rev 3735)
@@ -24,7 +24,7 @@
void done(THREAD_CONTEXT threadContext);
int blockSize(THREAD_CONTEXT threadContext);
void onError(THREAD_CONTEXT threadContext, long error, std::string error);
- void addref(THREAD_CONTEXT threadContextt)
+ void addref(THREAD_CONTEXT )
{
refs++;
}
Modified: projects/jaio/trunk/jaio/native/src/LibAIOController.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/LibAIOController.cpp 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/LibAIOController.cpp 2008-02-20 01:59:00 UTC (rev 3735)
@@ -145,6 +145,7 @@
try
{
AIOController * controller = (AIOController *) controllerAddress;
+ controller->fileOutput.stopPoller();
delete controller;
}
catch (AIOException& e)
@@ -152,3 +153,17 @@
throwException(env, "java/lang/RuntimeException", e.what());
}
}
+
+JNIEXPORT void JNICALL Java_org_jboss_jaio_libaioimpl_LibAIOController_preAllocate
+ (JNIEnv * env, jclass, jlong controllerAddress, jint blocks, jlong size)
+{
+ try
+ {
+ AIOController * controller = (AIOController *) controllerAddress;
+ controller->fileOutput.preAllocate(blocks, size);
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, "java/lang/RuntimeException", e.what());
+ }
+}
Modified: projects/jaio/trunk/jaio/native/src/PageManager.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/PageManager.cpp 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/PageManager.cpp 2008-02-20 01:59:00 UTC (rev 3735)
@@ -93,7 +93,7 @@
initMemory();
}
-void PageManager::pollEvents(THREAD_CONTEXT threadContext)
+void PageManager::pollEvents(THREAD_CONTEXT)
{
//observer->completePage(pagePosition, memoryArea, list);
//initMemory();
Modified: projects/jaio/trunk/jaio/native/src/PageManager.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/PageManager.h 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/PageManager.h 2008-02-20 01:59:00 UTC (rev 3735)
@@ -34,7 +34,7 @@
void destroy(THREAD_CONTEXT threadContext);
- void validateLowRate(THREAD_CONTEXT threadContext)
+ void validateLowRate(THREAD_CONTEXT)
{
}
Modified: projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h 2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h 2008-02-20 01:59:00 UTC (rev 3735)
@@ -33,6 +33,14 @@
/*
* Class: org_jboss_jaio_libaioimpl_LibAIOController
+ * Method: preAllocate
+ * Signature: (JIJ)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_jaio_libaioimpl_LibAIOController_preAllocate
+ (JNIEnv *, jclass, jlong, jint, jlong);
+
+/*
+ * Class: org_jboss_jaio_libaioimpl_LibAIOController
* Method: closeInternal
* Signature: (J)V
*/
More information about the jboss-cvs-commits
mailing list