[hornetq-commits] JBoss hornetq SVN: r9438 - in trunk: src/main/org/hornetq/core/asyncio and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Jul 21 01:59:13 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-07-21 01:59:12 -0400 (Wed, 21 Jul 2010)
New Revision: 9438
Modified:
trunk/native/src/AsyncFile.cpp
trunk/native/src/AsyncFile.h
trunk/native/src/JNI_AsynchronousFileImpl.cpp
trunk/native/src/Version.h
trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - It's not necessary to use aio_write during compacting, we can just use a direct write. I will then just use a direct write to avoid possible issues.
Modified: trunk/native/src/AsyncFile.cpp
===================================================================
--- trunk/native/src/AsyncFile.cpp 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/AsyncFile.cpp 2010-07-21 05:59:12 UTC (rev 9438)
@@ -206,6 +206,26 @@
free (preAllocBuffer);
}
+
+/** Write directly to the file without using libaio queue */
+void AsyncFile::writeInternal(THREAD_CONTEXT, long position, size_t size, void *& buffer)
+{
+ if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
+
+ if (::write(fileHandle, buffer, size)<0)
+ {
+ throw AIOException (NATIVE_ERROR_IO, "Error writing file");
+ }
+
+ if (::fsync(fileHandle) < 0)
+ {
+ throw AIOException (NATIVE_ERROR_IO, "Error on synchronizing file");
+ }
+
+
+}
+
+
void AsyncFile::write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter)
{
Modified: trunk/native/src/AsyncFile.h
===================================================================
--- trunk/native/src/AsyncFile.h 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/AsyncFile.h 2010-07-21 05:59:12 UTC (rev 9438)
@@ -49,6 +49,9 @@
void write(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
+ /** Write directly to the file without using libaio queue */
+ void writeInternal(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer);
+
void read(THREAD_CONTEXT threadContext, long position, size_t size, void *& buffer, CallbackAdapter *& adapter);
int getHandle()
Modified: trunk/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- trunk/native/src/JNI_AsynchronousFileImpl.cpp 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/JNI_AsynchronousFileImpl.cpp 2010-07-21 05:59:12 UTC (rev 9438)
@@ -196,8 +196,29 @@
}
}
+JNIEXPORT void JNICALL Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_writeInternal
+ (JNIEnv * env, jobject , jobject controllerAddress, jlong positionToWrite, jlong size, jobject jbuffer)
+{
+ try
+ {
+ AIOController * controller = getController(env, controllerAddress);
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+ if (buffer == 0)
+ {
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
+ return;
+ }
+ controller->fileOutput.writeInternal(env, positionToWrite, (size_t)size, buffer);
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ }
+}
+
+
JNIEXPORT void Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_internalPollEvents
(JNIEnv *env, jclass, jobject controllerAddress)
{
Modified: trunk/native/src/Version.h
===================================================================
--- trunk/native/src/Version.h 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/native/src/Version.h 2010-07-21 05:59:12 UTC (rev 9438)
@@ -3,6 +3,6 @@
// This definition needs to match org.hornetq.core.asyncio.impl.AsynchronousFileImpl.EXPECTED_NATIVE_VERSION
// Or else the native module won't be loaded because of version mismatches
-#define _VERSION_NATIVE_AIO 29
+#define _VERSION_NATIVE_AIO 30
#endif
Modified: trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/asyncio/AsynchronousFile.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -43,6 +43,12 @@
/** Any error will be reported on the callback interface */
void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
+
+ /**
+ * Performs an internal direct write.
+ * @throws HornetQException
+ */
+ void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws HornetQException;
@@ -54,4 +60,5 @@
String getFileName();
+
}
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -51,7 +51,7 @@
/** This definition needs to match Version.h on the native sources.
Or else the native module won't be loaded because of version mismatches */
- private static int EXPECTED_NATIVE_VERSION = 29;
+ private static int EXPECTED_NATIVE_VERSION = 30;
/** Used to determine the next writing sequence */
private final AtomicLong nextWritingSequence = new AtomicLong(0);
@@ -271,7 +271,18 @@
writeLock.unlock();
}
}
+
+
+ public void writeInternal(long positionToWrite, long size, ByteBuffer bytes) throws HornetQException
+ {
+ writeInternal(handler, positionToWrite, size, bytes);
+ if (bufferCallback != null)
+ {
+ bufferCallback.bufferDone(bytes);
+ }
+ }
+
public void write(final long position,
final long size,
final ByteBuffer directByteBuffer,
@@ -629,6 +640,8 @@
ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
+ private native void writeInternal(ByteBuffer handle, long positionToWrite, long size, ByteBuffer bytes) throws HornetQException;
+
private native void read(ByteBuffer handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
private static native void fill(ByteBuffer handle, long position, int blocks, long size, byte fillChar) throws HornetQException;
@@ -720,4 +733,5 @@
}
}
}
+
}
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -69,6 +69,11 @@
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
+
+ /** Write directly to the file.
+ * This is used by compacting and other places where we write a big buffer in a single shot.
+ * writeInternal should always block until the entire write is sync on disk */
+ void writeInternal(ByteBuffer bytes) throws Exception;
int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -279,7 +279,17 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
+
+ public void writeInternal(ByteBuffer bytes) throws Exception
+ {
+ final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+ final long positionToWrite = position.getAndAdd(bytesToWrite);
+
+ aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
+ }
+
+
// Protected methods
// -----------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -180,13 +180,11 @@
if (writingChannel != null)
{
sequentialFile.position(0);
- SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
// To Fix the size of the file
writingChannel.writerIndex(writingChannel.capacity());
- sequentialFile.writeDirect(writingChannel.toByteBuffer(), true, completion);
- completion.waitCompletion();
+ sequentialFile.writeInternal(writingChannel.toByteBuffer());
sequentialFile.close();
newDataFiles.add(currentFile);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -260,7 +260,14 @@
{
internalWrite(bytes, sync, null);
}
+
+
+ public void writeInternal(ByteBuffer bytes) throws Exception
+ {
+ internalWrite(bytes, true, null);
+ }
+
@Override
protected ByteBuffer newBuffer(int size, final int limit)
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -13,6 +13,10 @@
package org.hornetq.tests.unit.core.asyncio;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
@@ -917,7 +921,51 @@
}
}
+
+
+ public void testInternalWrite() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+ controller.open(FILE_NAME, 2000);
+ ByteBuffer buffer = null;
+
+ try
+ {
+ final int SIZE = 10 * 512;
+
+ buffer = AsynchronousFileImpl.newBuffer(SIZE);
+
+ for (int i = 0 ; i < SIZE; i++)
+ {
+ buffer.put(getSamplebyte(i));
+ }
+
+ controller.writeInternal(0, SIZE, buffer);
+
+ InputStream fileInput = new BufferedInputStream(new FileInputStream(new File(FILE_NAME)));
+
+ for (int i = 0 ; i < SIZE; i++)
+ {
+ assertEquals((int)getSamplebyte(i), (int)fileInput.read());
+ }
+
+ assertEquals(-1, fileInput.read());
+
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ finally
+ {
+ if (buffer != null) AsynchronousFileImpl.destroyBuffer(buffer);
+ if (controller != null) controller.close();
+ }
+
+ }
+
+
public void testInvalidWrite() throws Exception
{
final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2010-07-21 05:07:44 UTC (rev 9437)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2010-07-21 05:59:12 UTC (rev 9438)
@@ -498,7 +498,17 @@
{
writeDirect(bytes, sync, null);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#writeInternal(java.nio.ByteBuffer)
+ */
+ public void writeInternal(ByteBuffer bytes) throws Exception
+ {
+ writeDirect(bytes, true);
+ }
+
+
private void checkAndResize(final int size)
{
int oldpos = data == null ? 0 : data.position();
@@ -662,8 +672,6 @@
*/
public void setTimedBuffer(final TimedBuffer buffer)
{
- // TODO Auto-generated method stub
-
}
}
More information about the hornetq-commits
mailing list