[jboss-cvs] JBoss Messaging SVN: r3735 - in projects/jaio/trunk/jaio/native: src and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Feb 19 20:59:00 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-02-19 20:59:00 -0500 (Tue, 19 Feb 2008)
New Revision: 3735

Modified:
   projects/jaio/trunk/jaio/native/acinclude.m4
   projects/jaio/trunk/jaio/native/configure.ac
   projects/jaio/trunk/jaio/native/src/FileOutput.cpp
   projects/jaio/trunk/jaio/native/src/FileOutput.h
   projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h
   projects/jaio/trunk/jaio/native/src/LibAIOController.cpp
   projects/jaio/trunk/jaio/native/src/PageManager.cpp
   projects/jaio/trunk/jaio/native/src/PageManager.h
   projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h
Log:
Fixing async issues with pre-allocating and other tweaks

Modified: projects/jaio/trunk/jaio/native/acinclude.m4
===================================================================
--- projects/jaio/trunk/jaio/native/acinclude.m4	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/acinclude.m4	2008-02-20 01:59:00 UTC (rev 3735)
@@ -1,3 +1,28 @@
+# serial 3
+# Find valid warning flags for the C Compiler.           -*-Autoconf-*-
+dnl Copyright (C) 2001, 2002, 2006 Free Software Foundation, Inc.
+dnl This file is free software; the Free Software Foundation
+dnl gives unlimited permission to copy and/or distribute it,
+dnl with or without modifications, as long as this notice is preserved.
+dnl Written by Jesse Thilo.
+
+AC_DEFUN([gl_COMPILER_FLAGS],
+  [AC_MSG_CHECKING(whether compiler accepts $1)
+   AC_SUBST(COMPILER_FLAGS)
+   ac_save_CFLAGS="$CFLAGS"
+   CFLAGS="$CFLAGS $1"
+   ac_save_CXXFLAGS="$CXXFLAGS"
+   CXXFLAGS="$CXXFLAGS $1"
+   AC_TRY_COMPILE(,
+    [int x;],
+    COMPILER_FLAGS="$COMPILER_FLAGS $1"
+    AC_MSG_RESULT(yes),
+    AC_MSG_RESULT(no))
+  CFLAGS="$ac_save_CFLAGS"
+  CXXFLAGS="$ac_save_CXXFLAGS"
+ ])
+
+
 AC_DEFUN([AC_JNI_INCLUDE_DIR],
 [
    JNI_INCLUDE_DIRS=""

Modified: projects/jaio/trunk/jaio/native/configure.ac
===================================================================
--- projects/jaio/trunk/jaio/native/configure.ac	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/configure.ac	2008-02-20 01:59:00 UTC (rev 3735)
@@ -22,6 +22,21 @@
   AC_MSG_ERROR([libaio-devel package missing. Please ensure both libaio and libaio-devel are installed. (hint: yum install libaio-devel should do it...)])
 	
 
+gl_COMPILER_FLAGS(-Werror)
+gl_COMPILER_FLAGS(-pedantic)
+gl_COMPILER_FLAGS(-Wall)
+gl_COMPILER_FLAGS(-Wextra)
+gl_COMPILER_FLAGS(-Wno-shadow)
+gl_COMPILER_FLAGS(-Wpointer-arith)
+gl_COMPILER_FLAGS(-Wcast-qual)
+gl_COMPILER_FLAGS(-Wcast-align)
+gl_COMPILER_FLAGS(-Wno-long-long)
+gl_COMPILER_FLAGS(-Wvolatile-register-var)
+gl_COMPILER_FLAGS(-Winvalid-pch)
+gl_COMPILER_FLAGS(-Wno-system-headers)
+
+CPPFLAGS="$CPPFLAGS $COMPILER_FLAGS"
+
 # Checks for programs.
 AC_PROG_CXX
 AC_PROG_CC

Modified: projects/jaio/trunk/jaio/native/src/FileOutput.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/FileOutput.cpp	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/FileOutput.cpp	2008-02-20 01:59:00 UTC (rev 3735)
@@ -1,3 +1,9 @@
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+
 #include <stdlib.h>
 #include <list>
 #include <iostream>
@@ -28,9 +34,10 @@
 }
 
 
-FileOutput::FileOutput(std::string & _fileName) : aioContext(0), filePointer(0), events(0), added(0), received(0)
+FileOutput::FileOutput(std::string & _fileName) : aioContext(0), filePointer(0), events(0), added(0), received(0), pollerRunning(0)
 {
 	::pthread_mutex_init(&fileMutex,0);
+	::pthread_mutex_init(&pollerMutex,0);
 	fileName = _fileName;
 	if (io_queue_init(MAX_IO, &aioContext))
 	{
@@ -40,7 +47,8 @@
 #ifdef SYNC
 	fileHandle = open(fileName.data(),  O_WRONLY | O_CREAT, 0666);
 #else
-	fileHandle = open(fileName.data(),  O_WRONLY | O_CREAT | O_DIRECT, 0666);
+	//fileHandle = open(fileName.data(),  O_WRONLY | O_CREAT, 0666);
+	fileHandle = ::open(fileName.data(),  O_WRONLY | O_CREAT | O_DIRECT, 0666);
 	events = (struct io_event *)malloc (MAX_IO * sizeof (struct io_event));
 #endif
 	if (fileHandle < 0)
@@ -53,12 +61,13 @@
 FileOutput::~FileOutput()
 {
 	::pthread_mutex_destroy(&fileMutex);
+	::pthread_mutex_destroy(&pollerMutex);
 	free(events);
 	if (io_queue_release(aioContext))
 	{
 		throw AIOException(2,"Can't release aio");
 	}
-	if (close(fileHandle))
+	if (::close(fileHandle))
 	{
 		throw AIOException(2,"Can't close file");
 	}
@@ -99,89 +108,153 @@
 	
 }*/
 
+
+void FileOutput::stopPoller()
+{
+	pollerRunning = 0;
+	fprintf (stderr,"Setting poller to stop\n"); fflush(stderr);
+	// Acquiring fileMutex blocks here until the poller gives up its lock
+	LockClass lock(&fileMutex);
+}
+
+
 void FileOutput::pollEvents(THREAD_CONTEXT threadContext)
 {
-#ifndef SYNC
+	
 	LockClass lock(&fileMutex);
-	int result = io_getevents(this->aioContext,0, MAX_IO, events, 0);
+	pollerRunning=1;
 	
-	if (result)
-	{
-		received += result;
-		std::cout << "\nAdded=" << added << " received=" << received << "\n";
-		std::cout.flush();
-	}
-
-	if (result < 0)
-	{
-		// no results
-		if (result == EINTR)
-		{
-			std::cout << "Events are empty\n";
-			return;
-		}
-		
-		throw AIOException(55, io_error(result));
-	}
+	struct timespec oneSecond;
+	oneSecond.tv_sec = 1;
+	oneSecond.tv_nsec = 0;
 	
-	for (int i=0; i<result; i++)
+	
+	while (pollerRunning)
 	{
+		int result = io_getevents(this->aioContext, 1, MAX_IO, events, &oneSecond);
 		
-		struct iocb * iocbp = events[i].obj;
-
-		std::list<BufferAdapter *> * list = (std::list<BufferAdapter *> *)(iocbp->data); 
+#ifdef DEBUG
+		fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
+#endif
 		
-		long result = events[i].res;
-		if (result < 0)
+		if (result > 0)
 		{
-			std::string strerror = io_error(result);
-			for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
-			{
-				(*iter)->onError(threadContext, result, strerror);
-			}
+			received += result;
+			
+#ifdef DEBUG
+			fprintf (stdout, "Received %d events\n", result);
+			fflush(stdout);
+#endif
 		}
-		else
+
+		for (int i=0; i<result; i++)
 		{
-			for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+			
+			struct iocb * iocbp = events[i].obj;
+	
+			std::list<BufferAdapter *> * list = (std::list<BufferAdapter *> *)(iocbp->data); 
+			
+			long result = events[i].res;
+			if (result < 0)
 			{
-				(*iter)->completeBlock(threadContext);
-				(*iter)->deleteRef(threadContext);
+				std::string strerror = io_error(result);
+				for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+				{
+					(*iter)->onError(threadContext, result, strerror);
+				}
 			}
+			else
+			{
+				for (std::list<BufferAdapter *>::iterator iter = list->begin(); iter != list->end(); iter ++)
+				{
+					(*iter)->completeBlock(threadContext);
+					(*iter)->deleteRef(threadContext);
+				}
+			}
+			
+			free(iocbp->u.c.buf);
+			delete list;
+			free(iocbp);
 		}
-		
-		free(iocbp->u.c.buf);
-		delete list;
 	}
 	
-#endif
+	fprintf (stdout, "Poller finished execution\n");
+	fflush(stdout);
+	
 }
 
+
+void FileOutput::preAllocate(int blocks, size_t size)
+{
+	size_t currentSize = lseek (fileHandle, 0, SEEK_END);
+	
+	std::cout << "currentSize := " << currentSize << "\n";
+	if (currentSize >= blocks * size)
+	{
+		std::cout << "File being reused as requested size=" << size*blocks << " is equals or lower than current file Size=" << currentSize << "\n";
+		std::cout.flush();
+		filePointer=0;
+		return;
+	}
+	
+	if (size % 512 != 0)
+	{
+		throw AIOException (101, "You can only pre allocate files in multiples of 512");
+	}
+	
+	void * preAllocBuffer = 0;
+	if (posix_memalign(&preAllocBuffer, 512, size))
+	{
+		throw AIOException(10, "Error on posix_memalign");
+	}
+	
+	memset(preAllocBuffer, 0, size);
+	
+	
+	if (::lseek (fileHandle, 0, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+	
+	for (int i=0; i<blocks; i++)
+	{
+		if (::write(fileHandle, preAllocBuffer, size)<0)
+		{
+			throw AIOException (12, "Error pre allocating the file");
+		}
+	}
+	
+	if (::lseek (fileHandle, 0, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+	
+	free (preAllocBuffer);
+	
+	filePointer = 0;
+}
+
 // Used only when Paging is disabled (as the user have this option to opt out for Paging)
 void FileOutput::addData(THREAD_CONTEXT threadContext, BufferAdapter * adapter)
 {
 	int size = adapter->blockSize(threadContext);
 
-	LockClass lock(&fileMutex);
+	//LockClass lock (&fileMutex);
 	
+	
 	added++;
 
-#ifdef SYNC
-	void * buffer = malloc(size);
-	adapter->encode(threadContext, size, buffer);
-	int error=::write(fileHandle, buffer, size);
-	
-	if (error<0)
-	{
-		throw AIOException (21, io_error(error));
-	}
-	free(buffer);
-	error = ::fsync(fileHandle);
-	if (error)
-	{
-		throw AIOException(20, io_error(error));
-	}
-	adapter->done();
-#else
+//#ifdef SYNC
+//	void * buffer = malloc(size);
+//	adapter->encode(threadContext, size, buffer);
+//	int error=::write(fileHandle, buffer, size);
+//	
+//	if (error<0)
+//	{
+//		throw AIOException (21, io_error(error));
+//	}
+//	free(buffer);
+//	error = ::fsync(fileHandle);
+//	if (error)
+//	{
+//		throw AIOException(20, io_error(error));
+//	}
+//	adapter->done(threadContext);
+//#else
 	void * buffer = 0;
 	//void * buffer = malloc(size + sizeof (BufferAdapter *));
 	if (posix_memalign(&buffer, 512, size))
@@ -197,16 +270,21 @@
 	struct iocb * iocb = new struct iocb();
 	::io_prep_pwrite(iocb, fileHandle, buffer, size, filePointer);
 	iocb->data = (void *) list;
-
 	filePointer+=size;
-	
+	if (size > (1024*1024)) // deleteme
+	{
+		std::cout << "submit....."; std::cout.flush();
+	}
 	int result = ::io_submit(aioContext, 1, &iocb);
+	if (size > (1024*1024)) // deleteme
+	{
+		std::cout << "done\n"; std::cout.flush();
+	}
 	if (result<0)
 	{
 		std::stringstream str;
 		str<< "Problem on submit block, errorCode=" << result;
 		throw AIOException (6, str.str());
 	}
-#endif
 }
 

Modified: projects/jaio/trunk/jaio/native/src/FileOutput.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/FileOutput.h	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/FileOutput.h	2008-02-20 01:59:00 UTC (rev 3735)
@@ -9,23 +9,26 @@
 #include "DataManager.h"
 #include "JAIODatatypes.h"
 
-#define MAX_IO 3000
+#define MAX_IO 20000
 
 class FileOutput : public PageObserver, DataManager
 {
 private:
+	io_context_t aioContext;
+	off_t filePointer;
 	struct io_event *events; 
 	int fileHandle;
 	std::string fileName;
-	io_context_t aioContext;
-	off_t filePointer;
 	
 	pthread_mutex_t fileMutex;
+	pthread_mutex_t pollerMutex;
 	
 	int added;
 	int received;
 	
+	bool pollerRunning;
 	
+	
 public:
 	FileOutput(std::string & _fileName);
 	virtual ~FileOutput();
@@ -37,10 +40,16 @@
 		return fileHandle;
 	}
 	
+	
+	// Finishes the polling thread (if any) and returns
+	void stopPoller();
+	
+	void preAllocate(int numberOfBlocks, size_t size);
+	
 	// Nothing to be done on FileOutput 
 	void pollEvents(THREAD_CONTEXT threadContext);
 	
-	void validateLowRate(THREAD_CONTEXT threadContext)
+	void validateLowRate(THREAD_CONTEXT )
 	{
 	}
 	

Modified: projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/JNIBufferAdapter.h	2008-02-20 01:59:00 UTC (rev 3735)
@@ -24,7 +24,7 @@
 	void done(THREAD_CONTEXT threadContext);
 	int blockSize(THREAD_CONTEXT threadContext);
 	void onError(THREAD_CONTEXT threadContext, long error, std::string error);
-	void addref(THREAD_CONTEXT threadContextt)
+	void addref(THREAD_CONTEXT )
 	{
 		refs++;
 	}

Modified: projects/jaio/trunk/jaio/native/src/LibAIOController.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/LibAIOController.cpp	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/LibAIOController.cpp	2008-02-20 01:59:00 UTC (rev 3735)
@@ -145,6 +145,7 @@
 	try
 	{
 		AIOController * controller = (AIOController *) controllerAddress;
+		controller->fileOutput.stopPoller();
 		delete controller;
 	}
 	catch (AIOException& e)
@@ -152,3 +153,17 @@
 		throwException(env, "java/lang/RuntimeException", e.what());
 	}
 }
+
+JNIEXPORT void JNICALL Java_org_jboss_jaio_libaioimpl_LibAIOController_preAllocate
+  (JNIEnv * env, jclass, jlong controllerAddress, jint blocks, jlong size)
+{
+	try
+	{
+		AIOController * controller = (AIOController *) controllerAddress;
+		controller->fileOutput.preAllocate(blocks, size);
+	}
+	catch (AIOException& e)
+	{
+		throwException(env, "java/lang/RuntimeException", e.what());
+	}
+}

Modified: projects/jaio/trunk/jaio/native/src/PageManager.cpp
===================================================================
--- projects/jaio/trunk/jaio/native/src/PageManager.cpp	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/PageManager.cpp	2008-02-20 01:59:00 UTC (rev 3735)
@@ -93,7 +93,7 @@
     initMemory();
 }
 
-void PageManager::pollEvents(THREAD_CONTEXT threadContext)
+void PageManager::pollEvents(THREAD_CONTEXT)
 {
     //observer->completePage(pagePosition, memoryArea, list);
     //initMemory();

Modified: projects/jaio/trunk/jaio/native/src/PageManager.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/PageManager.h	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/PageManager.h	2008-02-20 01:59:00 UTC (rev 3735)
@@ -34,7 +34,7 @@
 	
 	void destroy(THREAD_CONTEXT threadContext);
 	
-	void validateLowRate(THREAD_CONTEXT threadContext)
+	void validateLowRate(THREAD_CONTEXT)
 	{
 	}
 	

Modified: projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h
===================================================================
--- projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h	2008-02-19 16:52:32 UTC (rev 3734)
+++ projects/jaio/trunk/jaio/native/src/org_jboss_jaio_libaioimpl_LibAIOController.h	2008-02-20 01:59:00 UTC (rev 3735)
@@ -33,6 +33,14 @@
 
 /*
  * Class:     org_jboss_jaio_libaioimpl_LibAIOController
+ * Method:    preAllocate
+ * Signature: (JIJ)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_jaio_libaioimpl_LibAIOController_preAllocate
+  (JNIEnv *, jclass, jlong, jint, jlong);
+
+/*
+ * Class:     org_jboss_jaio_libaioimpl_LibAIOController
  * Method:    closeInternal
  * Signature: (J)V
  */




More information about the jboss-cvs-commits mailing list