[hornetq-commits] JBoss hornetq SVN: r9438 - in trunk: src/main/org/hornetq/core/asyncio and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 21 01:59:13 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-07-21 01:59:12 -0400 (Wed, 21 Jul 2010)
New Revision: 9438

Modified:
   trunk/native/src/AsyncFile.cpp
   trunk/native/src/AsyncFile.h
   trunk/native/src/JNI_AsynchronousFileImpl.cpp
   trunk/native/src/Version.h
   trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
   trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
   trunk/src/main/org/hornetq/core/journal/SequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - It's not necessary to use aio_write during compacting, we can just use a direct write. I will then just use a direct write to avoid possible issues.

Modified: trunk/native/src/AsyncFile.cpp
===================================================================
--- trunk/native/src/AsyncFile.cpp	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/AsyncFile.cpp	2010-07-21 05:59:12 UTC (rev 9438)
@@ -206,6 +206,26 @@
 	free (preAllocBuffer);
 }
 
+
+/** Write directly to the file without using libaio queue.
+ *  Returns only after all `size` bytes from `buffer` were written at `position` and fsync'ed; throws AIOException otherwise. */
+void AsyncFile::writeInternal(THREAD_CONTEXT, long position, size_t size, void *& buffer)
+{
+	if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+
+	// ::write may transfer fewer bytes than requested, so keep writing until the whole buffer is out
+	const char * pending = static_cast<const char *>(buffer);
+	for (size_t remaining = size; remaining > 0; )
+	{
+		const ssize_t written = ::write(fileHandle, pending, remaining);
+		// a zero-byte result is treated as an error as well, otherwise this loop could spin forever
+		if (written <= 0) throw AIOException (NATIVE_ERROR_IO, "Error writing file");
+		pending += written;
+		remaining -= static_cast<size_t>(written);
+	}
+	if (::fsync(fileHandle) < 0) throw AIOException (NATIVE_ERROR_IO, "Error on synchronizing file");
+}
+
+
 void AsyncFile::write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter)
 {
 

Modified: trunk/native/src/AsyncFile.h
===================================================================
--- trunk/native/src/AsyncFile.h	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/AsyncFile.h	2010-07-21 05:59:12 UTC (rev 9438)
@@ -49,6 +49,9 @@
 	
 	void write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
 	
+	/** Write directly to the file without using libaio queue */
+	void writeInternal(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer);
+	
 	void read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
 	
 	int getHandle()

Modified: trunk/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- trunk/native/src/JNI_AsynchronousFileImpl.cpp	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/JNI_AsynchronousFileImpl.cpp	2010-07-21 05:59:12 UTC (rev 9438)
@@ -196,8 +196,29 @@
 	}
 }
 
+JNIEXPORT void JNICALL Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_writeInternal
+  (JNIEnv * env, jobject , jobject controllerAddress, jlong positionToWrite, jlong size, jobject jbuffer)
+{ // JNI entry point behind AsynchronousFileImpl.writeInternal: forwards to AsyncFile::writeInternal
+	try
+	{
+		AIOController * controller = getController(env, controllerAddress);
+		void * buffer = env->GetDirectBufferAddress(jbuffer); // 0 means jbuffer has no usable native address
 
+		if (buffer == 0)
+		{
+			throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+			return;
+		}
 
+		controller->fileOutput.writeInternal(env, positionToWrite, (size_t)size, buffer);
+	}
+	catch (AIOException& e) // native failure: reported via throwException with the exception's error code and message
+	{
+		throwException(env, e.getErrorCode(), e.what());
+	}
+}
+
+
 JNIEXPORT void Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
   (JNIEnv *env, jclass, jobject controllerAddress)
 {

Modified: trunk/native/src/Version.h
===================================================================
--- trunk/native/src/Version.h	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/Version.h	2010-07-21 05:59:12 UTC (rev 9438)
@@ -3,6 +3,6 @@
 
 // This definition needs to match org.hornetq.core.asyncio.impl.AsynchronousFileImpl.EXPECTED_NATIVE_VERSION
 // Or else the native module won't be loaded because of version mismatches
-#define _VERSION_NATIVE_AIO 29
+#define _VERSION_NATIVE_AIO 30
 #endif
 

Modified: trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java	2010-07-21 05:59:12 UTC (rev 9438)
@@ -43,6 +43,12 @@
 
    /** Any error will be reported on the callback interface */
    void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
+   
+   /**
+    * Performs an internal direct write. 
+    * @throws HornetQException 
+    */
+   void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
 
    void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws HornetQException;
 
@@ -54,4 +60,5 @@
 
    String getFileName();
 
+
 }

Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2010-07-21 05:59:12 UTC (rev 9438)
@@ -51,7 +51,7 @@
 
    /** This definition needs to match Version.h on the native sources.
        Or else the native module won't be loaded because of version mismatches */
-   private static int EXPECTED_NATIVE_VERSION = 29;
+   private static int EXPECTED_NATIVE_VERSION = 30;
 
    /** Used to determine the next writing sequence */
    private final AtomicLong nextWritingSequence = new AtomicLong(0);
@@ -271,7 +271,18 @@
          writeLock.unlock();
       }
    }
