[jboss-cvs] JBoss Messaging SVN: r6683 - in trunk: native/src and 16 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 6 00:42:57 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-05-06 00:42:56 -0400 (Wed, 06 May 2009)
New Revision: 6683
Removed:
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java
Modified:
trunk/native/bin/libJBMLibAIO32.so
trunk/native/bin/libJBMLibAIO64.so
trunk/native/src/AsyncFile.cpp
trunk/native/src/LibAIOController.cpp
trunk/native/src/Version.h
trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1538 - AIO & Buffer creation
Modified: trunk/native/bin/libJBMLibAIO32.so
===================================================================
(Binary files differ)
Modified: trunk/native/bin/libJBMLibAIO64.so
===================================================================
(Binary files differ)
Modified: trunk/native/src/AsyncFile.cpp
===================================================================
--- trunk/native/src/AsyncFile.cpp 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/native/src/AsyncFile.cpp 2009-05-06 04:42:56 UTC (rev 6683)
@@ -13,7 +13,7 @@
USA
The GNU Lesser General Public License is available in the file COPYING.
-
+
Software written by Clebert Suconic (csuconic at redhat dot com)
*/
@@ -48,12 +48,12 @@
std::string io_error(int rc)
{
std::stringstream buffer;
-
+
if (rc == -ENOSYS)
buffer << "AIO not in this kernel";
- else
+ else
buffer << "Error:= " << strerror(-rc);
-
+
return buffer.str();
}
@@ -62,27 +62,27 @@
{
::pthread_mutex_init(&fileMutex,0);
::pthread_mutex_init(&pollerMutex,0);
-
+
maxIO = _maxIO;
fileName = _fileName;
if (io_queue_init(maxIO, &aioContext))
{
- throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio");
+ throw AIOException(NATIVE_ERROR_CANT_INITIALIZE_AIO, "Can't initialize aio, out of AIO Handlers");
}
fileHandle = ::open(fileName.data(), O_RDWR | O_CREAT | O_DIRECT, 0666);
if (fileHandle < 0)
{
io_queue_release(aioContext);
- throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file");
+ throw AIOException(NATIVE_ERROR_CANT_OPEN_CLOSE_FILE, "Can't open file");
}
-
+
#ifdef DEBUG
fprintf (stderr,"File Handle %d", fileHandle);
#endif
events = (struct io_event *)malloc (maxIO * sizeof (struct io_event));
-
+
if (events == 0)
{
throw AIOException (NATIVE_ERROR_CANT_ALLOCATE_QUEUE, "Can't allocate ioEvents");
@@ -112,17 +112,17 @@
void AsyncFile::pollEvents(THREAD_CONTEXT threadContext)
{
-
+
LockClass lock(&pollerMutex);
pollerRunning=1;
-
+
// TODO: Maybe we don't need to wait for one second.... we just keep waiting forever, and use something to interrupt it
// maybe an invalid write to interrupt it.
struct timespec oneSecond;
oneSecond.tv_sec = 1;
oneSecond.tv_nsec = 0;
-
-
+
+
while (pollerRunning)
{
if (isException(threadContext))
@@ -130,15 +130,15 @@
return;
}
int result = io_getevents(this->aioContext, 1, maxIO, events, 0);
-
-
+
+
#ifdef DEBUG
fprintf (stderr, "poll, pollerRunning=%d\n", pollerRunning); fflush(stderr);
#endif
-
+
if (result > 0)
{
-
+
#ifdef DEBUG
fprintf (stdout, "Received %d events\n", result);
fflush(stdout);
@@ -147,9 +147,9 @@
for (int i=0; i<result; i++)
{
-
+
struct iocb * iocbp = events[i].obj;
-
+
if (iocbp->data == (void *) -1)
{
pollerRunning = 0;
@@ -160,7 +160,7 @@
else
{
CallbackAdapter * adapter = (CallbackAdapter *) iocbp->data;
-
+
long result = events[i].res;
if (result < 0)
{
@@ -172,13 +172,13 @@
adapter->done(threadContext);
}
}
-
+
delete iocbp;
}
}
#ifdef DEBUG
controller->log(threadContext, 2, "Poller finished execution");
-#endif
+#endif
}
@@ -189,18 +189,18 @@
{
throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "You can only pre allocate files in multiples of 512");
}
-
+
void * preAllocBuffer = 0;
if (posix_memalign(&preAllocBuffer, 512, size))
{
throw AIOException(NATIVE_ERROR_ALLOCATE_MEMORY, "Error on posix_memalign");
}
-
+
memset(preAllocBuffer, fillChar, size);
-
-
+
+
if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (11, "Error positioning the file");
-
+
for (int i=0; i<blocks; i++)
{
if (::write(fileHandle, preAllocBuffer, size)<0)
@@ -208,9 +208,9 @@
throw AIOException (NATIVE_ERROR_PREALLOCATE_FILE, "Error pre allocating the file");
}
}
-
+
if (::lseek (fileHandle, position, SEEK_SET) < 0) throw AIOException (NATIVE_ERROR_IO, "Error positioning the file");
-
+
free (preAllocBuffer);
}
@@ -223,7 +223,7 @@
int tries = 0;
int result = 0;
-
+
while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
{
#ifdef DEBUG
@@ -237,7 +237,7 @@
#endif
controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
}
-
+
if (tries > TRIES_BEFORE_ERROR)
{
#ifdef DEBUG
@@ -247,7 +247,7 @@
}
::usleep(WAIT_FOR_SPOT);
}
-
+
if (result<0)
{
std::stringstream str;
@@ -265,7 +265,7 @@
int tries = 0;
int result = 0;
-
+
while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
{
#ifdef DEBUG
@@ -279,7 +279,7 @@
#endif
controller->log(threadContext, 1, "You should consider expanding AIOLimit if this message appears too many times");
}
-
+
if (tries > TRIES_BEFORE_ERROR)
{
#ifdef DEBUG
@@ -289,7 +289,7 @@
}
::usleep(WAIT_FOR_SPOT);
}
-
+
if (result<0)
{
std::stringstream str;
@@ -301,7 +301,7 @@
long AsyncFile::getSize()
{
struct stat64 statBuffer;
-
+
if (fstat64(fileHandle, &statBuffer) < 0)
{
return -1l;
@@ -313,21 +313,21 @@
void AsyncFile::stopPoller(THREAD_CONTEXT threadContext)
{
pollerRunning = 0;
-
-
+
+
struct iocb * iocb = new struct iocb();
::io_prep_pwrite(iocb, fileHandle, 0, 0, 0);
iocb->data = (void *) -1;
int result = 0;
-
+
while ((result = ::io_submit(aioContext, 1, &iocb)) == (-EAGAIN))
{
fprintf(stderr, "Couldn't send request to stop poller, trying again");
controller->log(threadContext, 1, "Couldn't send request to stop poller, trying again");
::usleep(WAIT_FOR_SPOT);
}
-
+
// Waiting the Poller to finish (by giving up the lock)
LockClass lock(&pollerMutex);
}
Modified: trunk/native/src/LibAIOController.cpp
===================================================================
--- trunk/native/src/LibAIOController.cpp 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/native/src/LibAIOController.cpp 2009-05-06 04:42:56 UTC (rev 6683)
@@ -35,8 +35,6 @@
#include "Version.h"
-
-
/*
* Class: org_jboss_jaio_libaioimpl_LibAIOController
* Method: init
@@ -110,6 +108,8 @@
}
}
+
+// Fast memset on buffer
JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_resetBuffer
(JNIEnv *env, jclass, jobject jbuffer, jint size)
{
@@ -125,7 +125,46 @@
}
+JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_destroyBuffer
+ (JNIEnv * env, jclass, jobject jbuffer)
+{
+ void * buffer = env->GetDirectBufferAddress(jbuffer);
+ free(buffer);
+}
+JNIEXPORT jobject JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_newNativeBuffer
+ (JNIEnv * env, jclass, jlong size)
+{
+ try
+ {
+
+ if (size % ALIGNMENT)
+ {
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Buffer size needs to be aligned to 512");
+ return 0;
+ }
+
+
+ // This will allocate a buffer, aligned by 512.
+ // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
+ void * buffer = 0;
+ if (::posix_memalign(&buffer, 512, size))
+ {
+ throwException(env, NATIVE_ERROR_INTERNAL, "Error on posix_memalign");
+ return 0;
+ }
+
+ memset(buffer, 0, (size_t)size);
+
+ jobject jbuffer = env->NewDirectByteBuffer(buffer, size);
+ return jbuffer;
+ }
+ catch (AIOException& e)
+ {
+ throwException(env, e.getErrorCode(), e.what());
+ return 0;
+ }
+}
JNIEXPORT void JNICALL Java_org_jboss_messaging_core_asyncio_impl_AsynchronousFileImpl_write
(JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
@@ -134,12 +173,14 @@
{
AIOController * controller = (AIOController *) controllerAddress;
void * buffer = env->GetDirectBufferAddress(jbuffer);
+
if (buffer == 0)
{
throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
return;
}
+
CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
Modified: trunk/native/src/Version.h
===================================================================
--- trunk/native/src/Version.h 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/native/src/Version.h 2009-05-06 04:42:56 UTC (rev 6683)
@@ -1,5 +1,5 @@
#ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 17
+#define _VERSION_NATIVE_AIO 19
#endif
Modified: trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/config/stand-alone/non-clustered/jbm-configuration.xml 2009-05-06 04:42:56 UTC (rev 6683)
@@ -187,7 +187,7 @@
<create-journal-dir>true</create-journal-dir>
- <journal-type>NIO</journal-type>
+ <journal-type>ASYNCIO</journal-type>
<!-- The journal will reuse any buffers where the size < journal-buffer-reuse-size on write operations
Set this to -1 to disable this feature -->
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -40,8 +40,9 @@
* Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error
* @param fileName
* @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
+ * @throws MessagingException
*/
- void open(String fileName, int maxIO);
+ void open(String fileName, int maxIO) throws MessagingException;
/**
* Warning: This function will perform a synchronous IO, probably translating to a fstat call
@@ -49,14 +50,13 @@
* */
long size() throws MessagingException;
- void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage) throws MessagingException;
+ /** Any error will be reported on the callback interface */
+ void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback);
- void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage) throws MessagingException;
+ void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioCallback) throws MessagingException;
void fill(long position, int blocks, long size, byte fillChar) throws MessagingException;
- ByteBuffer newBuffer(int size);
-
void setBufferCallback(BufferCallback callback);
int getBlockSize();
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -23,6 +23,8 @@
package org.jboss.messaging.core.asyncio.impl;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,6 +36,7 @@
import org.jboss.messaging.core.asyncio.BufferCallback;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.VariableLatch;
/**
*
@@ -54,7 +57,7 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 17;
+ private static int EXPECTED_NATIVE_VERSION = 19;
public static void addMax(final int io)
{
@@ -131,7 +134,9 @@
private String fileName;
- private volatile Thread poller;
+ private final VariableLatch pollerLatch = new VariableLatch();
+
+ private volatile Runnable poller;
private int maxIO;
@@ -145,12 +150,31 @@
* Warning: Beware of the C++ pointer! It will bite you! :-)
*/
private long handler;
+
+
+ // A context switch on AIO would make it to synchronize the disk before
+ // switching to the new thread, what would cause
+ // serious performance problems. Because of that we make all the writes on
+ // AIO using a single thread.
+ private final Executor writeExecutor;
+
+ private final Executor pollerExecutor;
// AsynchronousFile implementation
// ------------------------------------------------------------------------------------
- public void open(final String fileName, final int maxIO)
+ /**
+ * @param writeExecutor It needs to be a single Thread executor. If null it will use the user thread to execute write operations
+ * @param pollerExecutor The thread pool that will initialize poller handlers
+ */
+ public AsynchronousFileImpl(Executor writeExecutor, Executor pollerExecutor)
{
+ this.writeExecutor = writeExecutor;
+ this.pollerExecutor = pollerExecutor;
+ }
+
+ public void open(final String fileName, final int maxIO) throws MessagingException
+ {
writeLock.lock();
try
@@ -165,7 +189,23 @@
this.fileName = fileName;
- handler = init(fileName, this.maxIO, log);
+ try
+ {
+ handler = init(fileName, this.maxIO, log);
+ }
+ catch (MessagingException e)
+ {
+ MessagingException ex = null;
+ if (e.getCode() == MessagingException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
+ {
+ ex = new MessagingException(e.getCode(), "Can't initialize AIO. Currently AIO in use = " + totalMaxIO.get() + ", trying to allocate more " + maxIO, e);
+ }
+ else
+ {
+ ex = e;
+ }
+ throw ex;
+ }
opened = true;
addMax(this.maxIO);
}
@@ -192,11 +232,10 @@
writeSemaphore = null;
if (poller != null)
{
- Thread currentPoller = poller;
stopPoller(handler);
// We need to make sure we won't call close until Poller is
// completely done, or we might get beautiful GPFs
- currentPoller.join();
+ this.pollerLatch.waitCompletion();
}
closeInternal(handler);
@@ -216,30 +255,56 @@
public void write(final long position,
final long size,
final ByteBuffer directByteBuffer,
- final AIOCallback aioPackage) throws MessagingException
+ final AIOCallback aioCallback)
{
+ if (aioCallback == null)
+ {
+ throw new NullPointerException("Null Callback");
+ }
+
checkOpened();
if (poller == null)
{
startPoller();
}
writeSemaphore.acquireUninterruptibly();
- try
+
+ if (writeExecutor != null)
{
- write(handler, position, size, directByteBuffer, aioPackage);
+ writeExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ write(handler, position, size, directByteBuffer, aioCallback);
+ }
+ catch (MessagingException e)
+ {
+ callbackError(aioCallback, e.getCode(), e.getMessage());
+ }
+ catch (RuntimeException e)
+ {
+ callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+ }
+ }
+ });
}
- catch (MessagingException e)
+ else
{
- // Release only if an exception happened
- writeSemaphore.release();
- throw e;
+ try
+ {
+ write(handler, position, size, directByteBuffer, aioCallback);
+ }
+ catch (MessagingException e)
+ {
+ callbackError(aioCallback, e.getCode(), e.getMessage());
+ }
+ catch (RuntimeException e)
+ {
+ callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+ }
}
- catch (RuntimeException e)
- {
- // Release only if an exception happened
- writeSemaphore.release();
- throw e;
- }
}
@@ -294,22 +359,42 @@
return fileName;
}
- // Should we make this method static?
- public ByteBuffer newBuffer(final int size)
+ /**
+ * This needs to be synchronized because of
+ * http://bugs.sun.com/view_bug.do?bug_id=6791815
+ * http://mail.openjdk.java.net/pipermail/hotspot-runtime-dev/2009-January/000386.html
+ *
+ * @param size
+ * @return
+ */
+ public synchronized static ByteBuffer newBuffer(final int size)
{
- if (size % getBlockSize() != 0)
+ if (size % 512 != 0)
{
throw new RuntimeException("Buffer size needs to be aligned to 512");
}
- return ByteBuffer.allocateDirect(size);
+ return newNativeBuffer(size);
}
public void setBufferCallback(final BufferCallback callback)
{
bufferCallback = callback;
}
+
+ /** Return the JNI handler used on C++ */
+ public long getHandler()
+ {
+ return handler;
+ }
+
+ public static void clearBuffer(ByteBuffer buffer)
+ {
+ resetBuffer(buffer, buffer.limit());
+ buffer.position(0);
+ }
+
// Private
// ---------------------------------------------------------------------------------
@@ -355,10 +440,11 @@
if (poller == null)
{
- poller = new PollerThread();
+ pollerLatch.up();
+ poller = new PollerRunnable();
try
{
- poller.start();
+ pollerExecutor.execute(poller);
}
catch (Exception ex)
{
@@ -379,14 +465,22 @@
throw new RuntimeException("File is not opened");
}
}
-
// Native
// ------------------------------------------------------------------------------------------
- public static native void resetBuffer(ByteBuffer directByteBuffer, int size);
+ private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
- private static native long init(String fileName, int maxIO, Logger logger);
+ // Should we make this method static?
+ public static native void destroyBuffer(ByteBuffer buffer);
+
+ // Should we make this method static?
+ private static native ByteBuffer newNativeBuffer(long size);
+
+
+
+ private static native long init(String fileName, int maxIO, Logger logger) throws MessagingException;
+
private native long size0(long handle) throws MessagingException;
private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws MessagingException;
@@ -401,6 +495,7 @@
/** A native method that does nothing, and just validate if the ELF dependencies are loaded and on the correct platform as this binary format */
private static native int getNativeVersion();
+
/** Poll asynchrounous events from internal queues */
private static native void internalPollEvents(long handler);
@@ -408,14 +503,12 @@
// Inner classes
// -----------------------------------------------------------------------------------------
- private class PollerThread extends Thread
+ private class PollerRunnable implements Runnable
{
- PollerThread()
+ PollerRunnable()
{
- super("NativePoller for " + fileName);
}
- @Override
public void run()
{
try
@@ -428,6 +521,7 @@
// Case the poller thread is interrupted, this will allow us to
// restart the thread when required
poller = null;
+ pollerLatch.down();
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/buffers/ByteBufferBackedChannelBuffer.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -60,7 +60,7 @@
throw new NullPointerException("buffer");
}
- this.buffer = buffer.slice();
+ this.buffer = buffer;
capacity = buffer.remaining();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -48,8 +48,6 @@
*/
void open(int maxIO) throws Exception;
- void setBufferCallback(BufferCallback callback);
-
int getAlignment() throws Exception;
int calculateBlockStart(int position) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -42,8 +42,14 @@
boolean isSupportsCallbacks();
ByteBuffer newBuffer(int size);
+
+ void releaseBuffer(ByteBuffer buffer);
+
+ void setBufferCallback(BufferCallback bufferCallback);
+
+ BufferCallback getBufferCallback();
- // Avoid using this method in production as it creates an unecessary copy
+ // To be used in tests only
ByteBuffer wrapBuffer(byte[] bytes);
int getAlignment();
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -25,8 +25,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -60,18 +59,26 @@
private AsynchronousFile aioFile;
private final AtomicLong position = new AtomicLong(0);
+
+ private BufferCallback bufferCallback;
- // A context switch on AIO would make it to synchronize the disk before
- // switching to the new thread, what would cause
- // serious performance problems. Because of that we make all the writes on
- // AIO using a single thread.
- private ExecutorService executor;
+ /** A context switch on AIO would make it to synchronize the disk before
+ switching to the new thread, what would cause
+ serious performance problems. Because of that we make all the writes on
+ AIO using a single thread. */
+ private final Executor executor;
- public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO)
+ /** The pool for Thread pollers */
+ private final Executor pollerExecutor;
+
+ public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final BufferCallback bufferCallback, final Executor executor, final Executor pollerExecutor)
{
this.journalDir = journalDir;
this.fileName = fileName;
this.maxIO = maxIO;
+ this.bufferCallback = bufferCallback;
+ this.executor = executor;
+ this.pollerExecutor = pollerExecutor;
}
public boolean isOpen()
@@ -99,10 +106,20 @@
{
checkOpened();
opened = false;
- executor.shutdown();
- while (!executor.awaitTermination(60, TimeUnit.SECONDS))
+ final CountDownLatch donelatch = new CountDownLatch(1);
+
+ executor.execute(new Runnable()
{
+ public void run()
+ {
+ donelatch.countDown();
+ }
+ });
+
+
+ while (!donelatch.await(60, TimeUnit.SECONDS))
+ {
log.warn("Executor on file " + fileName + " couldn't complete its tasks in 60 seconds.",
new Exception("Warning: Executor on file " + fileName + " couldn't complete its tasks in 60 seconds."));
}
@@ -190,10 +207,10 @@
public synchronized void open(final int currentMaxIO) throws Exception
{
opened = true;
- executor = Executors.newSingleThreadExecutor();
aioFile = newFile();
aioFile.open(journalDir + "/" + fileName, currentMaxIO);
position.set(0);
+ aioFile.setBufferCallback(bufferCallback);
}
@@ -289,7 +306,7 @@
*/
protected AsynchronousFile newFile()
{
- return new AsynchronousFileImpl();
+ return new AsynchronousFileImpl(executor, pollerExecutor);
}
// Private methods
@@ -300,24 +317,7 @@
final int bytesToWrite,
final long positionToWrite)
{
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
- }
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- if (callback != null)
- {
- callback.onError(-1, e.getMessage());
- }
- }
- }
- });
+ aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
private void checkOpened() throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -23,9 +23,13 @@
package org.jboss.messaging.core.journal.impl;
import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.utils.JBMThreadFactory;
/**
*
@@ -36,6 +40,16 @@
*/
public class AIOSequentialFileFactory extends AbstractSequentialFactory
{
+
+ /** A single AIO write executor for every AIO File.
+ * This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
+ * And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
+ private final Executor writeExecutor = Executors.newSingleThreadExecutor();
+
+
+ private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+
+
public AIOSequentialFileFactory(final String journalDir)
{
super(journalDir);
@@ -43,7 +57,7 @@
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
- return new AIOSequentialFile(journalDir, fileName, maxIO);
+ return new AIOSequentialFile(journalDir, fileName, maxIO, bufferCallback, writeExecutor, pollerExecutor);
}
public boolean isSupportsCallbacks()
@@ -62,12 +76,12 @@
{
size = (size / 512 + 1) * 512;
}
- return ByteBuffer.allocateDirect(size);
+ return AsynchronousFileImpl.newBuffer(size);
}
public void clearBuffer(final ByteBuffer directByteBuffer)
{
- AsynchronousFileImpl.resetBuffer(directByteBuffer, directByteBuffer.limit());
+ AsynchronousFileImpl.clearBuffer(directByteBuffer);
}
public int getAlignment()
@@ -91,4 +105,12 @@
return pos;
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+ */
+ public void releaseBuffer(ByteBuffer buffer)
+ {
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -28,6 +28,7 @@
import java.util.Arrays;
import java.util.List;
+import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
@@ -44,6 +45,8 @@
private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
protected final String journalDir;
+
+ protected BufferCallback bufferCallback;
public AbstractSequentialFactory(final String journalDir)
{
@@ -85,4 +88,22 @@
return Arrays.asList(fileNames);
}
+ /**
+ * @return the bufferCallback
+ */
+ public BufferCallback getBufferCallback()
+ {
+ return bufferCallback;
+ }
+
+ /**
+ * @param bufferCallback the bufferCallback to set
+ */
+ public void setBufferCallback(BufferCallback bufferCallback)
+ {
+ this.bufferCallback = bufferCallback;
+ }
+
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -274,6 +274,8 @@
this.syncNonTransactional = syncNonTransactional;
this.fileFactory = fileFactory;
+
+ this.fileFactory.setBufferCallback(this.buffersControl.callback);
this.filePrefix = filePrefix;
@@ -870,6 +872,10 @@
throw new IllegalStateException("Journal must be in started state");
}
+ // Disabling life cycle control on buffers, as we are reading the buffer
+ buffersControl.disable();
+
+
Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
List<JournalFile> orderedFiles = orderFiles();
@@ -880,12 +886,12 @@
for (JournalFile file : orderedFiles)
{
+ ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
+
file.getFile().open(1);
- ByteBuffer bb = fileFactory.newBuffer(fileSize);
+ int bytesRead = file.getFile().read(wholeFileBuffer);
- int bytesRead = file.getFile().read(bb);
-
if (bytesRead != fileSize)
{
// FIXME - We should extract everything we can from this file
@@ -900,16 +906,19 @@
file.getFile().getFileName());
}
+
+ wholeFileBuffer.position(0);
+
// First long is the ordering timestamp, we just jump its position
- bb.position(SIZE_HEADER);
+ wholeFileBuffer.position(SIZE_HEADER);
boolean hasData = false;
- while (bb.hasRemaining())
+ while (wholeFileBuffer.hasRemaining())
{
- final int pos = bb.position();
+ final int pos = wholeFileBuffer.position();
- byte recordType = bb.get();
+ byte recordType = wholeFileBuffer.get();
if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
{
@@ -919,7 +928,7 @@
continue;
}
- if (bb.position() + SIZE_INT > fileSize)
+ if (wholeFileBuffer.position() + SIZE_INT > fileSize)
{
// II - Ignore this record, lets keep looking
continue;
@@ -927,7 +936,7 @@
// III - Every record has the file-id.
// This is what allows us to avoid re-filling the whole file
- int readFileId = bb.getInt();
+ int readFileId = wholeFileBuffer.getInt();
// IV - This record is from a previous file-usage. The file was
// reused and we need to ignore this record
@@ -937,7 +946,7 @@
// next reclaiming will fix it
hasData = true;
- bb.position(pos + 1);
+ wholeFileBuffer.position(pos + 1);
continue;
}
@@ -946,24 +955,24 @@
if (isTransaction(recordType))
{
- if (bb.position() + SIZE_LONG > fileSize)
+ if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
{
continue;
}
- transactionID = bb.getLong();
+ transactionID = wholeFileBuffer.getLong();
}
long recordID = 0;
if (!isCompleteTransaction(recordType))
{
- if (bb.position() + SIZE_LONG > fileSize)
+ if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
{
continue;
}
- recordID = bb.getLong();
+ recordID = wholeFileBuffer.getLong();
maxID = Math.max(maxID, recordID);
}
@@ -984,14 +993,14 @@
if (isContainsBody(recordType))
{
- if (bb.position() + SIZE_INT > fileSize)
+ if (wholeFileBuffer.position() + SIZE_INT > fileSize)
{
continue;
}
- variableSize = bb.getInt();
+ variableSize = wholeFileBuffer.getInt();
- if (bb.position() + variableSize > fileSize)
+ if (wholeFileBuffer.position() + variableSize > fileSize)
{
log.warn("Record at position " + pos +
" file:" +
@@ -1002,12 +1011,12 @@
if (recordType != DELETE_RECORD_TX)
{
- userRecordType = bb.get();
+ userRecordType = wholeFileBuffer.get();
}
record = new byte[variableSize];
- bb.get(record);
+ wholeFileBuffer.get(record);
}
if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
@@ -1015,11 +1024,11 @@
if (recordType == PREPARE_RECORD)
{
// Add the variable size required for preparedTransactions
- preparedTransactionExtraDataSize = bb.getInt();
+ preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
}
// Both commit and prepare records contain the recordSummary, and this is
// used to calculate the record-size on both record-types
- variableSize += bb.getInt() * SIZE_INT * 2;
+ variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
}
int recordSize = getRecordSize(recordType);
@@ -1042,11 +1051,11 @@
continue;
}
- int oldPos = bb.position();
+ int oldPos = wholeFileBuffer.position();
- bb.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+ wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
- int checkSize = bb.getInt();
+ int checkSize = wholeFileBuffer.getInt();
// VII - The checkSize at the end has to match with the size
// informed at the beginning.
@@ -1063,12 +1072,12 @@
// next reclaiming will fix it
hasData = true;
- bb.position(pos + SIZE_BYTE);
+ wholeFileBuffer.position(pos + SIZE_BYTE);
continue;
}
- bb.position(oldPos);
+ wholeFileBuffer.position(oldPos);
// At this point everything is checked. So we relax and just load
// the data now.
@@ -1190,10 +1199,10 @@
byte extraData[] = new byte[preparedTransactionExtraDataSize];
- bb.get(extraData);
+ wholeFileBuffer.get(extraData);
// Pair <FileID, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
tx.prepared = true;
@@ -1231,7 +1240,7 @@
// We need to read it even if transaction was not found, or
// the reading checks would fail
// Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
// The commit could be alone on its own journal-file and the
// whole transaction body was reclaimed but not the
@@ -1319,18 +1328,20 @@
}
}
- checkSize = bb.getInt();
+ checkSize = wholeFileBuffer.getInt();
// This is a sanity check about the loading code itself.
// If this checkSize doesn't match, it means the reading method is
// not doing what it was supposed to do
if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() + ", pos = " + pos);
}
- lastDataPos = bb.position();
+ lastDataPos = wholeFileBuffer.position();
}
+
+ fileFactory.releaseBuffer(wholeFileBuffer);
file.getFile().close();
@@ -1345,6 +1356,8 @@
}
}
+ buffersControl.enable();
+
// Create any more files we need
// FIXME - size() involves a scan
@@ -1377,11 +1390,6 @@
{
currentFile.getFile().open();
- if (reuseBufferSize > 0)
- {
- currentFile.getFile().setBufferCallback(buffersControl.callback);
- }
-
currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
currentFile.setOffset(currentFile.getFile().position());
@@ -1680,6 +1688,8 @@
freeFiles.clear();
openedFiles.clear();
+
+ buffersControl.clearPoll();
state = STATE_STOPPED;
}
@@ -1932,8 +1942,19 @@
return recordSize;
}
+
+ /**
+ * This method requires bufferControl disabled, or the reads are going to be invalid
+ * */
private List<JournalFile> orderFiles() throws Exception
{
+
+ if (buffersControl.enabled)
+ {
+ // Sanity check, this shouldn't happen unless someone made an invalid change on the code
+ throw new IllegalStateException("Buffer life cycle control needs to be disabled at this point!!!");
+ }
+
List<String> fileNames = fileFactory.listFiles(fileExtension);
List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
@@ -1949,6 +1970,10 @@
file.read(bb);
int orderingID = bb.getInt();
+
+ fileFactory.releaseBuffer(bb);
+
+ bb = null;
if (nextOrderingId.get() < orderingID)
{
@@ -2105,11 +2130,6 @@
file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
-
- if (reuseBufferSize > 0)
- {
- file.getFile().setBufferCallback(buffersControl.callback);
- }
}
private int generateOrderingID()
@@ -2387,9 +2407,22 @@
/** This queue is fed by {@link JournalImpl.ReuseBuffersController.LocalBufferCallback}, which is called directly by NIO or AIO.
* In the case of AIO it is called by the native layer almost as soon as the buffer is no longer being used
* and is ready to be reused or GCed */
- private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+ private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ /** During reload we may disable/enable buffer reuse */
+ private boolean enabled = true;
final BufferCallback callback = new LocalBufferCallback();
+
+ public void enable()
+ {
+ this.enabled = true;
+ }
+
+ public void disable()
+ {
+ this.enabled = false;
+ }
public ByteBuffer newBuffer(final int size)
{
@@ -2398,11 +2431,11 @@
// just to cleanup this
if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
{
- trace("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
+ trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
bufferReuseLastTime = System.currentTimeMillis();
- reuseBuffers.clear();
+ clearPoll();
}
// if a buffer is bigger than the configured-size, we just create a new
@@ -2418,7 +2451,7 @@
int alignedSize = fileFactory.calculateBlockSize(size);
// Try getting a buffer from the queue...
- ByteBuffer buffer = reuseBuffers.poll();
+ ByteBuffer buffer = reuseBuffersQueue.poll();
if (buffer == null)
{
@@ -2434,24 +2467,41 @@
fileFactory.clearBuffer(buffer);
}
-
+
buffer.rewind();
return buffer;
}
}
+ public void clearPoll()
+ {
+ ByteBuffer reusedBuffer;
+
+ while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
+ {
+ fileFactory.releaseBuffer(reusedBuffer);
+ }
+ }
+
private class LocalBufferCallback implements BufferCallback
{
public void bufferDone(final ByteBuffer buffer)
{
- bufferReuseLastTime = System.currentTimeMillis();
-
- // If a buffer has any other than the configured size, the buffer
- // will be just sent to GC
- if (buffer.capacity() == reuseBufferSize)
+ if (enabled)
{
- reuseBuffers.offer(buffer);
+ bufferReuseLastTime = System.currentTimeMillis();
+
+ // If a buffer has any other than the configured size, the buffer
+ // will be just sent to GC
+ if (buffer.capacity() == reuseBufferSize)
+ {
+ reuseBuffersQueue.offer(buffer);
+ }
+ else
+ {
+ fileFactory.releaseBuffer(buffer);
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -54,10 +54,11 @@
BufferCallback bufferCallback;
- public NIOSequentialFile(final String directory, final String fileName)
+ public NIOSequentialFile(final String directory, final String fileName, final BufferCallback bufferCallback)
{
this.directory = directory;
file = new File(directory + "/" + fileName);
+ this.bufferCallback = bufferCallback;
}
public int getAlignment()
@@ -92,11 +93,6 @@
open();
}
- public void setBufferCallback(final BufferCallback callback)
- {
- bufferCallback = callback;
- }
-
public void fill(final int position, final int size, final byte fillCharacter) throws Exception
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -53,7 +53,7 @@
// maxIO is ignored on NIO
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
- return new NIOSequentialFile(journalDir, fileName);
+ return new NIOSequentialFile(journalDir, fileName, bufferCallback);
}
public boolean isSupportsCallbacks()
@@ -94,4 +94,12 @@
return bytes;
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+ */
+ public void releaseBuffer(ByteBuffer buffer)
+ {
+ // nothing to be done here
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -882,9 +882,11 @@
configuration.setSecurityEnabled(false);
configuration.setBindingsDirectory(getBindingsDir(node, backup));
configuration.setJournalMinFiles(2);
+ configuration.setJournalMaxAIO(1000);
configuration.setJournalDirectory(getJournalDir(node, backup));
configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(JournalType.NIO);
+ configuration.setJournalType(JournalType.ASYNCIO);
+ configuration.setJournalMaxAIO(1000);
configuration.setPagingDirectory(getPageDir(node, backup));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
configuration.setClustered(true);
@@ -978,7 +980,8 @@
configuration.setJournalMinFiles(2);
configuration.setJournalDirectory(getJournalDir(node, false));
configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(JournalType.NIO);
+ configuration.setJournalType(JournalType.ASYNCIO);
+ configuration.setJournalMaxAIO(1000);
configuration.setPagingDirectory(getPageDir(node, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
configuration.setClustered(true);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -124,7 +124,7 @@
backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
backupConf.setJournalFileSize(100 * 1024);
- backupConf.setJournalType(JournalType.NIO);
+ backupConf.setJournalType(JournalType.ASYNCIO);
backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
backupConf.setPagingGlobalWatermarkSize(pageSize);
@@ -164,7 +164,7 @@
liveConf.setPagingGlobalWatermarkSize(pageSize);
liveConf.setJournalFileSize(100 * 1024);
- liveConf.setJournalType(JournalType.NIO);
+ liveConf.setJournalType(JournalType.ASYNCIO);
}
if (fileBased)
@@ -208,7 +208,7 @@
backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
backupConf.setJournalFileSize(100 * 1024);
- backupConf.setJournalType(JournalType.NIO);
+ backupConf.setJournalType(JournalType.ASYNCIO);
backupConf.setPagingMaxGlobalSizeBytes(-1);
backupConf.setPagingGlobalWatermarkSize(-1);
@@ -262,7 +262,7 @@
liveConf.setPagingGlobalWatermarkSize(-1);
liveConf.setJournalFileSize(100 * 1024);
- liveConf.setJournalType(JournalType.NIO);
+ liveConf.setJournalType(JournalType.ASYNCIO);
liveServer = Messaging.newMessagingServer(liveConf);
}
else
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageMultiThreadFailoverTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -103,7 +103,7 @@
backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
backupConf.setJournalFileSize(100 * 1024);
- backupConf.setJournalType(JournalType.NIO);
+ backupConf.setJournalType(JournalType.ASYNCIO);
backupConf.setSecurityEnabled(false);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -124,7 +124,7 @@
liveConf.setJournalFileSize(100 * 1024);
- liveConf.setJournalType(JournalType.NIO);
+ liveConf.setJournalType(JournalType.ASYNCIO);
liveConf.setSecurityEnabled(false);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -382,7 +382,7 @@
backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
backupConf.setJournalFileSize(100 * 1024);
- backupConf.setJournalType(JournalType.NIO);
+ backupConf.setJournalType(JournalType.ASYNCIO);
backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
backupConf.setPagingGlobalWatermarkSize(pageSize);
@@ -422,7 +422,7 @@
liveConf.setPagingGlobalWatermarkSize(pageSize);
liveConf.setJournalFileSize(100 * 1024);
- liveConf.setJournalType(JournalType.NIO);
+ liveConf.setJournalType(JournalType.ASYNCIO);
}
if (fileBased)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/XALargeMessageMultiThreadFailoverTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -85,7 +85,7 @@
backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
backupConf.setJournalFileSize(100 * 1024);
- backupConf.setJournalType(JournalType.NIO);
+ backupConf.setJournalType(JournalType.ASYNCIO);
backupConf.setSecurityEnabled(false);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -106,7 +106,7 @@
liveConf.setJournalFileSize(100 * 1024);
- liveConf.setJournalType(JournalType.NIO);
+ liveConf.setJournalType(JournalType.ASYNCIO);
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations()
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -25,7 +25,6 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
@@ -78,6 +77,7 @@
ByteBuffer buff = factory.newBuffer(10);
assertEquals(512, buff.limit());
file.close();
+ factory.releaseBuffer(buff);
}
public void testBlockCallback() throws Exception
@@ -131,7 +131,7 @@
BlockCallback callback = new BlockCallback();
- final int NUMBER_OF_RECORDS = 10000;
+ final int NUMBER_OF_RECORDS = 500;
SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000);
file.open();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -23,7 +23,6 @@
package org.jboss.messaging.tests.integration.journal;
import java.io.File;
-import java.util.concurrent.Executors;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.journal.SequentialFileFactory;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -60,7 +60,7 @@
configuration.start();
- configuration.setJournalType(JournalType.NIO);
+ configuration.setJournalType(JournalType.ASYNCIO);
final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -202,7 +202,7 @@
{
Journal journal =
new JournalImpl(10 * 1024 * 1024, 10, true, true, getFileFactory(),
- "jbm-data", "jbm", 5000, 0);
+ "jbm-data", "jbm", 5000, 10 * 1024);
journal.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -321,7 +321,7 @@
config.setJournalFileSize(10 * 1024 * 1024);
config.setJournalMinFiles(5);
- config.setJournalType(JournalType.NIO);
+ config.setJournalType(JournalType.ASYNCIO);
return config;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -22,12 +22,7 @@
package org.jboss.messaging.tests.unit.core.asyncio;
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.BufferCallback;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
@@ -35,7 +30,17 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.BufferCallback;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.JBMThreadFactory;
+
/**
*
* you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
@@ -53,19 +58,40 @@
private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
byte commonBuffer[] = null;
+
+ ExecutorService executor;
+
+ ExecutorService pollerExecutor;
+
private static void debug(final String msg)
{
log.debug(msg);
}
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ executor.shutdown();
+ pollerExecutor.shutdown();
+ super.tearDown();
+ }
+
/**
* Opening and closing a file immediately can lead to races on the native layer,
* creating crash conditions.
* */
public void testOpenClose() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
for (int i = 0; i < 1000; i++)
{
controller.open(FILE_NAME, 10000);
@@ -73,16 +99,16 @@
}
}
-
+
public void testFileNonExistent() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
for (int i = 0; i < 1000; i++)
{
try
{
controller.open("/non-existent/IDontExist.error", 10000);
- fail ("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
+ fail("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
}
catch (Throwable ignored)
{
@@ -106,21 +132,22 @@
*/
public void testTwoFiles() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+ final AsynchronousFileImpl controller2 = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME + ".1", 10000);
controller2.open(FILE_NAME + ".2", 10000);
int numberOfLines = 1000;
int size = 1024;
+ ByteBuffer buffer = null;
try
{
CountDownLatch latchDone = new CountDownLatch(numberOfLines);
CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
- ByteBuffer block = controller.newBuffer(size);
- encodeBufer(block);
+ buffer = AsynchronousFileImpl.newBuffer(size);
+ encodeBufer(buffer);
preAlloc(controller, numberOfLines * size);
preAlloc(controller2, numberOfLines * size);
@@ -144,8 +171,8 @@
{
CountDownCallback tmp2 = iter2.next();
- controller.write(counter * size, size, block, tmp);
- controller.write(counter * size, size, block, tmp2);
+ controller.write(counter * size, size, buffer, tmp);
+ controller.write(counter * size, size, buffer, tmp2);
if (++counter % 5000 == 0)
{
debug(5000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
@@ -202,6 +229,7 @@
}
finally
{
+ AsynchronousFileImpl.destroyBuffer(buffer);
try
{
controller.close();
@@ -249,7 +277,8 @@
}
}
- AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+ ByteBuffer buffer = null;
try
{
@@ -258,13 +287,13 @@
controller.open(FILE_NAME, 10);
controller.close();
- controller = new AsynchronousFileImpl();
+ controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, 10);
controller.fill(0, 1, 512, (byte)'j');
- ByteBuffer buffer = controller.newBuffer(SIZE);
+ buffer = AsynchronousFileImpl.newBuffer(SIZE);
buffer.clear();
@@ -305,8 +334,7 @@
{
}
- // newBuffer = ByteBuffer.allocateDirect(512);
- newBuffer = controller.newBuffer(512);
+ newBuffer = AsynchronousFileImpl.newBuffer(512);
callbackLocal = new LocalCallback();
controller.read(0, 512, newBuffer, callbackLocal);
callbackLocal.latch.await();
@@ -325,6 +353,8 @@
}
finally
{
+ AsynchronousFileImpl.destroyBuffer(buffer);
+
try
{
controller.close();
@@ -340,7 +370,7 @@
public void testBufferCallbackUniqueBuffers() throws Exception
{
boolean closed = false;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
try
{
final int NUMBER_LINES = 1000;
@@ -366,7 +396,7 @@
CountDownCallback aio = new CountDownCallback(latch);
for (int i = 0; i < NUMBER_LINES; i++)
{
- ByteBuffer buffer = controller.newBuffer(SIZE);
+ ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
buffer.rewind();
for (int j = 0; j < SIZE; j++)
{
@@ -400,6 +430,11 @@
}
}
+ for (ByteBuffer bufferTmp : buffers)
+ {
+ AsynchronousFileImpl.destroyBuffer(bufferTmp);
+ }
+
buffers.clear();
}
@@ -415,7 +450,8 @@
public void testBufferCallbackAwaysSameBuffer() throws Exception
{
boolean closed = false;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+ ByteBuffer buffer = null;
try
{
final int NUMBER_LINES = 1000;
@@ -440,7 +476,7 @@
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
CountDownCallback aio = new CountDownCallback(latch);
- ByteBuffer buffer = controller.newBuffer(SIZE);
+ buffer = AsynchronousFileImpl.newBuffer(SIZE);
buffer.rewind();
for (int j = 0; j < SIZE; j++)
{
@@ -482,6 +518,7 @@
}
finally
{
+ AsynchronousFileImpl.destroyBuffer(buffer);
if (!closed)
{
controller.close();
@@ -491,11 +528,22 @@
public void testRead() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+ controller.setBufferCallback(new BufferCallback()
+ {
+
+ public void bufferDone(ByteBuffer buffer)
+ {
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ }
+
+ });
+
+ ByteBuffer readBuffer = null;
try
{
- final int NUMBER_LINES = 5000;
+ final int NUMBER_LINES = 10000;
final int SIZE = 1024;
controller.open(FILE_NAME, 1000);
@@ -508,13 +556,15 @@
for (int i = 0; i < NUMBER_LINES; i++)
{
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
- addString("Str value " + i + "\n", buffer);
- for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
+ if (i % 1000 == 0)
{
- buffer.put((byte)' ');
+ System.out.println("Wrote " + i + " lines");
}
- buffer.put((byte)'\n');
+ ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
+ for (int j = 0; j < SIZE; j++)
+ {
+ buffer.put(getSamplebyte(j));
+ }
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -527,48 +577,76 @@
// If you call close you're supposed to wait events to finish before
// closing it
controller.close();
+ controller.setBufferCallback(null);
+
controller.open(FILE_NAME, 10);
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+ readBuffer = AsynchronousFileImpl.newBuffer(SIZE);
+ Thread t = null;
+
for (int i = 0; i < NUMBER_LINES; i++)
{
- newBuffer.clear();
- addString("Str value " + i + "\n", newBuffer);
- for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
+ if (i % 1000 == 0)
{
- newBuffer.put((byte)' ');
+ System.out.println("Read " + i + " lines");
}
- newBuffer.put((byte)'\n');
+ AsynchronousFileImpl.clearBuffer(readBuffer);
CountDownLatch latch = new CountDownLatch(1);
CountDownCallback aio = new CountDownCallback(latch);
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+ controller.read(i * SIZE, SIZE, readBuffer, aio);
- controller.read(i * SIZE, SIZE, buffer, aio);
+ // at the first 20 lines, we will force a lot of garbage, to make sure the pointers are well isolated from Garbage Collection
+ if (i < 20)
+ {
+ if (t != null)
+ {
+ t.join();
+ }
+
+ t = new Thread()
+ {
+ public void run()
+ {
+ // Force a lot of garbage during reading, to make sure the memory read is well isolated from
+ // garbage collection
+ WeakReference<Object> garbage = new WeakReference<Object>(new Object());
+ // Stays in loop until GC kicks in to clean up this reference
+ while (garbage.get() != null)
+ {
+ @SuppressWarnings("unused")
+ byte[] garbage2 = new byte[10 * 1024 * 1024]; // More Garbage
+ }
+
+ }
+ };
+
+ t.start();
+ }
+
latch.await();
assertFalse(aio.errorCalled);
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
- byte bytesCompare[] = new byte[SIZE];
+ readBuffer.get(bytesRead);
- newBuffer.rewind();
- newBuffer.get(bytesCompare);
- buffer.rewind();
- buffer.get(bytesRead);
-
for (int count = 0; count < SIZE; count++)
{
- assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+ assertEquals("byte position " + count + " differs on line " + i + " position = " + count,
+ getSamplebyte(count),
+ bytesRead[count]);
}
-
- assertTrue(buffer.equals(newBuffer));
}
}
finally
{
+ if (readBuffer != null)
+ {
+ AsynchronousFileImpl.destroyBuffer(readBuffer);
+ }
try
{
controller.close();
@@ -587,7 +665,7 @@
* The file is also read after being written to validate its correctness */
public void testConcurrentClose() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
try
{
@@ -598,10 +676,20 @@
controller.open(FILE_NAME, 10000);
controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+
+ controller.setBufferCallback(new BufferCallback()
+ {
+ public void bufferDone(ByteBuffer buffer)
+ {
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ }
+
+ });
+
for (int i = 0; i < NUMBER_LINES; i++)
{
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+ ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
buffer.clear();
addString("Str value " + i + "\n", buffer);
@@ -618,14 +706,16 @@
// If you call close you're supposed to wait for events to finish before
// closing it
controller.close();
+
+ controller.setBufferCallback(null);
assertEquals(0, readLatch.getCount());
readLatch.await();
controller.open(FILE_NAME, 10);
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+ ByteBuffer newBuffer = AsynchronousFileImpl.newBuffer(SIZE);
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+ ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
for (int i = 0; i < NUMBER_LINES; i++)
{
@@ -659,6 +749,9 @@
assertTrue(buffer.equals(newBuffer));
}
+
+ AsynchronousFileImpl.destroyBuffer(newBuffer);
+ AsynchronousFileImpl.destroyBuffer(buffer);
}
finally
@@ -676,15 +769,17 @@
private void asyncData(final int numberOfLines, final int size, final int aioLimit) throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, aioLimit);
-
+
+ ByteBuffer buffer = null;
+
try
{
CountDownLatch latchDone = new CountDownLatch(numberOfLines);
- ByteBuffer block = controller.newBuffer(size);
- encodeBufer(block);
+ buffer = AsynchronousFileImpl.newBuffer(size);
+ encodeBufer(buffer);
preAlloc(controller, numberOfLines * size);
@@ -701,7 +796,7 @@
int counter = 0;
for (CountDownCallback tmp : list)
{
- controller.write(counter * size, size, block, tmp);
+ controller.write(counter * size, size, buffer, tmp);
if (++counter % 20000 == 0)
{
debug(20000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
@@ -736,6 +831,7 @@
}
finally
{
+ AsynchronousFileImpl.destroyBuffer(buffer);
try
{
controller.close();
@@ -749,16 +845,17 @@
public void testDirectSynchronous() throws Exception
{
+ ByteBuffer buffer = null;
try
{
final int NUMBER_LINES = 3000;
final int SIZE = 1024;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, 2000);
- ByteBuffer block = ByteBuffer.allocateDirect(SIZE);
- encodeBufer(block);
+ buffer = AsynchronousFileImpl.newBuffer(SIZE);
+ encodeBufer(buffer);
preAlloc(controller, NUMBER_LINES * SIZE);
@@ -768,7 +865,7 @@
{
CountDownLatch latchDone = new CountDownLatch(1);
CountDownCallback aioBlock = new CountDownCallback(latchDone);
- controller.write(i * 512, 512, block, aioBlock);
+ controller.write(i * 512, 512, buffer, aioBlock);
latchDone.await();
assertTrue(aioBlock.doneCalled);
assertFalse(aioBlock.errorCalled);
@@ -793,27 +890,33 @@
{
throw e;
}
+ finally
+ {
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ }
}
public void testInvalidWrite() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, 2000);
-
+
+ ByteBuffer buffer = null;
+
try
{
final int SIZE = 512;
- ByteBuffer block = controller.newBuffer(SIZE);
- encodeBufer(block);
+ buffer = AsynchronousFileImpl.newBuffer(SIZE);
+ encodeBufer(buffer);
preAlloc(controller, 10 * 512);
CountDownLatch latchDone = new CountDownLatch(1);
CountDownCallback aioBlock = new CountDownCallback(latchDone);
- controller.write(11, 512, block, aioBlock);
+ controller.write(11, 512, buffer, aioBlock);
latchDone.await();
@@ -827,6 +930,7 @@
}
finally
{
+ AsynchronousFileImpl.destroyBuffer(buffer);
controller.close();
}
@@ -834,10 +938,10 @@
public void testInvalidAlloc() throws Exception
{
- AsynchronousFileImpl controller = new AsynchronousFileImpl();
try
{
- ByteBuffer buffer = controller.newBuffer(300);
+ @SuppressWarnings("unused")
+ ByteBuffer buffer = AsynchronousFileImpl.newBuffer(300);
fail("Exception expected");
}
catch (Exception ignored)
@@ -845,10 +949,58 @@
}
}
+
+ // This is in particular testing for http://bugs.sun.com/view_bug.do?bug_id=6791815
+ public void testAllocations() throws Exception
+ {
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread ts[] = new Thread[100];
+ final CountDownLatch align = new CountDownLatch(ts.length);
+ final CountDownLatch start = new CountDownLatch(1);
+
+ for (int i = 0; i < ts.length; i++)
+ {
+ ts[i] = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ align.countDown();
+ start.await();
+ for (int i = 0; i < 1000; i++)
+ {
+ ByteBuffer buffer = AsynchronousFileImpl.newBuffer(512);
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+ ts[i].start();
+ }
+
+ align.await();
+ start.countDown();
+
+ for (Thread t: ts)
+ {
+ t.join();
+ }
+
+ assertEquals(0, errors.get());
+ }
+
+
public void testSize() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
final int NUMBER_LINES = 10;
final int SIZE = 1024;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -22,18 +22,19 @@
package org.jboss.messaging.tests.unit.core.asyncio;
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.logging.Logger;
-
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.JBMThreadFactory;
+
/**
*
* you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
@@ -59,18 +60,35 @@
// Executor exec
- Executor executor = Executors.newSingleThreadExecutor();
+ ExecutorService executor;
+
+ ExecutorService pollerExecutor;
- @Override
+
+ private static void debug(final String msg)
+ {
+ log.debug(msg);
+ }
+
+
+
protected void setUp() throws Exception
{
super.setUp();
- position.set(0);
+ pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), false));
+ executor = Executors.newSingleThreadExecutor();
}
-
+
+ protected void tearDown() throws Exception
+ {
+ executor.shutdown();
+ pollerExecutor.shutdown();
+ super.tearDown();
+ }
+
public void testMultipleASynchronousWrites() throws Throwable
{
- executeTest(false);
+ executeTest(false);
}
public void testMultipleSynchronousWrites() throws Throwable
@@ -80,15 +98,15 @@
private void executeTest(final boolean sync) throws Throwable
{
- log.debug(sync ? "Sync test:" : "Async test");
- AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
+ debug(sync ? "Sync test:" : "Async test");
+ AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl(executor, pollerExecutor);
jlibAIO.open(FILE_NAME, 21000);
try
{
- log.debug("Preallocating file");
+ debug("Preallocating file");
jlibAIO.fill(0l, NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES, (byte)0);
- log.debug("Done Preallocating file");
+ debug("Done Preallocating file");
CountDownLatch latchStart = new CountDownLatch(NUMBER_OF_THREADS + 1);
@@ -115,7 +133,7 @@
}
long endTime = System.currentTimeMillis();
- log.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
+ debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
NUMBER_OF_THREADS *
NUMBER_OF_LINES *
1000 /
@@ -164,11 +182,17 @@
{
super.run();
+
+ ByteBuffer buffer = null;
+
+ synchronized (MultiThreadAsynchronousFileTest.class)
+ {
+ buffer = AsynchronousFileImpl.newBuffer(SIZE);
+ }
+
try
{
- ByteBuffer buffer = libaio.newBuffer(SIZE);
-
// I'm always reusing the same buffer, as I don't want any noise from
// malloc on the measurement
// Encoding buffer
@@ -225,7 +249,7 @@
long endtime = System.currentTimeMillis();
- log.debug(Thread.currentThread().getName() + " Rec/Sec= " +
+ debug(Thread.currentThread().getName() + " Rec/Sec= " +
NUMBER_OF_LINES *
1000 /
(endtime - startTime) +
@@ -246,6 +270,13 @@
e.printStackTrace();
failed = e;
}
+ finally
+ {
+ synchronized (MultiThreadAsynchronousFileTest.class)
+ {
+ AsynchronousFileImpl.destroyBuffer(buffer);
+ }
+ }
}
}
Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileTest.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -1,534 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
- * Middleware LLC, and individual contributors by the @authors tag. See the
- * copyright.txt in the distribution for a full listing of individual
- * contributors.
- *
- * This is free software; you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free
- * Software Foundation; either version 2.1 of the License, or (at your option)
- * any later version.
- *
- * This software is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- * details.
- *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this software; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- * site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.journal.impl;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.jboss.messaging.core.asyncio.AsynchronousFile;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.journal.IOCallback;
-import org.jboss.messaging.core.journal.SequentialFile;
-import org.jboss.messaging.core.journal.impl.AIOSequentialFile;
-import org.jboss.messaging.tests.util.UnitTestCase;
-
-/**
- * Test AIOSEquentialFile using an EasyMock
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- */
-public class AIOSequentialFileTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- AsynchronousFile mockFile;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testOpen() throws Exception
- {
- openFile();
- }
-
- public void testAlignment() throws Exception
- {
- SequentialFile file = new MockAIOSequentialFileImpl(getTestDir(), "nothing", 1);
-
- try
- {
- file.getAlignment();
- fail("Exception expected");
- }
- catch (Exception ignored)
- {
- }
-
- file = openFile();
-
- EasyMock.expect(mockFile.getBlockSize()).andReturn(512);
-
- EasyMock.replay(mockFile);
-
- assertEquals(512, file.getAlignment());
-
- EasyMock.verify(mockFile);
- }
-
- public void testCalculateblockStart() throws Exception
- {
- SequentialFile file = new MockAIOSequentialFileImpl(getTestDir(), "nothing", 1);
-
- try
- {
- file.calculateBlockStart(10);
- fail("Exception expected");
- }
- catch (Exception ignored)
- {
- }
-
- file = openFile();
-
- EasyMock.expect(mockFile.getBlockSize()).andReturn(512);
-
- EasyMock.replay(mockFile);
-
- assertEquals(1024, file.calculateBlockStart(900));
-
- EasyMock.verify(mockFile);
- }
-
- public void testClose() throws Exception
- {
- SequentialFile file = new MockAIOSequentialFileImpl(getTestDir(), "nothing", 1);
-
- try
- {
- file.close();
- fail("Exception expected");
- }
- catch (Exception ignored)
- {
- }
-
- file = openFile();
-
- mockFile.close();
-
- EasyMock.replay(mockFile);
-
- file.close();
-
- EasyMock.verify(mockFile);
- }
-
- public void testDelete() throws Exception
- {
- File tmpFile = File.createTempFile("temporaryTestFile", ".tmp");
-
- assertTrue(tmpFile.exists());
-
- SequentialFile fileImpl = new MockAIOSequentialFileImpl(tmpFile.getParent(), tmpFile.getName(), 1);
-
- fileImpl.delete();
-
- // delete on a closed file
- assertFalse(tmpFile.exists());
-
- tmpFile = File.createTempFile("temporaryTestFile", ".tmp");
-
- assertTrue(tmpFile.exists());
-
- fileImpl = openFile(tmpFile.getParent(), tmpFile.getName());
-
- mockFile.close();
-
- EasyMock.replay(mockFile);
-
- fileImpl.delete();
-
- assertFalse(tmpFile.exists());
-
- EasyMock.verify(mockFile);
- }
-
- public void testFill() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- validateFill(fileImpl, 3 * 100 * 1024 * 1024, 3, 100 * 1024 * 1024);
-
- validateFill(fileImpl, 3 * 10 * 1024 * 1024, 3, 10 * 1024 * 1024);
-
- validateFill(fileImpl, 7 * 1024 * 1024, 7, 1024 * 1024);
-
- validateFill(fileImpl, 7 * 10 * 1024, 7, 10 * 1024);
-
- validateFill(fileImpl, 7 * 512, 7, 512);
-
- validateFill(fileImpl, 300, 1, 512);
- }
-
- public void testWriteWithCallback() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- ByteBuffer buffer = ByteBuffer.allocate(512);
-
- IOCallback callback = new IOCallback()
- {
-
- public void done()
- {
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- }
- };
-
- mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.same(callback));
-
- mockFile.close();
-
- EasyMock.replay(mockFile);
-
- fileImpl.position(512 * 3);
-
- fileImpl.write(buffer, callback);
-
- // We need that to make sure the executor is cleared before the verify
- fileImpl.close();
-
- EasyMock.verify(mockFile);
- }
-
- public void testWriteWithSyncOnCallback() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- ByteBuffer buffer = ByteBuffer.allocate(512);
-
- mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
- {
-
- public Object answer() throws Throwable
- {
- IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
- callback.done();
-
- return null;
- }
-
- });
-
- EasyMock.replay(mockFile);
-
- fileImpl.position(512 * 3);
-
- fileImpl.write(buffer, true);
-
- EasyMock.verify(mockFile);
- }
-
- public void testWriteWithNoSyncOnCallback() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- ByteBuffer buffer = ByteBuffer.allocate(512);
-
- mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
- {
-
- public Object answer() throws Throwable
- {
- IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
- callback.done();
-
- return null;
- }
-
- });
-
- mockFile.close();
-
- EasyMock.replay(mockFile);
-
- fileImpl.position(512 * 3);
-
- fileImpl.write(buffer, false);
-
- fileImpl.close();
-
- EasyMock.verify(mockFile);
- }
-
- public void testWriteWithSyncAndCallbackError() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- ByteBuffer buffer = ByteBuffer.allocate(512);
-
- mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
- {
-
- public Object answer() throws Throwable
- {
- IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
- callback.onError(100, "Fake Message");
-
- return null;
- }
-
- });
-
- EasyMock.replay(mockFile);
-
- fileImpl.position(512 * 3);
-
- try
- {
- fileImpl.write(buffer, true);
- fail("Exception was expected");
- }
- catch (MessagingException e)
- {
- assertEquals(100, e.getCode());
- assertEquals("Fake Message", e.getMessage());
- }
-
- EasyMock.verify(mockFile);
- }
-
- public void testWriteWithSyncAndException() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- ByteBuffer buffer = ByteBuffer.allocate(512);
-
- mockFile.write(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
- {
-
- public Object answer() throws Throwable
- {
- throw new IllegalArgumentException("Fake Message");
- }
-
- });
-
- EasyMock.replay(mockFile);
-
- fileImpl.position(512 * 3);
-
- try
- {
- fileImpl.write(buffer, true);
- fail("Exception was expected");
- }
- catch (MessagingException e)
- {
- assertEquals(-1, e.getCode());
- assertEquals("Fake Message", e.getMessage());
- }
-
- EasyMock.verify(mockFile);
- }
-
- public void testReadWithCallback() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- ByteBuffer buffer = ByteBuffer.allocate(512);
-
- IOCallback callback = new IOCallback()
- {
-
- public void done()
- {
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- }
- };
-
- mockFile.read(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.same(callback));
-
- EasyMock.replay(mockFile);
-
- fileImpl.position(512 * 3);
-
- fileImpl.read(buffer, callback);
-
- EasyMock.verify(mockFile);
- }
-
- public void testReadWithoutCallback() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- ByteBuffer buffer = ByteBuffer.allocate(512);
-
- mockFile.read(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
- {
-
- public Object answer() throws Throwable
- {
- IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
- callback.done();
-
- return null;
- }
-
- });
-
- EasyMock.replay(mockFile);
-
- fileImpl.position(512 * 3);
-
- fileImpl.read(buffer);
-
- EasyMock.verify(mockFile);
- }
-
- public void testReadWithoutCallbackOnError() throws Exception
- {
- SequentialFile fileImpl = openFile();
-
- ByteBuffer buffer = ByteBuffer.allocate(512);
-
- mockFile.read(EasyMock.eq(512l * 3l), EasyMock.eq(512l), EasyMock.same(buffer), EasyMock.isA(IOCallback.class));
-
- EasyMock.expectLastCall().andAnswer(new IAnswer<Object>()
- {
-
- public Object answer() throws Throwable
- {
- IOCallback callback = (IOCallback)EasyMock.getCurrentArguments()[3];
-
- callback.onError(100, "Fake Message");
-
- return null;
- }
-
- });
-
- EasyMock.replay(mockFile);
-
- fileImpl.position(512 * 3);
-
- try
- {
- fileImpl.read(buffer);
- fail("Expected Exception");
- }
- catch (MessagingException e)
- {
- assertEquals(100, e.getCode());
- assertEquals("Fake Message", "Fake Message");
- }
-
- EasyMock.verify(mockFile);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- mockFile = null;
- File testDir = new File(getTestDir());
- testDir.mkdirs();
- }
-
- // Private -------------------------------------------------------
-
- private void validateFill(final SequentialFile fileImpl,
- final int totalSize,
- final int numberOfBlocksExpected,
- final long blockSizeExpected) throws Exception
- {
- EasyMock.expect(mockFile.getBlockSize()).andReturn(512);
-
- mockFile.fill(512, numberOfBlocksExpected, blockSizeExpected, (byte)'b');
-
- EasyMock.replay(mockFile);
-
- fileImpl.fill(5, totalSize, (byte)'b');
-
- EasyMock.verify(mockFile);
-
- EasyMock.reset(mockFile);
- }
-
- private SequentialFile openFile() throws Exception
- {
- return openFile(getTemporaryDir(), "nothing");
- }
-
- private SequentialFile openFile(final String directory, final String fileName) throws Exception
- {
- mockFile = EasyMock.createStrictMock(AsynchronousFile.class);
-
- mockFile.open(directory + "/" + fileName, 1);
-
- EasyMock.replay(mockFile);
-
- SequentialFile file = new MockAIOSequentialFileImpl(directory, fileName, 1);
-
- file.open();
-
- EasyMock.verify(mockFile);
-
- EasyMock.reset(mockFile);
-
- return file;
- }
-
- // Inner classes -------------------------------------------------
-
- class MockAIOSequentialFileImpl extends AIOSequentialFile
- {
-
- public MockAIOSequentialFileImpl(final String journalDir, final String fileName, final int maxIO) throws Exception
- {
- super(journalDir, fileName, maxIO);
- }
-
- @Override
- protected AsynchronousFile newFile()
- {
- return mockFile;
- }
-
- }
-
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -22,6 +22,7 @@
package org.jboss.messaging.tests.unit.core.journal.impl;
+import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -474,7 +475,7 @@
* @param expected
* @param actual
*/
- private void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
+ protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
{
System.out.println("***********************************************");
System.out.println("Expected list:");
@@ -482,10 +483,14 @@
{
System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
}
- System.out.println("Actual list:");
- for (RecordInfo info : actual)
+ if (actual != null)
{
- System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+ System.out.println("***********************************************");
+ System.out.println("Actual list:");
+ for (RecordInfo info : actual)
+ {
+ System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+ }
}
System.out.println("***********************************************");
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -3018,7 +3018,6 @@
assertEquals(0, journal.getDataFilesCount());
}
-
protected abstract int getAlignment();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -574,4 +574,29 @@
// nothing to be done on the fake Sequential file
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
+ */
+ public void releaseBuffer(ByteBuffer buffer)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFileFactory#getBufferCallback()
+ */
+ public BufferCallback getBufferCallback()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFileFactory#setBufferCallback(org.jboss.messaging.core.journal.BufferCallback)
+ */
+ public void setBufferCallback(BufferCallback bufferCallback)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-05-06 02:30:57 UTC (rev 6682)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-05-06 04:42:56 UTC (rev 6683)
@@ -132,7 +132,7 @@
Configuration config = new ConfigurationImpl();
config.setJournalDirectory(getJournalDir());
config.setBindingsDirectory(getBindingsDir());
- config.setJournalType(JournalType.NIO);
+ config.setJournalType(JournalType.ASYNCIO);
config.setLargeMessagesDirectory(getLargeMessagesDir());
return config;
}
@@ -278,7 +278,7 @@
configuration.setJournalMinFiles(2);
configuration.setJournalDirectory(getJournalDir(index, false));
configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(JournalType.NIO);
+ configuration.setJournalType(JournalType.ASYNCIO);
configuration.setPagingDirectory(getPageDir(index, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(index, false));
@@ -305,7 +305,7 @@
configuration.setPagingDirectory(getPageDir());
configuration.setLargeMessagesDirectory(getLargeMessagesDir());
- configuration.setJournalType(JournalType.NIO);
+ configuration.setJournalType(JournalType.ASYNCIO);
configuration.getAcceptorConfigurations().clear();
More information about the jboss-cvs-commits
mailing list