[jboss-cvs] JBoss Messaging SVN: r4844 - in branches/Branch_JBMESSAGING-1314: native/src and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 19 21:19:55 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-19 21:19:54 -0400 (Tue, 19 Aug 2008)
New Revision: 4844
Added:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/BufferCallback.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/BufferCallback.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/CleanBufferTest.java
Modified:
branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO32.so
branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO64.so
branches/Branch_JBMESSAGING-1314/native/src/JNICallbackAdapter.cpp
branches/Branch_JBMESSAGING-1314/native/src/LibAIOController.cpp
branches/Branch_JBMESSAGING-1314/native/src/Version.h
branches/Branch_JBMESSAGING-1314/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Experimental buffer reuse
Modified: branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO32.so
===================================================================
(Binary files differ)
Modified: branches/Branch_JBMESSAGING-1314/native/bin/libJBMLibAIO64.so
===================================================================
(Binary files differ)
Modified: branches/Branch_JBMESSAGING-1314/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/Branch_JBMESSAGING-1314/native/src/JNICallbackAdapter.cpp 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/native/src/JNICallbackAdapter.cpp 2008-08-20 01:19:54 UTC (rev 4844)
@@ -36,7 +36,7 @@
void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
{
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback);
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, bufferReference);
return;
}
Modified: branches/Branch_JBMESSAGING-1314/native/src/LibAIOController.cpp
===================================================================
--- branches/Branch_JBMESSAGING-1314/native/src/LibAIOController.cpp 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/native/src/LibAIOController.cpp 2008-08-20 01:19:54 UTC (rev 4844)
@@ -50,7 +50,7 @@
std::string fileName = convertJavaString(env, jstrFileName);
AIOController * controller = new AIOController(fileName, (int) maxIO);
- controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;)V");
+ controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
if (!controller->done) return 0;
controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;ILjava/lang/String;)V");
@@ -105,6 +105,23 @@
}
}
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
+ (JNIEnv *env, jclass, jobject jbuffer, jint size)
+{
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+ // Zero-fills the first `size` bytes of jbuffer's native memory; a null address means jbuffer is not usable as a direct buffer.
+ if (buffer == 0)
+ {
+ throwException(env, "java/lang/IllegalStateException", "Invalid Direct Buffer used");
+ return;
+ }
+ // NOTE(review): `size` is trusted as-is (never compared with the buffer's capacity, negative values not rejected) -- confirm callers.
+ memset(buffer, 0, (size_t)size);
+
+}
+
+
+
JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
(JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
{
@@ -182,8 +199,7 @@
AIOController * controller = (AIOController *) controllerAddress;
controller->fileOutput.preAllocate(env, position, blocks, size, fillChar);
-
- //controller->fileOutput.preAllocate(env, blocks, size);
+
}
catch (AIOException& e)
{
Modified: branches/Branch_JBMESSAGING-1314/native/src/Version.h
===================================================================
--- branches/Branch_JBMESSAGING-1314/native/src/Version.h 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/native/src/Version.h 2008-08-20 01:19:54 UTC (rev 4844)
@@ -1,5 +1,5 @@
#ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 10
+#define _VERSION_NATIVE_AIO 12
#endif
Modified: branches/Branch_JBMESSAGING-1314/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h
===================================================================
--- branches/Branch_JBMESSAGING-1314/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/native/src/org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl.h 2008-08-20 01:19:54 UTC (rev 4844)
@@ -13,6 +13,14 @@
/* Inaccessible static: EXPECTED_NATIVE_VERSION */
/*
* Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
+ * Method: resetBuffer
+ * Signature: (Ljava/nio/ByteBuffer;I)V
+ */
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
+ (JNIEnv *, jclass, jobject, jint);
+
+/*
+ * Class: org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl
* Method: init
* Signature: (Ljava/lang/String;ILorg/jboss/messaging/core/logging/Logger;)J
*/
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -54,6 +54,8 @@
ByteBuffer newBuffer(int size);
+ void setBufferCallback(BufferCallback callback);
+
int getBlockSize();
String getFileName();
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/BufferCallback.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/BufferCallback.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/BufferCallback.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.asyncio;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Listener that is handed a {@link ByteBuffer} back through {@link #bufferDone(ByteBuffer)}; AsynchronousFileImpl.callbackDone invokes it after the AIOCallback.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface BufferCallback
+{
+ void bufferDone(ByteBuffer buffer);
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.asyncio.AsynchronousFile;
+import org.jboss.messaging.core.asyncio.BufferCallback;
import org.jboss.messaging.core.logging.Logger;
@@ -54,7 +55,7 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 10;
+ private static int EXPECTED_NATIVE_VERSION = 12;
static void addMax(int io)
{
@@ -129,6 +130,7 @@
private ReadWriteLock lock = new ReentrantReadWriteLock();
private Lock writeLock = lock.writeLock();
private Semaphore writeSemaphore;
+ private BufferCallback bufferCallback;
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -256,16 +258,25 @@
return ByteBuffer.allocateDirect((int)size);
}
+ public void setBufferCallback(BufferCallback callback)
+ {
+ this.bufferCallback = callback;
+ }
+
// Private
// ---------------------------------------------------------------------------------
- /** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
+ /** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
- private void callbackDone(final AIOCallback callback)
+ private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
{
writeSemaphore.release();
callback.done();
+ if (this.bufferCallback != null)
+ {
+ this.bufferCallback.bufferDone(buffer);
+ }
}
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
@@ -311,6 +322,8 @@
// Native
// ------------------------------------------------------------------------------------------
+ public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
+
private static native long init(String fileName, int maxIO, Logger logger);
private native long size0(long handle);
Added: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/BufferCallback.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/BufferCallback.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/BufferCallback.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal;
+
+/**
+ * Journal-package name for the asyncio {@code BufferCallback}; it extends that interface and declares no additional methods.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface BufferCallback extends org.jboss.messaging.core.asyncio.BufferCallback
+{
+
+}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -46,6 +46,8 @@
*/
void open(int maxIO) throws Exception;
+ void setBufferCallback(BufferCallback callback);
+
int getAlignment() throws Exception;
int calculateBlockStart(int position) throws Exception;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -48,6 +48,8 @@
int getAlignment();
- int calculateBlockSize(int bytes) throws Exception;
+ int calculateBlockSize(int bytes);
+ void cleanBuffer(ByteBuffer buffer);
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.asyncio.AsynchronousFile;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
@@ -181,6 +182,11 @@
}
+ public void setBufferCallback(BufferCallback callback)
+ {
+ aioFile.setBufferCallback(callback);
+ }
+
public void position(final int pos) throws Exception
{
position.set(pos);
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -65,6 +65,12 @@
return ByteBuffer.allocateDirect(size);
}
+ public void cleanBuffer(ByteBuffer directByteBuffer)
+ {
+ AsynchronousFileImpl.resetBuffer(directByteBuffer, directByteBuffer.capacity());
+ }
+
+
public int getAlignment()
{
return 512;
@@ -78,7 +84,7 @@
return newbuffer;
}
- public int calculateBlockSize(int position) throws Exception
+ public int calculateBlockSize(int position)
{
int alignment = getAlignment();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -49,6 +49,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.LoadManager;
@@ -75,6 +76,8 @@
public class JournalImpl implements TestableJournal
{
+ private static final int REUSED_BUFFER_SIZE = 5 * 1024;
+
// Constants -----------------------------------------------------
private static final int STATE_STOPPED = 0;
@@ -176,6 +179,10 @@
private ExecutorService filesExecutor = null;
+ private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ private final BufferCallback bufferCallback = new LocalBufferCallback();
+
/*
* We use a semaphore rather than synchronized since it performs better when
* contended
@@ -271,7 +278,7 @@
int size = SIZE_ADD_RECORD + recordLength;
- ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
bb.putByte(ADD_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -305,7 +312,7 @@
int size = SIZE_ADD_RECORD + record.length;
- ByteBuffer bb = fileFactory.newBuffer(size);
+ ByteBuffer bb = newBuffer(size);
bb.put(ADD_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -347,7 +354,7 @@
int size = SIZE_UPDATE_RECORD + record.length;
- ByteBuffer bb = fileFactory.newBuffer(size);
+ ByteBuffer bb = newBuffer(size);
bb.put(UPDATE_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -387,7 +394,7 @@
int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
- ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
bb.putByte(UPDATE_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -428,7 +435,7 @@
int size = SIZE_DELETE_RECORD;
- ByteBuffer bb = fileFactory.newBuffer(size);
+ ByteBuffer bb = newBuffer(size);
bb.put(DELETE_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -467,7 +474,7 @@
int size = SIZE_ADD_RECORD_TX + recordLength;
- ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
bb.putByte(ADD_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -504,7 +511,7 @@
int size = SIZE_ADD_RECORD_TX + record.length;
- ByteBuffer bb = fileFactory.newBuffer(size);
+ ByteBuffer bb = newBuffer(size);
bb.put(ADD_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -541,7 +548,7 @@
int size = SIZE_UPDATE_RECORD_TX + record.length;
- ByteBuffer bb = fileFactory.newBuffer(size);
+ ByteBuffer bb = newBuffer(size);
bb.put(UPDATE_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -569,6 +576,7 @@
}
}
+
public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
@@ -578,7 +586,7 @@
int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
- ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
+ ByteBufferWrapper bb = new ByteBufferWrapper(newBuffer(size));
bb.putByte(UPDATE_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -605,7 +613,7 @@
lock.release();
}
}
-
+
public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
{
if (state != STATE_LOADED)
@@ -615,7 +623,7 @@
int size = SIZE_DELETE_RECORD_TX;
- ByteBuffer bb = fileFactory.newBuffer(size);
+ ByteBuffer bb = newBuffer(size);
bb.put(DELETE_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -718,7 +726,7 @@
int size = SIZE_ROLLBACK_RECORD;
- ByteBuffer bb = fileFactory.newBuffer(size);
+ ByteBuffer bb = newBuffer(size);
bb.put(ROLLBACK_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -1191,6 +1199,8 @@
{
currentFile.getFile().open();
+ currentFile.getFile().setBufferCallback(bufferCallback);
+
currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
currentFile.setOffset(lastDataPos);
@@ -1582,7 +1592,7 @@
{
int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2;
- ByteBuffer bb = fileFactory.newBuffer(size);
+ ByteBuffer bb = newBuffer(size);
bb.put(recordType);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
@@ -1709,7 +1719,7 @@
* */
private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
{
- int size = bb.capacity();
+ int size = bb.limit();
checkFile(size);
bb.position(SIZE_BYTE);
if (currentFile == null)
@@ -1734,6 +1744,7 @@
return currentFile;
}
+
private JournalFile createFile(final boolean keepOpened) throws Exception
{
int orderingID = generateOrderingID();
@@ -1772,6 +1783,8 @@
{
file.getFile().open();
file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
+ file.getFile().setBufferCallback(bufferCallback);
+
}
private int generateOrderingID()
@@ -1943,7 +1956,61 @@
return null;
}
}
+ // -- Area reserved for the reuse buffer logic -----------------------------------------
+ volatile long lastTime = System.currentTimeMillis();
+ private ByteBuffer newBuffer(int size)
+ {
+ if (System.currentTimeMillis() - lastTime > 10000)
+ {
+ System.out.println("Clear!!!" + reuseBuffers.size());
+ reuseBuffers.clear();
+ }
+ // NOTE(review): the idle check above (more than 10000 ms since lastTime) prints to stdout and does not refresh lastTime itself.
+ if (size > REUSED_BUFFER_SIZE)
+ {
+ return fileFactory.newBuffer(size);
+ }
+ else
+ {
+ // size fits the pool (larger requests took the dedicated-buffer branch above): reuse a recycled buffer or allocate a pool-sized one.
+ ByteBuffer buffer = this.reuseBuffers.poll();
+ if (buffer == null)
+ {
+ buffer = fileFactory.newBuffer(REUSED_BUFFER_SIZE);
+ }
+ else
+ {
+ // Reset the buffer before reusing it: widen the limit to the full capacity, then let the factory zero it
+ // (AIOSequentialFileFactory.cleanBuffer delegates to the native resetBuffer; the NIO factory loops in Java)
+ buffer.limit(buffer.capacity());
+ fileFactory.cleanBuffer(buffer);
+ }
+ // Expose only calculateBlockSize(size) bytes of the pooled buffer (the NIO factory returns size unchanged), starting at position 0.
+ buffer.limit(fileFactory.calculateBlockSize(size));
+ buffer.rewind();
+
+ return buffer;
+ }
+ }
+
+ private class LocalBufferCallback implements BufferCallback
+ {
+ // Stamps lastTime and offers pool-sized buffers (capacity == REUSED_BUFFER_SIZE) back to reuseBuffers; any other buffer is ignored.
+ public void bufferDone(ByteBuffer buffer)
+ {
+ lastTime = System.currentTimeMillis();
+ if (buffer.capacity() == REUSED_BUFFER_SIZE)
+ {
+ reuseBuffers.offer(buffer);
+ }
+ }
+
+ }
+
+ // ------------------------------------------------------------------------------------
+
+
// Inner classes ---------------------------------------------------------------------------
private static class TransactionCallback implements IOCallback
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
@@ -53,6 +54,8 @@
private RandomAccessFile rfile;
+ BufferCallback bufferCallback;
+
public NIOSequentialFile(final String journalDir, final String fileName)
{
this.journalDir = journalDir;
@@ -89,6 +92,13 @@
open();
}
+
+
+ public void setBufferCallback(BufferCallback callback)
+ {
+ this.bufferCallback = callback;
+ }
+
public void fill(final int position, final int size, final byte fillCharacter) throws Exception
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -167,6 +177,11 @@
sync();
}
+ if (bufferCallback != null)
+ {
+ bufferCallback.bufferDone(bytes);
+ }
+
return bytesRead;
}
@@ -180,7 +195,13 @@
{
callback.done();
}
+
+ if (bufferCallback != null)
+ {
+ bufferCallback.bufferDone(bytes);
+ }
+
return bytesRead;
}
catch (Exception e)
@@ -189,7 +210,7 @@
throw e;
}
}
-
+
public void sync() throws Exception
{
channel.force(false);
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -58,6 +58,22 @@
return ByteBuffer.allocate(size);
}
+ public void cleanBuffer(final ByteBuffer buffer)
+ {
+ final int limit = buffer.limit();
+ final int capacity = buffer.capacity();
+ buffer.limit(capacity);
+ buffer.rewind();
+ // Zero every byte in [0, capacity), not just the bytes below the caller's limit.
+ for (int i = 0; i < capacity; i++)
+ {
+ buffer.put((byte)0);
+ }
+ // Put the caller's limit back; the position ends at 0 (the previous position is not preserved).
+ buffer.limit(limit);
+ buffer.rewind();
+ }
+
public ByteBuffer wrapBuffer(final byte[] bytes)
{
return ByteBuffer.wrap(bytes);
@@ -68,7 +84,7 @@
return 1;
}
- public int calculateBlockSize(int bytes) throws Exception
+ public int calculateBlockSize(int bytes)
{
return bytes;
}
Added: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/CleanBufferTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/CleanBufferTest.java (rev 0)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/CleanBufferTest.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -0,0 +1,115 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.journal.impl;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * Checks that {@code SequentialFileFactory.cleanBuffer} zero-fills a buffer for the NIO, fake and (only when AsynchronousFileImpl.isLoaded()) AIO factories.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class CleanBufferTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ public void testCleanOnNIO()
+ {
+ SequentialFileFactory factory = new NIOSequentialFileFactory("Whatever");
+
+ testBuffer(factory);
+ }
+
+ public void testCleanOnAIO()
+ {
+ if (AsynchronousFileImpl.isLoaded())
+ {
+ SequentialFileFactory factory = new AIOSequentialFileFactory("Whatever");
+
+ testBuffer(factory);
+ }
+ }
+
+ public void testCleanOnFake()
+ {
+ SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+ testBuffer(factory);
+ }
+ // Shared scenario: write 0..99 into a 100-byte buffer, read it back, then expect cleanBuffer to leave all 100 bytes at zero.
+ private void testBuffer(SequentialFileFactory factory)
+ {
+ ByteBuffer buffer = factory.newBuffer(100);
+ for (byte b = 0; b < 100; b++)
+ {
+ buffer.put(b);
+ }
+
+ buffer.rewind();
+
+ for (byte b = 0; b < 100; b++)
+ {
+ assertEquals(b, buffer.get());
+ }
+
+
+ // Clean with the limit lowered to 10: the bytes beyond the limit must be zeroed as well.
+ buffer.limit(10);
+ factory.cleanBuffer(buffer);
+ buffer.limit(100);
+
+ buffer.rewind();
+
+ for (byte b = 0; b < 100; b++)
+ {
+ assertEquals(0, buffer.get());
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -29,6 +29,7 @@
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IArgumentMatcher;
+import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.JournalImpl;
@@ -456,6 +457,24 @@
});
+ EasyMock.expect(mockFactory.calculateBlockSize(EasyMock.anyInt()))
+ .andStubAnswer(new IAnswer<Integer>()
+ {
+
+ public Integer answer() throws Throwable
+ {
+ return (Integer) EasyMock.getCurrentArguments()[0];
+ }
+ });
+
+ file1.setBufferCallback(EasyMock.isA(BufferCallback.class));
+ EasyMock.expectLastCall().anyTimes();
+
+ file2.setBufferCallback(EasyMock.isA(BufferCallback.class));
+ EasyMock.expectLastCall().anyTimes();
+
+
+
EasyMock.expect(file1.getAlignment()).andStubReturn(1);
EasyMock.expect(file2.getAlignment()).andStubReturn(1);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
@@ -83,6 +84,7 @@
}
catch (Exception e)
{
+ e.printStackTrace();
this.e = e;
}
}
@@ -90,9 +92,9 @@
LocalThread t = new LocalThread();
t.start();
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
- latch.await();
-
Thread.yield();
assertTrue(t.isAlive());
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-08-20 01:09:33 UTC (rev 4843)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-08-20 01:19:54 UTC (rev 4844)
@@ -28,6 +28,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -101,6 +102,22 @@
return sf;
}
+ public void cleanBuffer(final ByteBuffer buffer)
+ {
+ final int limit = buffer.limit();
+ final int capacity = buffer.capacity();
+ buffer.limit(capacity);
+ buffer.rewind();
+ // Zero every byte in [0, capacity), not just the bytes below the caller's limit.
+ for (int i = 0; i < capacity; i++)
+ {
+ buffer.put((byte)0);
+ }
+ // Put the caller's limit back; the position ends at 0 (the previous position is not preserved).
+ buffer.limit(limit);
+ buffer.rewind();
+ }
+
public List<String> listFiles(final String extension)
{
List<String> files = new ArrayList<String>();
@@ -140,7 +157,7 @@
return ByteBuffer.allocateDirect(size);
}
- public int calculateBlockSize(int position) throws Exception
+ public int calculateBlockSize(int position)
{
int alignment = getAlignment();
@@ -229,7 +246,7 @@
final IOCallback callback;
volatile boolean sendError;
- CallbackRunnable(FakeSequentialFile file, ByteBuffer bytes, IOCallback callback)
+ CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final IOCallback callback)
{
this.file = file;
this.bytes = bytes;
@@ -245,8 +262,21 @@
}
else
{
- file.data.put(bytes);
- if (callback!=null) callback.done();
+ try
+ {
+ file.data.put(bytes);
+ if (callback!=null) callback.done();
+
+ if (file.bufferCallback != null)
+ {
+ file.bufferCallback.bufferDone(bytes);
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ callback.onError(-1, e.getMessage());
+ }
}
}
@@ -269,7 +299,9 @@
private final String fileName;
private ByteBuffer data;
-
+
+ private BufferCallback bufferCallback;
+
public ByteBuffer getData()
{
return data;
@@ -323,6 +355,11 @@
checkAndResize(0);
}
+ public void setBufferCallback(BufferCallback callback)
+ {
+ this.bufferCallback = callback;
+ }
+
public void fill(final int pos, final int size, final byte fillCharacter) throws Exception
{
if (!open)
@@ -397,7 +434,7 @@
checkAlignment(bytes.limit());
- checkAndResize(bytes.capacity() + position);
+ checkAndResize(bytes.limit() + position);
CallbackRunnable action = new CallbackRunnable(this, bytes, callback);
More information about the jboss-cvs-commits
mailing list