+   
+   /** Writes through the native writeInternal call, then hands the buffer to bufferCallback when one is set. */
+   public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException
+   {
+      writeInternal(handler, positionToWrite, size, bytes);
+      if (bufferCallback != null)
+      {
+         bufferCallback.bufferDone(bytes);
+      }
+   }
 
+
    public void write(final long position,
                      final long size,
                      final ByteBuffer directByteBuffer,
@@ -629,6 +640,8 @@
                              ByteBuffer buffer,
                              AIOCallback aioPackage) throws HornetQException;
 
+   private native void writeInternal(ByteBuffer handle, long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
+
    private native void read(ByteBuffer handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
 
    private static native void fill(ByteBuffer handle, long position, int blocks, long size, byte fillChar) throws HornetQException;
@@ -720,4 +733,5 @@
          }
       }
    }
+
 }

Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2010-07-21 05:59:12 UTC (rev 9438)
@@ -69,6 +69,11 @@
 
    /** Write directly to the file without using any buffer */
    void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
+   
+   /** Write directly to the file.
+    *  This is used by compacting and other places where we write a big buffer in a single shot.
+    *  writeInternal should always block until the entire write has been synced to disk */
+   void writeInternal(ByteBuffer bytes) throws Exception;
 
    int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2010-07-21 05:59:12 UTC (rev 9438)
@@ -279,7 +279,17 @@
 
       aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
    }
+   /** Writes the buffer at the current position via aioFile.writeInternal and advances the position. */
+   public void writeInternal(ByteBuffer bytes) throws Exception
+   {
+      final int bytesToWrite = factory.calculateBlockSize(bytes.limit()); // size comes from the factory, not the raw limit
 
+      final long positionToWrite = position.getAndAdd(bytesToWrite); // claim the range and move the position past it
+
+      aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
+   }
+
+
    // Protected methods
    // -----------------------------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2010-07-21 05:59:12 UTC (rev 9438)
@@ -180,13 +180,11 @@
       if (writingChannel != null)
       {
          sequentialFile.position(0);
-         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
          
          // To Fix the size of the file
          writingChannel.writerIndex(writingChannel.capacity());
          
-         sequentialFile.writeDirect(writingChannel.toByteBuffer(), true, completion);
-         completion.waitCompletion();
+         sequentialFile.writeInternal(writingChannel.toByteBuffer());
          sequentialFile.close();
          newDataFiles.add(currentFile);
       }

Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2010-07-21 05:59:12 UTC (rev 9438)
@@ -260,7 +260,14 @@
    {
       internalWrite(bytes, sync, null);
    }
+   
+   /** Delegates to internalWrite with sync forced to true and null as the last argument. */
+   public void writeInternal(ByteBuffer bytes) throws Exception
+   {
+      internalWrite(bytes, true, null);
+   }
 
+
    @Override
    protected ByteBuffer newBuffer(int size, final int limit)
    {

Modified: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java	2010-07-21 05:59:12 UTC (rev 9438)
@@ -13,6 +13,10 @@
 
 package org.hornetq.tests.unit.core.asyncio;
 
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
@@ -917,7 +921,51 @@
       }
 
    }
+   
+   /** Checks that the bytes passed to writeInternal can be read back from the file through plain java.io. */
+   public void testInternalWrite() throws Exception
+   {
+      final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+      controller.open(FILE_NAME, 2000);
 
+      ByteBuffer buffer = null;
+      InputStream fileInput = null;
+      try
+      {
+         final int SIZE = 10 * 512;
+
+         buffer = AsynchronousFileImpl.newBuffer(SIZE);
+
+         for (int i = 0 ; i < SIZE; i++)
+         {
+            buffer.put(getSamplebyte(i));
+         }
+
+         // no callback is involved here: the file is read back right after writeInternal returns
+         controller.writeInternal(0, SIZE, buffer);
+
+         fileInput = new BufferedInputStream(new FileInputStream(new File(FILE_NAME)));
+
+         for (int i = 0 ; i < SIZE; i++)
+         {
+            // read() yields 0..255, so compare against the unsigned value of the sample byte
+            assertEquals(getSamplebyte(i) & 0xFF, fileInput.read());
+         }
+
+         // the file must hold nothing beyond the bytes that were written
+         assertEquals(-1, fileInput.read());
+      }
+      finally
+      {
+         // release the stream, the native buffer and the file even when an assertion fails
+         if (fileInput != null) fileInput.close();
+         if (buffer != null) AsynchronousFileImpl.destroyBuffer(buffer);
+         controller.close();
+      }
+
+   }
+
+
    public void testInvalidWrite() throws Exception
    {
       final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2010-07-21 05:59:12 UTC (rev 9438)
@@ -498,7 +498,17 @@
       {
          writeDirect(bytes, sync, null);
       }
+      
+      /** Fake implementation: simply forwards to writeDirect with sync set to true.
+       * @see org.hornetq.core.journal.SequentialFile#writeInternal(java.nio.ByteBuffer)
+       */
+      public void writeInternal(ByteBuffer bytes) throws Exception
+      {
+         writeDirect(bytes, true);
+      }
 
+      
+
       private void checkAndResize(final int size)
       {
          int oldpos = data == null ? 0 : data.position();
@@ -662,8 +672,6 @@
        */
       public void setTimedBuffer(final TimedBuffer buffer)
       {
-         // TODO Auto-generated method stub
-
       }
 
    }



More information about the hornetq-commits mailing list