Author: clebert.suconic(a)jboss.com
Date: 2009-11-23 21:53:23 -0500 (Mon, 23 Nov 2009)
New Revision: 8389
Added:
trunk/src/main/org/hornetq/core/journal/IOAsyncTask.java
trunk/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
trunk/src/main/org/hornetq/core/persistence/OperationContext.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java
Removed:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
trunk/src/main/org/hornetq/core/replication/ReplicationContext.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
Modified:
trunk/native/bin/libHornetQAIO64.so
trunk/native/src/AIOException.h
trunk/native/src/JNICallbackAdapter.cpp
trunk/native/src/JNICallbackAdapter.h
trunk/native/src/JNI_AsynchronousFileImpl.cpp
trunk/native/src/Version.h
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/exception/HornetQException.java
trunk/src/main/org/hornetq/core/journal/IOCompletion.java
trunk/src/main/org/hornetq/core/journal/Journal.java
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
trunk/src/main/org/hornetq/core/management/ManagementService.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-225 - Asynchronous responses on the journal
Modified: trunk/native/bin/libHornetQAIO64.so
===================================================================
(Binary files differ)
Modified: trunk/native/src/AIOException.h
===================================================================
--- trunk/native/src/AIOException.h 2009-11-23 23:28:33 UTC (rev 8388)
+++ trunk/native/src/AIOException.h 2009-11-24 02:53:23 UTC (rev 8389)
@@ -29,7 +29,7 @@
#define NATIVE_ERROR_CANT_ALLOCATE_QUEUE 206
#define NATIVE_ERROR_PREALLOCATE_FILE 208
#define NATIVE_ERROR_ALLOCATE_MEMORY 209
-#define NATIVE_ERROR_IO 210
+#define NATIVE_ERROR_IO 006
#define NATIVE_ERROR_AIO_FULL 211
Modified: trunk/native/src/JNICallbackAdapter.cpp
===================================================================
--- trunk/native/src/JNICallbackAdapter.cpp 2009-11-23 23:28:33 UTC (rev 8388)
+++ trunk/native/src/JNICallbackAdapter.cpp 2009-11-24 02:53:23 UTC (rev 8389)
@@ -18,13 +18,20 @@
jobject nullObj = NULL;
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _callback,
jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence,
jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) :
CallbackAdapter()
{
controller = _controller;
+
+ sequence = _sequence;
+
callback = _callback;
+
fileController = _fileController;
+
bufferReference = _bufferReference;
+
isRead = _isRead;
+
}
JNICallbackAdapter::~JNICallbackAdapter()
@@ -33,15 +40,19 @@
void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
{
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback,
isRead ? nullObj : bufferReference);
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback,
sequence, isRead ? nullObj : bufferReference);
+
release(threadContext);
}
void JNICallbackAdapter::onError(THREAD_CONTEXT threadContext, long errorCode,
std::string error)
{
controller->log(threadContext, 0, "Libaio event generated errors, callback
object was informed about it");
+
jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data());
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error,
callback, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
+
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error,
callback, sequence, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
+
release(threadContext);
}
Modified: trunk/native/src/JNICallbackAdapter.h
===================================================================
--- trunk/native/src/JNICallbackAdapter.h 2009-11-23 23:28:33 UTC (rev 8388)
+++ trunk/native/src/JNICallbackAdapter.h 2009-11-24 02:53:23 UTC (rev 8389)
@@ -24,10 +24,17 @@
class JNICallbackAdapter : public CallbackAdapter
{
private:
+
AIOController * controller;
+
jobject callback;
+
jobject fileController;
+
jobject bufferReference;
+
+ jlong sequence;
+
// Is this a read operation
short isRead;
@@ -43,7 +50,7 @@
public:
// _ob must be a global Reference (use createGloblReferente before calling the
constructor)
- JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject
_fileController, jobject _bufferReference, short _isRead);
+ JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback,
jobject _fileController, jobject _bufferReference, short _isRead);
virtual ~JNICallbackAdapter();
void done(THREAD_CONTEXT threadContext);
Modified: trunk/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- trunk/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-23 23:28:33 UTC (rev 8388)
+++ trunk/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-24 02:53:23 UTC (rev 8389)
@@ -48,10 +48,10 @@
std::string fileName = convertJavaString(env, jstrFileName);
controller = new AIOController(fileName, (int) maxIO);
- controller->done =
env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
+ controller->done =
env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V");
if (!controller->done) return 0;
- controller->error = env->GetMethodID(clazz, "callbackError",
"(Lorg/hornetq/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;ILjava/lang/String;)V");
+ controller->error = env->GetMethodID(clazz, "callbackError",
"(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V");
if (!controller->error) return 0;
jclass loggerClass = env->GetObjectClass(logger);
@@ -97,7 +97,7 @@
return;
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller,
env->NewGlobalRef(callback), env->NewGlobalRef(objThis),
env->NewGlobalRef(jbuffer), true);
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, -1,
env->NewGlobalRef(callback), env->NewGlobalRef(objThis),
env->NewGlobalRef(jbuffer), true);
controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
}
@@ -166,7 +166,7 @@
}
JNIEXPORT void JNICALL Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_write
- (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size,
jobject jbuffer, jobject callback)
+ (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong sequence, jlong position,
jlong size, jobject jbuffer, jobject callback)
{
try
{
@@ -180,7 +180,7 @@
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller,
env->NewGlobalRef(callback), env->NewGlobalRef(objThis),
env->NewGlobalRef(jbuffer), false);
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, sequence,
env->NewGlobalRef(callback), env->NewGlobalRef(objThis),
env->NewGlobalRef(jbuffer), false);
controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
}
Modified: trunk/native/src/Version.h
===================================================================
--- trunk/native/src/Version.h 2009-11-23 23:28:33 UTC (rev 8388)
+++ trunk/native/src/Version.h 2009-11-24 02:53:23 UTC (rev 8389)
@@ -1,5 +1,5 @@
#ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 25
+#define _VERSION_NATIVE_AIO 27
#endif
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -14,11 +14,14 @@
package org.hornetq.core.asyncio.impl;
import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.asyncio.AIOCallback;
@@ -46,8 +49,21 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 25;
+ private static int EXPECTED_NATIVE_VERSION = 27;
+ /** Used to determine the next writing sequence */
+ private AtomicLong nextWritingSequence = new AtomicLong(0);
+
+ /** Used to determine the next writing sequence.
+ * This is accessed from a single thread (the Poller Thread) */
+ private long nextReadSequence = 0;
+
+ /**
+ * AIO can't guarantee ordering over callbacks.
+ * We use thie PriorityQueue to hold values until they are in order
+ */
+ private PriorityQueue<CallbackHolder> pendingCallbacks = new
PriorityQueue<CallbackHolder>();
+
public static void addMax(final int io)
{
totalMaxIO.addAndGet(io);
@@ -124,6 +140,10 @@
private String fileName;
+ /** Used while inside the callbackDone and callbackError
+ **/
+ private final Lock callbackLock = new ReentrantLock();
+
private final VariableLatch pollerLatch = new VariableLatch();
private volatile Runnable poller;
@@ -131,10 +151,10 @@
private int maxIO;
private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
-
+
private final VariableLatch pendingWrites = new VariableLatch();
- private Semaphore writeSemaphore;
+ private Semaphore maxIOSemaphore;
private BufferCallback bufferCallback;
@@ -175,7 +195,7 @@
}
this.maxIO = maxIO;
- writeSemaphore = new Semaphore(this.maxIO);
+ maxIOSemaphore = new Semaphore(this.maxIO);
this.fileName = fileName;
@@ -189,10 +209,10 @@
if (e.getCode() == HornetQException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
{
ex = new HornetQException(e.getCode(),
- "Can't initialize AIO. Currently AIO
in use = " + totalMaxIO.get() +
- ", trying to allocate more
" +
- maxIO,
- e);
+ "Can't initialize AIO. Currently AIO in
use = " + totalMaxIO.get() +
+ ", trying to allocate more "
+
+ maxIO,
+ e);
}
else
{
@@ -202,6 +222,8 @@
}
opened = true;
addMax(this.maxIO);
+ nextWritingSequence.set(0);
+ nextReadSequence = 0;
}
finally
{
@@ -222,13 +244,13 @@
{
log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.fileName);
}
-
- while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+
+ while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.fileName);
}
- writeSemaphore = null;
+ maxIOSemaphore = null;
if (poller != null)
{
stopPoller();
@@ -263,7 +285,7 @@
{
startPoller();
}
-
+
pendingWrites.up();
if (writeExecutor != null)
@@ -272,38 +294,46 @@
{
public void run()
{
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
+ long sequence = nextWritingSequence.getAndIncrement();
+
try
{
- write(handler, position, size, directByteBuffer, aioCallback);
+ write(handler, sequence, position, size, directByteBuffer,
aioCallback);
}
catch (HornetQException e)
{
- callbackError(aioCallback, directByteBuffer, e.getCode(),
e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, e.getCode(),
e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, directByteBuffer,
HornetQException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback,
+ sequence,
+ directByteBuffer,
+ HornetQException.INTERNAL_ERROR,
+ e.getMessage());
}
}
});
}
else
{
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
+ long sequence = nextWritingSequence.getAndIncrement();
+
try
{
- write(handler, position, size, directByteBuffer, aioCallback);
+ write(handler, sequence, position, size, directByteBuffer, aioCallback);
}
catch (HornetQException e)
{
- callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, e.getCode(),
e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, directByteBuffer, HornetQException.INTERNAL_ERROR,
e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer,
HornetQException.INTERNAL_ERROR, e.getMessage());
}
}
@@ -320,7 +350,7 @@
startPoller();
}
pendingWrites.up();
- writeSemaphore.acquireUninterruptibly();
+ maxIOSemaphore.acquireUninterruptibly();
try
{
read(handler, position, size, directByteBuffer, aioPackage);
@@ -328,14 +358,14 @@
catch (HornetQException e)
{
// Release only if an exception happened
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
throw e;
}
catch (RuntimeException e)
{
// Release only if an exception happened
- writeSemaphore.release();
+ maxIOSemaphore.release();
pendingWrites.down();
throw e;
}
@@ -397,9 +427,9 @@
resetBuffer(buffer, buffer.limit());
buffer.position(0);
}
-
+
// Protected
-------------------------------------------------------------------------
-
+
protected void finalize()
{
if (opened)
@@ -410,32 +440,108 @@
// Private
---------------------------------------------------------------------------
- /** The JNI layer will call this method, so we could use it to unlock readWriteLocks
held in the java layer */
+ /** */
@SuppressWarnings("unused")
- // Called by the JNI layer.. just ignore the
- // warning
- private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
+ private void callbackDone(final AIOCallback callback, final long sequence, final
ByteBuffer buffer)
{
- writeSemaphore.release();
+ maxIOSemaphore.release();
+
pendingWrites.down();
- callback.done();
-
- // The buffer is not sent on callback for read operations
- if (bufferCallback != null && buffer != null)
+
+ callbackLock.lock();
+
+ try
{
- bufferCallback.bufferDone(buffer);
+
+ if (sequence == -1)
+ {
+ callback.done();
+ }
+ else
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.done();
+ flushCallbacks();
+ }
+ else
+ {
+ pendingCallbacks.add(new CallbackHolder(sequence, callback));
+ }
+ }
+
+ // The buffer is not sent on callback for read operations
+ if (bufferCallback != null && buffer != null)
+ {
+ bufferCallback.bufferDone(buffer);
+ }
}
+ finally
+ {
+ callbackLock.unlock();
+ }
}
+ private void flushCallbacks()
+ {
+ while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence ==
nextReadSequence)
+ {
+ CallbackHolder holder = pendingCallbacks.poll();
+ if (holder.isError())
+ {
+ ErrorCallback error = (ErrorCallback) holder;
+ holder.callback.onError(error.errorCode, error.message);
+ }
+ else
+ {
+ holder.callback.done();
+ }
+ nextReadSequence++;
+ }
+ }
+
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final ByteBuffer buffer, final
int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback,
+ final long sequence,
+ final ByteBuffer buffer,
+ final int errorCode,
+ final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
- writeSemaphore.release();
+
+ maxIOSemaphore.release();
+
pendingWrites.down();
- callback.onError(errorCode, errorMessage);
+ callbackLock.lock();
+
+ try
+ {
+ if (sequence == -1)
+ {
+ callback.onError(errorCode, errorMessage);
+ }
+ else
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.onError(errorCode, errorMessage);
+ flushCallbacks();
+ }
+ else
+ {
+ pendingCallbacks.add(new ErrorCallback(sequence, callback, errorCode,
errorMessage));
+ }
+ }
+ }
+ finally
+ {
+ callbackLock.unlock();
+ }
+
// The buffer is not sent on callback for read operations
if (bufferCallback != null && buffer != null)
{
@@ -504,10 +610,10 @@
private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
public static native void destroyBuffer(ByteBuffer buffer);
-
+
/** Instead of passing the nanoSeconds through the stack call every time, we set it
statically inside the native method */
public static native void setNanoSleepInterval(int nanoseconds);
-
+
public static native void nanoSleep();
private static native ByteBuffer newNativeBuffer(long size);
@@ -516,7 +622,12 @@
private native long size0(long handle) throws HornetQException;
- private native void write(long handle, long position, long size, ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
+ private native void write(long handle,
+ long sequence,
+ long position,
+ long size,
+ ByteBuffer buffer,
+ AIOCallback aioPackage) throws HornetQException;
private native void read(long handle, long position, long size, ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
@@ -529,11 +640,63 @@
/** A native method that does nothing, and just validate if the ELF dependencies are
loaded and on the correct platform as this binary format */
private static native int getNativeVersion();
- /** Poll asynchrounous events from internal queues */
+ /** Poll asynchronous events from internal queues */
private static native void internalPollEvents(long handler);
// Inner classes
---------------------------------------------------------------------
+ private static class CallbackHolder implements Comparable<CallbackHolder>
+ {
+ final long sequence;
+
+ final AIOCallback callback;
+
+ public boolean isError()
+ {
+ return false;
+ }
+
+ public CallbackHolder(final long sequence, final AIOCallback callback)
+ {
+ this.sequence = sequence;
+ this.callback = callback;
+ }
+
+ public int compareTo(CallbackHolder o)
+ {
+ // It shouldn't be equals in any case
+ if (sequence <= o.sequence)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+ }
+
+ private static class ErrorCallback extends CallbackHolder
+ {
+ final int errorCode;
+
+ final String message;
+
+ public boolean isError()
+ {
+ return true;
+ }
+
+ public ErrorCallback(final long sequence, final AIOCallback callback, int
errorCode, String message)
+ {
+ super(sequence, callback);
+
+ this.errorCode = errorCode;
+
+ this.message = message;
+ }
+ }
+
private class PollerRunnable implements Runnable
{
PollerRunnable()
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -1024,6 +1024,7 @@
}
catch (HornetQException e)
{
+ log.warn(e.getMessage(), e);
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
Modified: trunk/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQException.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/exception/HornetQException.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -38,6 +38,9 @@
public static final int UNBLOCKED = 005;
+ public static final int IO_ERROR = 006;
+
+
public static final int QUEUE_DOES_NOT_EXIST = 100;
public static final int QUEUE_EXISTS = 101;
@@ -85,8 +88,6 @@
public static final int NATIVE_ERROR_ALLOCATE_MEMORY = 209;
- public static final int NATIVE_ERROR_IO = 210;
-
public static final int NATIVE_ERROR_AIO_FULL = 211;
Added: trunk/src/main/org/hornetq/core/journal/IOAsyncTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/IOAsyncTask.java (rev
0)
+++ trunk/src/main/org/hornetq/core/journal/IOAsyncTask.java 2009-11-24 02:53:23 UTC (rev
8389)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+import org.hornetq.core.asyncio.AIOCallback;
+
+/**
+ *
+ * This class is just a direct extension of AIOCallback.
+ * Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the
journal.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ *
+ */
+public interface IOAsyncTask extends AIOCallback
+{
+}
Modified: trunk/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-23 23:28:33 UTC (rev
8388)
+++ trunk/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-24 02:53:23 UTC (rev
8389)
@@ -13,17 +13,14 @@
package org.hornetq.core.journal;
-import org.hornetq.core.asyncio.AIOCallback;
-
/**
- *
- * This class is just a direct extension of AIOCallback.
- * Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the
journal.
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ * A IOCompletion
*
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
*/
-public interface IOCompletion extends AIOCallback
+public interface IOCompletion extends IOAsyncTask
{
- void waitCompletion() throws Exception;
+ void lineUp();
}
Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java 2009-11-23 23:28:33 UTC (rev
8388)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java 2009-11-24 02:53:23 UTC (rev
8389)
@@ -19,8 +19,10 @@
/**
*
- * A Journal
+ * Most methods on the journal provide a blocking version where you select the sync mode
and a non blocking mode where you pass a completion callback as a parameter.
*
+ * Notice also that even on the callback methods it's possible to pass the sync mode.
That will only make sense on the NIO operations.
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
@@ -31,14 +33,24 @@
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws
Exception;
+ void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync)
throws Exception;
+ void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws
Exception;
+ void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception;
+ void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync, IOCompletion completionCallback) throws Exception;
+
void appendDeleteRecord(long id, boolean sync) throws Exception;
+ void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws
Exception;
+
// Transactional operations
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record)
throws Exception;
@@ -57,6 +69,8 @@
void appendCommitRecord(long txID, boolean sync) throws Exception;
+ void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception;
+
/**
*
* <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
@@ -70,10 +84,16 @@
*/
void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync)
throws Exception;
+ void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync,
IOCompletion callback) throws Exception;
+
void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws
Exception;
+ void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion
callback) throws Exception;
+
void appendRollbackRecord(long txID, boolean sync) throws Exception;
+ void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws
Exception;
+
// Load
JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-23 23:28:33 UTC
(rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-24 02:53:23 UTC
(rev 8389)
@@ -38,7 +38,7 @@
boolean exists();
/**
- * For certain operations (like loading) we don't need open the file with full
maxIO
+ * The maximum number of simultaneous writes accepted
* @param maxIO
* @throws Exception
*/
@@ -56,17 +56,17 @@
void delete() throws Exception;
- void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws
Exception;
+ void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception;
void write(HornetQBuffer bytes, boolean sync) throws Exception;
/** Write directly to the file without using any buffer */
- void writeDirect(ByteBuffer bytes, boolean sync, IOCompletion callback);
+ void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
- int read(ByteBuffer bytes, IOCompletion callback) throws Exception;
+ int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
int read(ByteBuffer bytes) throws Exception;
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -15,21 +15,15 @@
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.asyncio.AIOCallback;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.journal.IOCompletion;
-import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
/**
*
@@ -50,14 +44,9 @@
private final BufferCallback bufferCallback;
- /** A context switch on AIO would make it to synchronize the disk before
- switching to the new thread, what would cause
- serious performance problems. Because of that we make all the writes on
- AIO using a single thread. */
- private final Executor executor;
-
/** The pool for Thread pollers */
private final Executor pollerExecutor;
+
public AIOSequentialFile(final SequentialFileFactory factory,
final int bufferSize,
@@ -66,13 +55,12 @@
final String fileName,
final int maxIO,
final BufferCallback bufferCallback,
- final Executor executor,
+ final Executor writerExecutor,
final Executor pollerExecutor)
{
- super(directory, new File(directory + "/" + fileName), factory);
+ super(directory, new File(directory + "/" + fileName), factory,
writerExecutor);
this.maxIO = maxIO;
this.bufferCallback = bufferCallback;
- this.executor = executor;
this.pollerExecutor = pollerExecutor;
}
@@ -99,7 +87,7 @@
public SequentialFile copy()
{
- return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(),
maxIO, bufferCallback, executor, pollerExecutor);
+ return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(),
maxIO, bufferCallback, writerExecutor, pollerExecutor);
}
public synchronized void close() throws Exception
@@ -108,27 +96,13 @@
{
return;
}
+
+ super.close();
+
opened = false;
timedBuffer = null;
- final CountDownLatch donelatch = new CountDownLatch(1);
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- donelatch.countDown();
- }
- });
-
- while (!donelatch.await(60, TimeUnit.SECONDS))
- {
- log.warn("Executor on file " + getFile().getName() + "
couldn't complete its tasks in 60 seconds.",
- new Exception("Warning: Executor on file " +
getFile().getName() +
- " couldn't complete its tasks in 60
seconds."));
- }
-
aioFile.close();
aioFile = null;
@@ -199,13 +173,18 @@
open(maxIO);
}
- public synchronized void open(final int currentMaxIO) throws Exception
+ public synchronized void open(final int maxIO) throws Exception
{
opened = true;
- aioFile = newFile();
- aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
+
+ aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
+
+ aioFile.open(getFile().getAbsolutePath(), maxIO);
+
position.set(0);
+
aioFile.setBufferCallback(bufferCallback);
+
this.fileSize = aioFile.size();
}
@@ -214,7 +193,7 @@
aioFile.setBufferCallback(callback);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
int bytesToRead = bytes.limit();
@@ -229,7 +208,7 @@
public int read(final ByteBuffer bytes) throws Exception
{
- IOCompletion waitCompletion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
int bytesRead = read(bytes, waitCompletion);
@@ -268,20 +247,12 @@
// Protected methods
//
-----------------------------------------------------------------------------------------------------
- /**
- * An extension point for tests
- */
- protected AsynchronousFile newFile()
- {
- return new AsynchronousFileImpl(executor, pollerExecutor);
- }
-
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
{
if (sync)
{
- IOCompletion completion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
writeDirect(bytes, true, completion);
@@ -298,7 +269,7 @@
*
* @param sync Not used on AIO
* */
- public void writeDirect(final ByteBuffer bytes, final boolean sync, IOCompletion
callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, IOAsyncTask
callback)
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -36,15 +36,14 @@
public class AIOSequentialFileFactory extends AbstractSequentialFactory
{
- // Timeout used to wait executors to shutdown
- private static final int EXECUTOR_TIMEOUT = 60;
-
private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
private static final boolean trace = log.isTraceEnabled();
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
+ private ExecutorService pollerExecutor;
+
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
@@ -53,13 +52,6 @@
log.trace(message);
}
- /** A single AIO write executor for every AIO File.
- * This is used only for AIO & instant operations. We only need one
executor-thread for the entire journal as we always have only one active file.
- * And even if we had multiple files at a given moment, this should still be ok, as
we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
- private ExecutorService writeExecutor;
-
- private ExecutorService pollerExecutor;
-
public AIOSequentialFileFactory(final String journalDir)
{
this(journalDir,
@@ -152,9 +144,6 @@
{
super.start();
- writeExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
- true));
-
pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
true));
@@ -163,23 +152,8 @@
@Override
public void stop()
{
- super.stop();
-
buffersControl.stop();
- writeExecutor.shutdown();
-
- try
- {
- if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
- {
- log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
- }
- }
- catch (InterruptedException e)
- {
- }
-
pollerExecutor.shutdown();
try
@@ -192,6 +166,8 @@
catch (InterruptedException e)
{
}
+
+ super.stop();
}
@Override
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -19,10 +19,14 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
/**
*
@@ -34,16 +38,28 @@
*/
public abstract class AbstractSequentialFactory implements SequentialFileFactory
{
+
+ // Timeout used to wait executors to shutdown
+ protected static final int EXECUTOR_TIMEOUT = 60;
+
private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
protected final String journalDir;
protected final TimedBuffer timedBuffer;
-
+
protected final int bufferSize;
protected final long bufferTimeout;
+
+ /**
+ * Asynchronous writes need to be done at another executor.
+ * This needs to be done at NIO, or else we would have the callers thread blocking for
the return.
+ * At AIO this is necessary as context switches on writes would fire flushes at the
kernel.
+ * */
+ protected ExecutorService writeExecutor;
+
public AbstractSequentialFactory(final String journalDir,
final boolean buffered,
@@ -71,6 +87,24 @@
{
timedBuffer.stop();
}
+
+ if (isSupportsCallbacks())
+ {
+ writeExecutor.shutdown();
+
+ try
+ {
+ if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+
}
public void start()
@@ -79,6 +113,14 @@
{
timedBuffer.start();
}
+
+ if (isSupportsCallbacks())
+ {
+ writeExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" +
System.identityHashCode(this),
+
true));
+ }
+
+
}
/* (non-Javadoc)
@@ -99,7 +141,7 @@
}
}
}
-
+
public void flush()
{
if (timedBuffer != null)
@@ -117,7 +159,6 @@
}
}
-
public void releaseBuffer(ByteBuffer buffer)
{
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -16,9 +16,12 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -55,6 +58,9 @@
* This is the class returned to the factory when the file is being activated. */
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+ /** Used for asynchronous writes */
+ protected final Executor writerExecutor;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -63,12 +69,16 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(final String directory, final File file, final
SequentialFileFactory factory)
+ public AbstractSequentialFile(final String directory,
+ final File file,
+ final SequentialFileFactory factory,
+ final Executor writerExecutor)
{
super();
this.file = file;
this.directory = directory;
this.factory = factory;
+ this.writerExecutor = writerExecutor;
}
// Public --------------------------------------------------------
@@ -115,6 +125,29 @@
}
}
+ public synchronized void close() throws Exception
+ {
+ final CountDownLatch donelatch = new CountDownLatch(1);
+
+ if (writerExecutor != null)
+ {
+ writerExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ donelatch.countDown();
+ }
+ });
+
+ while (!donelatch.await(60, TimeUnit.SECONDS))
+ {
+ log.warn("Executor on file " + getFile().getName() + "
couldn't complete its tasks in 60 seconds.",
+ new Exception("Warning: Executor on file " +
getFile().getName() +
+ " couldn't complete its tasks in 60
seconds."));
+ }
+ }
+ }
+
public final boolean fits(final int size)
{
if (timedBuffer == null)
@@ -159,7 +192,7 @@
}
- public void write(final HornetQBuffer bytes, final boolean sync, final IOCompletion
callback) throws Exception
+ public void write(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask
callback) throws Exception
{
if (timedBuffer != null)
{
@@ -178,7 +211,7 @@
{
if (sync)
{
- IOCompletion completion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
write(bytes, true, completion);
@@ -203,18 +236,18 @@
// Inner classes -------------------------------------------------
- protected static class DelegateCallback implements IOCompletion
+ protected static class DelegateCallback implements IOAsyncTask
{
- final List<IOCompletion> delegates;
+ final List<IOAsyncTask> delegates;
- DelegateCallback(final List<IOCompletion> delegates)
+ DelegateCallback(final List<IOAsyncTask> delegates)
{
this.delegates = delegates;
}
public void done()
{
- for (IOCompletion callback : delegates)
+ for (IOAsyncTask callback : delegates)
{
try
{
@@ -229,7 +262,7 @@
public void onError(final int errorCode, final String errorMessage)
{
- for (IOCompletion callback : delegates)
+ for (IOAsyncTask callback : delegates)
{
try
{
@@ -249,7 +282,7 @@
protected class LocalBufferObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final
List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final
List<IOAsyncTask> callbacks)
{
buffer.flip();
Modified: trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -14,7 +14,6 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -24,13 +23,13 @@
*
*
*/
-public class DummyCallback implements IOCompletion
+class DummyCallback extends SyncIOCompletion
{
private static DummyCallback instance = new DummyCallback();
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
- public static IOCompletion getInstance()
+ public static DummyCallback getInstance()
{
return instance;
}
@@ -47,5 +46,12 @@
public void waitCompletion() throws Exception
{
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.IOCompletion#linedUp()
+ */
+ public void lineUp()
+ {
+ }
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-23 23:28:33 UTC
(rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-24 02:53:23 UTC
(rev 8389)
@@ -44,6 +44,7 @@
import org.hornetq.core.buffers.ChannelBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -84,7 +85,7 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = false;
+ private static final boolean trace = log.isTraceEnabled();
/** This is to be set to true at DEBUG & development only */
private static final boolean LOAD_TRACE = false;
@@ -95,6 +96,7 @@
private static final void trace(final String message)
{
log.trace(message);
+ //System.out.println("JournalImpl::" + message);
}
// The sizes of primitive types
@@ -845,15 +847,35 @@
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record,
final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync) throws Exception
{
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendAddRecord(id, recordType, record, sync, callback);
+
+ // We only wait on explicit callbacks
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ if (LOAD_TRACE)
+ {
+ trace("appendAddRecord id = " + id + ", recordType = " +
recordType);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
-
- IOCompletion callback = null;
-
+
compactingLock.readLock().lock();
try
@@ -864,7 +886,10 @@
writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled
later
- callback = getSyncCallback(sync);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
lockAppend.lock();
try
@@ -882,11 +907,6 @@
{
compactingLock.readLock().unlock();
}
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
}
public void appendUpdateRecord(final long id, final byte recordType, final byte[]
record, final boolean sync) throws Exception
@@ -894,15 +914,35 @@
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[]
record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
public void appendUpdateRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync) throws Exception
{
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendUpdateRecord(id, recordType, record, sync, callback);
+
+ // We only wait on explicit callbacks
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendUpdateRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ if (LOAD_TRACE)
+ {
+ trace("appendUpdateRecord id = " + id + ", recordType = " +
recordType);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
-
- IOCompletion callback = null;
-
+
compactingLock.readLock().lock();
try
@@ -924,7 +964,10 @@
writeUpdateRecord(-1, id, recordType, record, size, bb);
- callback = getSyncCallback(sync);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
lockAppend.lock();
try
@@ -951,24 +994,35 @@
{
compactingLock.readLock().unlock();
}
+ }
+
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+ {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendDeleteRecord(id, sync, callback);
+
+ // We only wait on explicit callbacks
if (callback != null)
{
callback.waitCompletion();
}
}
-
- public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+
+ public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion
callback) throws Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendDeleteRecord id = " + id);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
compactingLock.readLock().lock();
- IOCompletion callback = null;
-
try
{
@@ -981,14 +1035,17 @@
throw new IllegalStateException("Cannot find add info " + id);
}
}
-
+
int size = SIZE_DELETE_RECORD;
ChannelBuffer bb = newBuffer(size);
writeDeleteRecord(-1, id, size, bb);
- callback = getSyncCallback(sync);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
lockAppend.lock();
try
@@ -1016,11 +1073,6 @@
{
compactingLock.readLock().unlock();
}
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
}
public void appendAddRecordTransactional(final long txID, final long id, final byte
recordType, final byte[] record) throws Exception
@@ -1034,6 +1086,10 @@
final byte recordType,
final EncodingSupport record) throws
Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendAddRecordTransactional txID " + txID + ", id =
" + id + ", recordType = " + recordType);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1083,6 +1139,10 @@
final byte recordType,
final EncodingSupport record) throws
Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendUpdateRecordTransactional txID " + txID + ", id =
" + id + ", recordType = " + recordType);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1126,6 +1186,11 @@
public void appendDeleteRecordTransactional(final long txID, final long id, final
EncodingSupport record) throws Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendDeleteRecordTransactional txID " + txID + ", id =
" + id);
+ }
+
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1165,6 +1230,12 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
+
+
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion completion) throws Exception
+ {
+ appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync,
completion);
+ }
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
@@ -1174,6 +1245,18 @@
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
+ public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendPrepareRecord(txID, transactionData, sync, syncCompletion);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
/**
*
* <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
@@ -1187,8 +1270,13 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync) throws Exception
+ public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync, IOCompletion callback) throws Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendPrepareRecord txID " + txID);
+ }
+
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1198,11 +1286,6 @@
JournalTransaction tx = getTransactionInfo(txID);
- if (sync)
- {
- tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
- }
-
try
{
@@ -1211,10 +1294,15 @@
writeTransaction(-1, PREPARE_RECORD, txID, transactionData, size, -1, bb);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
tx.prepare(usedFile);
}
@@ -1228,11 +1316,23 @@
{
compactingLock.readLock().unlock();
}
-
- // We should wait this outside of the lock, to increase throughput
- tx.waitCompletion();
}
+
+
+
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendCommitRecord(txID, sync, syncCompletion);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements
the transaction has on each file.</p>
* <p>For example, a transaction was spread along 3 journal files with 10
pendingTransactions on each file.
@@ -1250,7 +1350,9 @@
*
* @see JournalImpl#writeTransaction(byte, long,
org.hornetq.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
*/
- public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+
+
+ public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion
callback) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1264,6 +1366,7 @@
try
{
+
if (tx == null)
{
throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -1279,10 +1382,15 @@
-1 /* number of records on this transaction will be filled
later inside append record */,
bb);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
tx.commit(usedFile);
}
@@ -1296,15 +1404,23 @@
{
compactingLock.readLock().unlock();
}
+ }
- if (sync)
+
+ public void appendRollbackRecord(final long txID, final boolean sync) throws
Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendRollbackRecord(txID, sync, syncCompletion);
+
+ if (syncCompletion != null)
{
- // We should wait this outside of the lock, to increase throuput
- tx.waitCompletion();
+ syncCompletion.waitCompletion();
}
+
}
-
- public void appendRollbackRecord(final long txID, final boolean sync) throws
Exception
+
+ public void appendRollbackRecord(final long txID, final boolean sync, final
IOCompletion callback) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1323,15 +1439,20 @@
{
throw new IllegalStateException("Cannot find tx with id " + txID);
}
-
+
ChannelBuffer bb = newBuffer(SIZE_ROLLBACK_RECORD);
writeRollback(-1, txID, bb);
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, sync, tx, callback);
tx.rollback(usedFile);
}
@@ -1345,14 +1466,6 @@
{
compactingLock.readLock().unlock();
}
-
- // We should wait this outside of the lock, to increase throuput
-
- if (sync)
- {
- tx.waitCompletion();
- }
-
}
public int getAlignment() throws Exception
@@ -2833,7 +2946,7 @@
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
- IOCompletion callback) throws Exception
+ final IOAsyncTask parameterCallback) throws
Exception
{
try
{
@@ -2841,6 +2954,8 @@
{
throw new IllegalStateException("The journal is not loaded " +
state);
}
+
+ final IOAsyncTask callback;
int size = bb.capacity();
@@ -2874,25 +2989,29 @@
if (tx != null)
{
- if (callback != null)
- {
- // sanity check, it should not happen.
- throw new IllegalArgumentException("Invalid callback parameter. Use
of tx is mutually exclusive with the callback");
- }
-
// The callback of a transaction has to be taken inside the lock,
// when we guarantee the currentFile will not be changed,
// since we individualize the callback per file
if (fileFactory.isSupportsCallbacks())
{
- callback = tx.getCallback(currentFile);
+ // Set the delegated callback as a parameter
+ TransactionCallback txcallback = tx.getCallback(currentFile);
+ if (parameterCallback != null)
+ {
+ txcallback.setDelegateCompletion(parameterCallback);
+ }
+ callback = txcallback;
}
+ else
+ {
+ callback = null;
+ }
if (sync)
{
- // 99 % of the times this will be already synced, as previous files should
be closed already.
- // This is to have 100% guarantee the transaction will be persisted and no
loss of information would
- // happen
+ // In an edge case the transaction could still have pending data from
previous files.
+ // This shouldn't cause any blocking issues, as this is here to
guarantee we cover all possibilities
+ // on guaranteeing the data is on the disk
tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
@@ -2903,6 +3022,10 @@
tx.fillNumberOfRecords(currentFile, bb);
}
}
+ else
+ {
+ callback = parameterCallback;
+ }
// Adding fileID
bb.writerIndex(DataConstants.SIZE_BYTE);
@@ -3233,13 +3356,13 @@
return tx;
}
- private IOCompletion getSyncCallback(final boolean sync)
+ private SyncIOCompletion getSyncCallback(final boolean sync)
{
if (fileFactory.isSupportsCallbacks())
{
if (sync)
{
- return SimpleWaitIOCallback.getInstance();
+ return new SimpleWaitIOCallback();
}
else
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -18,8 +18,12 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -40,14 +44,23 @@
private RandomAccessFile rfile;
- public NIOSequentialFile(final SequentialFileFactory factory, final String directory,
final String fileName)
+ /** The write semaphore here is only used when writing asynchronously */
+ private Semaphore maxIOSemaphore;
+
+ private final int defaultMaxIO;
+
+ private int maxIO;
+
+ public NIOSequentialFile(final SequentialFileFactory factory, final String directory,
final String fileName, final int maxIO, final Executor writerExecutor)
{
- super(directory, new File(directory + "/" + fileName), factory);
+ super(directory, new File(directory + "/" + fileName), factory,
writerExecutor);
+ this.defaultMaxIO = maxIO;
}
- public NIOSequentialFile(final SequentialFileFactory factory, final File file)
+ public NIOSequentialFile(final SequentialFileFactory factory, final File file, final
int maxIO, final Executor writerExecutor)
{
- super(file.getParent(), new File(file.getPath()), factory);
+ super(file.getParent(), new File(file.getPath()), factory, writerExecutor);
+ this.defaultMaxIO = maxIO;
}
public int getAlignment()
@@ -65,20 +78,28 @@
return channel != null;
}
+ /** this.maxIO represents the default maxIO.
+ * Some operations while initializing files on the journal may require a different
maxIO */
public synchronized void open() throws Exception
{
+ open(this.defaultMaxIO);
+ }
+
+ public void open(final int maxIO) throws Exception
+ {
rfile = new RandomAccessFile(getFile(), "rw");
channel = rfile.getChannel();
fileSize = channel.size();
+
+ if (writerExecutor != null)
+ {
+ this.maxIOSemaphore = new Semaphore(maxIO);
+ this.maxIO = maxIO;
+ }
}
- public void open(final int currentMaxIO) throws Exception
- {
- open();
- }
-
public void fill(final int position, final int size, final byte fillCharacter) throws
Exception
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -111,6 +132,8 @@
public synchronized void close() throws Exception
{
+ super.close();
+
if (channel != null)
{
channel.close();
@@ -125,6 +148,16 @@
rfile = null;
+ if (maxIOSemaphore != null)
+ {
+ while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
+ {
+ log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.getFileName());
+ }
+ }
+
+ maxIOSemaphore = null;
+
notifyAll();
}
@@ -133,7 +166,7 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
try
{
@@ -152,7 +185,7 @@
{
if (callback != null)
{
- callback.onError(-1, e.getLocalizedMessage());
+ callback.onError(HornetQException.IO_ERROR, e.getLocalizedMessage());
}
throw e;
@@ -194,10 +227,10 @@
public SequentialFile copy()
{
- return new NIOSequentialFile(factory, getFile());
+ return new NIOSequentialFile(factory, getFile(), maxIO, writerExecutor);
}
- public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCompletion
callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask
callback)
{
if (callback == null)
{
@@ -219,6 +252,55 @@
internalWrite(bytes, sync, null);
}
+ private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOAsyncTask callback) throws Exception
+ {
+ if (!isOpen())
+ {
+ if (callback != null)
+ {
+ callback.onError(HornetQException.IO_ERROR, "File not opened");
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.IO_ERROR, "File not
opened");
+ }
+ return;
+ }
+
+ if (writerExecutor == null)
+ {
+ doInternalWrite(bytes, sync, callback);
+ }
+ else
+ {
+ // This is a flow control on writing, just like maxAIO on libaio
+ maxIOSemaphore.acquire();
+
+ writerExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ try
+ {
+ doInternalWrite(bytes, sync, callback);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Exception on submitting write", e);
+ callback.onError(HornetQException.IO_ERROR, e.getMessage());
+ }
+ }
+ finally
+ {
+ maxIOSemaphore.release();
+ }
+ }
+ });
+ }
+ }
+
/**
* @param bytes
* @param sync
@@ -226,8 +308,9 @@
* @throws IOException
* @throws Exception
*/
- private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOCompletion callback) throws Exception
+ private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final
IOAsyncTask callback) throws Exception
{
+
position.addAndGet(bytes.limit());
channel.write(bytes);
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -64,9 +64,15 @@
}
// maxIO is ignored on NIO
- public SequentialFile createSequentialFile(final String fileName, final int maxIO)
+ public SequentialFile createSequentialFile(final String fileName, int maxIO)
{
- return new NIOSequentialFile(this, journalDir, fileName);
+ if (maxIO < 0)
+ {
+ // A single threaded IO
+ maxIO = 1;
+ }
+
+ return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
}
public boolean isSupportsCallbacks()
Modified: trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -14,9 +14,9 @@
package org.hornetq.core.journal.impl;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -26,7 +26,7 @@
*
*
*/
-public class SimpleWaitIOCallback implements IOCompletion
+public class SimpleWaitIOCallback extends SyncIOCompletion
{
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
@@ -37,12 +37,6 @@
private volatile int errorCode = 0;
- public static IOCompletion getInstance()
- {
- return new SimpleWaitIOCallback();
- }
-
-
public void done()
{
latch.countDown();
@@ -68,4 +62,16 @@
}
return;
}
+
+ public boolean waitCompletion(final long timeout) throws Exception
+ {
+ return latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.IOCompletion#linedUp()
+ */
+ public void lineUp()
+ {
+ }
}
Added: trunk/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.journal.IOCompletion;
+
+/**
+ * Internal class used to manage explicit syncs on the Journal through callbacks.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class SyncIOCompletion implements IOCompletion
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public abstract void waitCompletion() throws Exception;
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-23 23:28:33 UTC
(rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-24 02:53:23 UTC
(rev 8389)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@@ -22,7 +23,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.VariableLatch;
@@ -56,7 +57,7 @@
private int bufferLimit = 0;
- private List<IOCompletion> callbacks;
+ private List<IOAsyncTask> callbacks;
private final Lock lock = new ReentrantReadWriteLock().writeLock();
@@ -106,7 +107,7 @@
buffer.clear();
bufferLimit = 0;
- callbacks = new ArrayList<IOCompletion>();
+ callbacks = new ArrayList<IOAsyncTask>();
this.flushOnSync = flushOnSync;
latchTimer.up();
this.timeout = timeout;
@@ -225,7 +226,7 @@
}
}
- public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOCompletion callback)
+ public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOAsyncTask callback)
{
if (buffer.writerIndex() == 0)
{
@@ -258,36 +259,55 @@
}
}
- public synchronized void flush()
+ public void flush()
{
- if (buffer.writerIndex() > 0)
+ ByteBuffer bufferToFlush = null;
+
+ boolean useSync = false;
+
+ List<IOAsyncTask> callbacksToCall = null;
+
+ synchronized (this)
{
- latchTimer.up();
+ if (buffer.writerIndex() > 0)
+ {
+ latchTimer.up();
+
+ int pos = buffer.writerIndex();
+
+ if (logRates)
+ {
+ bytesFlushed += pos;
+ }
+
+ bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+
+ // Putting a byteArray on a native buffer is much faster, since it will do in
a single native call.
+ // Using bufferToFlush.put(buffer) would make several append calls for each
byte
+
+ bufferToFlush.put(buffer.array(), 0, pos);
- int pos = buffer.writerIndex();
-
- if (logRates)
- {
- bytesFlushed += pos;
+ callbacksToCall = callbacks;
+
+ callbacks = new LinkedList<IOAsyncTask>();
+
+ useSync = pendingSync;
+
+ active = false;
+ pendingSync = false;
+
+ buffer.clear();
+ bufferLimit = 0;
}
+ }
+
+ // Execute the flush outside of the lock
+ // This is important for NIO performance while we are using NIO Callbacks
+ if (bufferToFlush != null)
+ {
+ bufferObserver.flushBuffer(bufferToFlush, useSync, callbacksToCall);
+ }
- ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
-
- // Putting a byteArray on a native buffer is much faster, since it will do in a
single native call.
- // Using directBuffer.put(buffer) would make several append calls for each byte
-
- directBuffer.put(buffer.array(), 0, pos);
-
- bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
-
- callbacks = new ArrayList<IOCompletion>();
-
- active = false;
- pendingSync = false;
-
- buffer.clear();
- bufferLimit = 0;
- }
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -17,7 +17,7 @@
import java.nio.ByteBuffer;
import java.util.List;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
/**
* A TimedBufferObserver
@@ -39,7 +39,7 @@
// Public --------------------------------------------------------
- public void flushBuffer(ByteBuffer buffer, boolean syncRequested,
List<IOCompletion> callbacks);
+ public void flushBuffer(ByteBuffer buffer, boolean syncRequested,
List<IOAsyncTask> callbacks);
/** Return the number of remaining bytes that still fit on the observer (file) */
Modified: trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -14,7 +14,7 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.utils.VariableLatch;
/**
@@ -24,22 +24,34 @@
*
*
*/
-public class TransactionCallback implements IOCompletion
+public class TransactionCallback implements IOAsyncTask
{
private final VariableLatch countLatch = new VariableLatch();
private volatile String errorMessage = null;
private volatile int errorCode = 0;
+
+ private volatile int up = 0;
+
+ private volatile int done = 0;
+
+ private volatile IOAsyncTask delegateCompletion;
public void countUp()
{
+ up++;
countLatch.up();
}
public void done()
{
countLatch.down();
+ if (++done == up && delegateCompletion != null)
+ {
+ delegateCompletion.done();
+ delegateCompletion = null;
+ }
}
public void waitCompletion() throws InterruptedException
@@ -59,9 +71,30 @@
this.errorCode = errorCode;
countLatch.down();
+
+ if (delegateCompletion != null)
+ {
+ delegateCompletion.onError(errorCode, errorMessage);
+ }
}
/**
+ * @return the delegateCompletion
+ */
+ public IOAsyncTask getDelegateCompletion()
+ {
+ return delegateCompletion;
+ }
+
+ /**
+ * @param delegateCompletion the delegateCompletion to set
+ */
+ public void setDelegateCompletion(IOAsyncTask delegateCompletion)
+ {
+ this.delegateCompletion = delegateCompletion;
+ }
+
+ /**
* @return the errorMessage
*/
public String getErrorMessage()
Modified: trunk/src/main/org/hornetq/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ManagementService.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/management/ManagementService.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -71,6 +71,8 @@
ObjectNameBuilder getObjectNameBuilder();
// Resource Registration
+
+ void setStorageManager(StorageManager storageManager);
HornetQServerControlImpl registerServer(PostOffice postOffice,
StorageManager storageManager,
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -437,7 +437,8 @@
if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
{
Transaction transaction = resourceManager.removeTransaction(xid);
- transaction.commit();
+ transaction.commit(false);
+ server.getStorageManager().waitOnOperations(-1);
long recordID = server.getStorageManager().storeHeuristicCompletion(xid,
true);
resourceManager.putHeuristicCompletion(recordID, xid, true);
return true;
@@ -456,6 +457,7 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.rollback();
+ server.getStorageManager().completeOperations();
long recordID = server.getStorageManager().storeHeuristicCompletion(xid,
false);
resourceManager.putHeuristicCompletion(recordID, xid, false);
return true;
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -21,7 +21,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.NotificationBroadcasterSupport;
@@ -39,6 +41,7 @@
import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AcceptorControl;
import org.hornetq.core.management.BridgeControl;
@@ -177,6 +180,11 @@
{
return messageCounterManager;
}
+
+ public void setStorageManager(StorageManager storageManager)
+ {
+ this.storageManager = storageManager;
+ }
public HornetQServerControlImpl registerServer(final PostOffice postOffice,
final StorageManager storageManager,
@@ -736,6 +744,12 @@
}
}
}
+
+ if (storageManager != null)
+ {
+ storageManager.waitOnOperations(managementRequestTimeout);
+ storageManager.clearContext();
+ }
}
public void enableNotifications(boolean enabled)
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2009-11-23 23:28:33 UTC (rev
8388)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2009-11-24 02:53:23 UTC (rev
8389)
@@ -67,14 +67,14 @@
* @param message
* @throws Exception
*/
- void addSize(ServerMessage message, boolean add) throws Exception;
+ void addSize(ServerMessage message, boolean add);
/**
*
* @param reference
* @throws Exception
*/
- void addSize(MessageReference reference, boolean add) throws Exception;
+ void addSize(MessageReference reference, boolean add);
/**
*
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -279,7 +279,7 @@
checkReleaseProducerFlowControlCredits(-credits);
}
- public void addSize(final ServerMessage message, final boolean add) throws Exception
+ public void addSize(final ServerMessage message, final boolean add)
{
long size = message.getMemoryEstimate();
@@ -297,7 +297,7 @@
}
}
- public void addSize(final MessageReference reference, final boolean add) throws
Exception
+ public void addSize(final MessageReference reference, final boolean add)
{
long size = MessageReferenceImpl.getMemoryEstimate();
@@ -477,7 +477,7 @@
}
}
- public boolean startPaging() throws Exception
+ public boolean startPaging()
{
if (!running)
{
@@ -508,7 +508,17 @@
{
if (currentPage == null)
{
- openNewPage();
+ try
+ {
+ openNewPage();
+ }
+ catch (Exception e)
+ {
+ // If not possible to starting page due to an IO error, we will just
consider it non paging.
+ // This shouldn't happen anyway
+ log.warn("IO Error, impossible to start paging", e);
+ return false;
+ }
return true;
}
@@ -699,7 +709,7 @@
}
}
- private void addSize(final long size) throws Exception
+ private void addSize(final long size)
{
if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
{
@@ -996,10 +1006,9 @@
}
depageTransaction.commit();
+
+ storageManager.waitOnOperations();
- // StorageManager does the check: if (replicated) -> do the proper cleanup
already
- storageManager.completeReplication();
-
if (isTrace)
{
trace("Depage committed, running = " + running);
Added: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence;
+
+import java.util.concurrent.Executor;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
+
+
+/**
+ * This represents a set of operations done as part of replication.
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationContext extends IOCompletion
+{
+
+ /** The executor used on responses.
+ * If this is not set, it will use the current thread. */
+ void setExecutor(Executor executor);
+
+ /** Execute the task when all IO operations are complete,
+ * Or execute it immediately if nothing is pending. */
+ void executeOnCompletion(IOAsyncTask runnable);
+
+ void replicationLineUp();
+
+ void replicationDone();
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+}
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -15,9 +15,11 @@
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import javax.transaction.xa.Xid;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -46,6 +48,16 @@
*/
public interface StorageManager extends HornetQComponent
{
+
+ /** Get the context associated with the thread for later reuse */
+ OperationContext getContext();
+
+ /** It just creates an OperationContext without associating it */
+ OperationContext newContext(Executor executor);
+
+ /** Set the context back to the thread */
+ void setContext(OperationContext context);
+
// Message related operations
void pageClosed(SimpleString storeName, int pageNumber);
@@ -56,14 +68,21 @@
boolean isReplicated();
- void afterReplicated(Runnable run);
+ void afterCompleteOperations(IOAsyncTask run);
- /** Block until the replication is done.
+ /** Block until the operations are done.
* @throws Exception */
- void waitOnReplication(long timeout) throws Exception;
+ void waitOnOperations(long timeout) throws Exception;
- void completeReplication();
+ /** Block until the operations are done.
+ * @throws Exception */
+ void waitOnOperations() throws Exception;
+ /** To close the OperationsContext */
+ void completeOperations();
+
+ void clearContext();
+
UUID getPersistentID();
void setPersistentID(UUID id) throws Exception;
@@ -147,6 +166,4 @@
void deleteGrouping(GroupBinding groupBinding) throws Exception;
-
- void sync();
}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -176,7 +176,7 @@
this.delayDeletionCount.incrementAndGet();
}
- public synchronized void decrementDelayDeletionCount() throws Exception
+ public synchronized void decrementDelayDeletionCount()
{
int count = this.delayDeletionCount.decrementAndGet();
@@ -191,7 +191,7 @@
return new DecodingContext();
}
- private void checkDelete() throws Exception
+ private void checkDelete()
{
if (getRefCount() <= 0)
{
@@ -220,7 +220,7 @@
}
@Override
- public synchronized int decrementRefCount(MessageReference reference) throws
Exception
+ public synchronized int decrementRefCount(MessageReference reference)
{
int currentRefCount = super.decrementRefCount(reference);
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -26,9 +26,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
@@ -37,6 +35,8 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -47,6 +47,7 @@
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.SimpleWaitIOCallback;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -54,6 +55,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -76,6 +78,7 @@
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.UUID;
@@ -91,6 +94,7 @@
*/
public class JournalStorageManager implements StorageManager
{
+
private static final Logger log = Logger.getLogger(JournalStorageManager.class);
private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
@@ -142,6 +146,9 @@
private final SequentialFileFactory largeMessagesFactory;
private volatile boolean started;
+
+ /** Used to create Operation Contexts */
+ private final ExecutorFactory executorFactory;
private final Executor executor;
@@ -161,15 +168,17 @@
private final String largeMessagesDirectory;
- public JournalStorageManager(final Configuration config, final Executor executor)
+ public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory)
{
- this(config, executor, null);
+ this(config, executorFactory, null);
}
- public JournalStorageManager(final Configuration config, final Executor executor,
final ReplicationManager replicator)
+ public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory, final ReplicationManager replicator)
{
- this.executor = executor;
+ this.executorFactory = executorFactory;
+ this.executor = executorFactory.getExecutor();
+
this.replicator = replicator;
if (config.getJournalType() != JournalType.NIO && config.getJournalType()
!= JournalType.ASYNCIO)
@@ -291,34 +300,41 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
- if (replicator != null)
- {
- replicator.closeContext();
- }
+ getContext().complete();
}
+
+ public void clearContext()
+ {
+ OperationContextImpl.clearContext();
+ }
public boolean isReplicated()
{
return replicator != null;
}
+
+ public void waitOnOperations() throws Exception
+ {
+ waitOnOperations(-1);
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
*/
- public void waitOnReplication(final long timeout) throws Exception
+ public void waitOnOperations(final long timeout) throws Exception
{
- final CountDownLatch latch = new CountDownLatch(1);
- afterReplicated(new Runnable()
+ SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
+ afterCompleteOperations(waitCallback);
+ completeOperations();
+ if (timeout <= 0)
{
- public void run()
- {
- latch.countDown();
- }
- });
- completeReplication();
- if (!latch.await(timeout, TimeUnit.MILLISECONDS))
+ waitCallback.waitCompletion();
+ }
+ else
+ if (!waitCallback.waitCompletion(timeout))
{
throw new IllegalStateException("no response received from
replication");
}
@@ -363,15 +379,33 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
- public void afterReplicated(Runnable run)
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#getContext()
+ */
+ public OperationContext getContext()
{
- if (replicator == null)
- {
- throw new IllegalStateException("StorageManager is not replicated");
- }
- replicator.afterReplicated(run);
+ return OperationContextImpl.getContext(executorFactory);
}
+
+ public void setContext(OperationContext context)
+ {
+ OperationContextImpl.setContext(context);
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#newContext()
+ */
+ public OperationContext newContext(Executor executor)
+ {
+ return new OperationContextImpl(executor);
+ }
+
+ public void afterCompleteOperations(IOAsyncTask run)
+ {
+ getContext().executeOnCompletion(run);
+ }
+
public UUID getPersistentID()
{
return persistentID;
@@ -452,27 +486,27 @@
messageJournal.appendAddRecord(message.getMessageID(),
ADD_LARGE_MESSAGE,
new
LargeMessageEncoding((LargeServerMessage)message),
- false);
+ false, getContext());
}
else
{
- messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message,
false);
+ messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message,
false, getContext());
}
}
public void storeReference(final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID),
syncNonTransactional);
+ messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID),
syncNonTransactional, getContext());
}
public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
{
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new
RefEncoding(queueID), syncNonTransactional);
+ messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new
RefEncoding(queueID), syncNonTransactional, getContext());
}
public void deleteMessage(final long messageID) throws Exception
{
- messageJournal.appendDeleteRecord(messageID, syncNonTransactional);
+ messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getContext());
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
@@ -483,29 +517,21 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
encoding,
- syncNonTransactional);
+ syncNonTransactional, getContext());
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final
long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding,
syncNonTransactional);
+ messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding,
syncNonTransactional, getContext());
}
public void deleteDuplicateID(long recordID) throws Exception
{
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext());
}
- public void sync()
- {
- if (replicator != null)
- {
- replicator.sync();
- }
- }
-
// Transactional operations
public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
@@ -559,13 +585,13 @@
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
{
long id = generateUniqueID();
- messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true);
+ messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true, getContext());
return id;
}
public void deleteHeuristicCompletion(long id) throws Exception
{
- messageJournal.appendDeleteRecord(id, true);
+ messageJournal.appendDeleteRecord(id, true, getContext());
}
public void deletePageTransactional(final long txID, final long recordID) throws
Exception
@@ -591,17 +617,17 @@
public void prepare(final long txID, final Xid xid) throws Exception
{
- messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional);
+ messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional,
getContext());
}
public void commit(final long txID) throws Exception
{
- messageJournal.appendCommitRecord(txID, syncTransactional);
+ messageJournal.appendCommitRecord(txID, syncTransactional, getContext());
}
public void rollback(final long txID) throws Exception
{
- messageJournal.appendRollbackRecord(txID, syncTransactional);
+ messageJournal.appendRollbackRecord(txID, syncTransactional, getContext());
}
public void storeDuplicateIDTransactional(final long txID,
@@ -639,7 +665,7 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
UPDATE_DELIVERY_COUNT,
updateInfo,
- syncNonTransactional);
+ syncNonTransactional, getContext());
}
private static final class AddMessageRecord
@@ -1323,7 +1349,7 @@
return info;
}
-
+
// Public
-----------------------------------------------------------------------------------
public Journal getMessageJournal()
@@ -1384,7 +1410,7 @@
}
// Private
----------------------------------------------------------------------------------
-
+
private void checkAndCreateDir(final String dir, final boolean create)
{
File f = new File(dir);
@@ -1874,11 +1900,11 @@
}
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
@@ -1909,7 +1935,6 @@
}
}
-
}
Added: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.utils.ExecutorFactory;
+
+/**
+ *
+ * This class will hold operations when there are IO operations...
+ * and it will
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class OperationContextImpl implements OperationContext
+{
+
+ private static final ThreadLocal<OperationContext> threadLocalContext = new
ThreadLocal<OperationContext>();
+
+ public static void clearContext()
+ {
+ threadLocalContext.set(null);
+ }
+
+ public static OperationContext getContext(final ExecutorFactory executorFactory)
+ {
+ OperationContext token = threadLocalContext.get();
+ if (token == null)
+ {
+ token = new OperationContextImpl(executorFactory.getExecutor());
+ threadLocalContext.set(token);
+ }
+ return token;
+ }
+
+ public static void setContext(OperationContext context)
+ {
+ threadLocalContext.set(context);
+ }
+
+
+ private List<TaskHolder> tasks;
+
+ private volatile int storeLineUp = 0;
+
+ private volatile int replicationLineUp = 0;
+
+ private int minimalStore = Integer.MAX_VALUE;
+
+ private int minimalReplicated = Integer.MAX_VALUE;
+
+ private int stored = 0;
+
+ private int replicated = 0;
+
+ private int errorCode = -1;
+
+ private String errorMessage = null;
+
+ private Executor executor;
+
+ private final AtomicInteger executorsPending = new AtomicInteger(0);
+
+ public OperationContextImpl(final Executor executor)
+ {
+ super();
+ this.executor = executor;
+ }
+
+ /** To be called by the replication manager, when new replication is added to the
queue */
+ public void lineUp()
+ {
+ storeLineUp++;
+ }
+
+ public void replicationLineUp()
+ {
+ replicationLineUp++;
+ }
+
+ /** this method needs to be called before the executor became operational */
+ public void setExecutor(Executor executor)
+ {
+ this.executor = executor;
+ }
+
+ public synchronized void replicationDone()
+ {
+ replicated++;
+ checkTasks();
+ }
+
+ /** You may have several actions to be done after a replication operation is
completed. */
+ public void executeOnCompletion(final IOAsyncTask completion)
+ {
+ if (errorCode != -1)
+ {
+ completion.onError(errorCode, errorMessage);
+ return;
+ }
+
+ boolean executeNow = false;
+
+ synchronized (this)
+ {
+ if (tasks == null)
+ {
+ tasks = new LinkedList<TaskHolder>();
+ minimalReplicated = replicationLineUp;
+ minimalStore = storeLineUp;
+ }
+
+ // On this case, we can just execute the context directly
+ if (replicationLineUp == replicated && storeLineUp == stored)
+ {
+ if (executor != null)
+ {
+ // We want to avoid the executor if everything is complete...
+ // However, we can't execute the context if there are executions
pending
+ // We need to use the executor on this case
+ if (executorsPending.get() == 0)
+ {
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just execute the task
directly on the same thread
+ executeNow = true;
+ }
+ else
+ {
+ execute(completion);
+ }
+ }
+ else
+ {
+ executeNow = true;
+ }
+ }
+ else
+ {
+ tasks.add(new TaskHolder(completion));
+ }
+ }
+
+ if (executeNow)
+ {
+ // Executing outside of any locks
+ completion.done();
+ }
+
+ }
+
+ /** To be called by the storage manager, when data is confirmed on the channel */
+ public synchronized void done()
+ {
+ stored++;
+ checkTasks();
+ }
+
+ private void checkTasks()
+ {
+ if (stored >= minimalStore && replicated >= minimalReplicated)
+ {
+ Iterator<TaskHolder> iter = tasks.iterator();
+ while (iter.hasNext())
+ {
+ TaskHolder holder = iter.next();
+ if (stored >= holder.storeLined && replicated >=
holder.replicationLined)
+ {
+ if (executor != null)
+ {
+ // If set, we use an executor to avoid the server being single
threaded
+ execute(holder.task);
+ }
+ else
+ {
+ holder.task.done();
+ }
+
+ iter.remove();
+ }
+ else
+ {
+ // The actions need to be done in order...
+ // so it must achieve both conditions before we can proceed to more tasks
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * @param holder
+ */
+ private void execute(final IOAsyncTask task)
+ {
+ executorsPending.incrementAndGet();
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ // If any IO is done inside the callback, it needs to be done on a new
context
+ clearContext();
+ task.done();
+ executorsPending.decrementAndGet();
+ }
+ });
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationToken#complete()
+ */
+ public void complete()
+ {
+ // We hold errors until the complete is set, or the callbacks will never get
informed
+ errorCode = -1;
+ errorMessage = null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
+ */
+ public synchronized void onError(int errorCode, String errorMessage)
+ {
+ this.errorCode = errorCode;
+ this.errorMessage = errorMessage;
+
+ if (tasks != null)
+ {
+ Iterator<TaskHolder> iter = tasks.iterator();
+ while (iter.hasNext())
+ {
+ TaskHolder holder = iter.next();
+ holder.task.onError(errorCode, errorMessage);
+ iter.remove();
+ }
+ }
+ }
+
+ class TaskHolder
+ {
+ int storeLined;
+
+ int replicationLined;
+
+ IOAsyncTask task;
+
+ TaskHolder(IOAsyncTask task)
+ {
+ this.storeLined = storeLineUp;
+ this.replicationLined = replicationLineUp;
+ this.task = task;
+ }
+ }
+
+}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -138,7 +138,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#decrementDelayDeletionCount()
*/
- public void decrementDelayDeletionCount() throws Exception
+ public void decrementDelayDeletionCount()
{
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -15,16 +15,19 @@
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -282,14 +285,6 @@
}
/* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
- */
- public void afterReplicated(Runnable run)
- {
- run.run();
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#isReplicated()
*/
public boolean isReplicated()
@@ -300,7 +295,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
}
@@ -336,7 +331,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnReplication(long timeout) throws Exception
+ public void waitOnOperations(long timeout) throws Exception
{
}
@@ -348,4 +343,49 @@
throw new IllegalStateException("Null Persistence should never be used as
replicated");
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
+ */
+ public void afterCompleteOperations(IOAsyncTask run)
+ {
+ run.done();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
+ */
+ public void waitOnOperations() throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#getContext()
+ */
+ public OperationContext getContext()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#newContext()
+ */
+ public OperationContext newContext(Executor executor)
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
+ */
+ public void setContext(OperationContext context)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#clearContext()
+ */
+ public void clearContext()
+ {
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -155,7 +155,7 @@
}
}
- private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID)
throws Exception
+ private synchronized void addToCacheInMemory(final byte[] duplID, final long
recordID)
{
cache.add(new ByteArrayHolder(duplID));
@@ -173,7 +173,14 @@
// reclaimed
id.a = new ByteArrayHolder(duplID);
- storageManager.deleteDuplicateID(id.b);
+ try
+ {
+ storageManager.deleteDuplicateID(id.b);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on deleting duplicate cache", e);
+ }
id.b = recordID;
}
@@ -205,7 +212,7 @@
this.recordID = recordID;
}
- private void process() throws Exception
+ private void process()
{
if (!done)
{
@@ -227,17 +234,17 @@
{
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
process();
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
process();
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -28,6 +28,7 @@
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
@@ -914,13 +915,6 @@
}
}
}
- else
- {
- if (storageManager.isReplicated())
- {
- storageManager.sync();
- }
- }
message.incrementRefCount(reference);
}
@@ -931,20 +925,20 @@
}
else
{
- if (storageManager.isReplicated())
+ // This will use the same thread if there are no pending operations
+ // avoiding a context switch on this case
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- storageManager.afterReplicated(new Runnable()
+ public void onError(int errorCode, String errorMessage)
{
- public void run()
- {
- addReferences(refs);
- }
- });
- }
- else
- {
- addReferences(refs);
- }
+ log.warn("It wasn't possible to add references due to an IO error
code " + errorCode + " message = " + errorMessage);
+ }
+
+ public void done()
+ {
+ addReferences(refs);
+ }
+ });
}
}
@@ -1120,11 +1114,11 @@
}
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
@@ -1231,11 +1225,11 @@
}
}
- public void afterPrepare(Transaction tx) throws Exception
+ public void afterPrepare(Transaction tx)
{
}
- public void afterRollback(Transaction tx) throws Exception
+ public void afterRollback(Transaction tx)
{
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -29,7 +29,6 @@
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
@@ -94,7 +93,6 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -416,11 +414,6 @@
packet = new ReplicationDeleteMessage();
break;
}
- case REPLICATION_SYNC:
- {
- packet = new ReplicationSyncContextMessage();
- break;
- }
case REPLICATION_DELETE_TX:
{
packet = new ReplicationDeleteTXMessage();
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -243,6 +243,8 @@
log.warn("Connection failure has been detected: " + me.getMessage() +
" [code=" + me.getCode() + "]");
+ System.out.println("Fail on RemotingConnectio");
+
// Then call the listeners
callFailureListeners(me);
@@ -399,6 +401,7 @@
for (final FailureListener listener : listenersClone)
{
+ System.out.println("Calling failure listener: " +
listener.getClass().getName());
try
{
listener.connectionFailed(me);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -172,8 +172,6 @@
public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
public static final byte REPLICATION_COMPARE_DATA = 92;
-
- public static final byte REPLICATION_SYNC = 93;
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Deleted:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationSyncContextMessage.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -1,80 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.remoting.impl.wireformat;
-
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-
-/**
- * Message sent when a Replication Context is complete without any persistence
replicated.
- * On that case we need to go over the cluster to make sure we get the data sent at the
right order.
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationSyncContextMessage extends PacketImpl
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ReplicationSyncContextMessage()
- {
- super(REPLICATION_SYNC);
- }
-
- // Public --------------------------------------------------------
-
- @Override
- public int getRequiredBufferSize()
- {
- return BASIC_PACKET_SIZE;
-
- }
-
- @Override
- public void encodeBody(final HornetQBuffer buffer)
- {
- }
-
- @Override
- public void decodeBody(final HornetQBuffer buffer)
- {
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Deleted: trunk/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication;
-
-
-/**
- * This represents a set of operations done as part of replication.
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface ReplicationContext
-{
- /** To be called by the replication manager, when new replication is added to the
queue */
- void linedUp();
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- void replicated();
-
- void addReplicationAction(Runnable runnable);
-
- /** To be called when there are no more operations pending */
- void complete();
-
- /** Flush all pending callbacks on the Context */
- void flush();
-
-}
Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -19,6 +19,7 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.utils.SimpleString;
@@ -49,13 +50,8 @@
void appendRollbackRecord(byte journalID, long txID) throws Exception;
- /** Add an action to be executed after the pending replications */
- void afterReplicated(Runnable runnable);
-
- void closeContext();
-
/** A list of tokens that are still waiting for replications to be completed */
- Set<ReplicationContext> getActiveTokens();
+ Set<OperationContext> getActiveTokens();
/**
* @param storeName
@@ -87,7 +83,5 @@
* @throws HornetQException
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
-
- void sync();
-
+
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -16,6 +16,7 @@
import java.util.List;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -27,7 +28,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
-
/**
* Used by the {@link JournalStorageManager} to replicate journal calls.
*
@@ -46,7 +46,7 @@
// Attributes ----------------------------------------------------
private static final boolean trace = false;
-
+
private static void trace(String message)
{
System.out.println("ReplicatedJournal::" + message);
@@ -58,9 +58,7 @@
private final byte journalID;
- public ReplicatedJournal(final byte journaID,
- final Journal localJournal,
- final ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journaID, final Journal localJournal, final
ReplicationManager replicationManager)
{
super();
journalID = journaID;
@@ -69,11 +67,11 @@
}
// Static --------------------------------------------------------
-
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
/**
* @param id
* @param recordType
@@ -87,6 +85,21 @@
this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception
+ {
+ if (trace)
+ {
+ trace("Append record id = " + id + " recordType = " +
recordType);
+ }
+ replicationManager.appendAddRecord(journalID, id, recordType, record);
+ localJournal.appendAddRecord(id, recordType, record, sync);
+ }
+
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception
+ {
+ this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync,
completionCallback);
+ }
+
/**
* @param id
* @param recordType
@@ -95,14 +108,18 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync) throws Exception
+ public void appendAddRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ IOCompletion completionCallback) throws Exception
{
if (trace)
{
trace("Append record id = " + id + " recordType = " +
recordType);
}
replicationManager.appendAddRecord(journalID, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync);
+ localJournal.appendAddRecord(id, recordType, record, sync, completionCallback);
}
/**
@@ -155,6 +172,19 @@
localJournal.appendCommitRecord(txID, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception
+ {
+ if (trace)
+ {
+ trace("AppendCommit " + txID);
+ }
+ replicationManager.appendCommitRecord(journalID, txID);
+ localJournal.appendCommitRecord(txID, sync, callback);
+ }
+
/**
* @param id
* @param sync
@@ -171,6 +201,19 @@
localJournal.appendDeleteRecord(id, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback)
throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendDelete " + id);
+ }
+ replicationManager.appendDeleteRecord(journalID, id);
+ localJournal.appendDeleteRecord(id, sync, completionCallback);
+ }
+
/**
* @param txID
* @param id
@@ -245,6 +288,27 @@
localJournal.appendPrepareRecord(txID, transactionData, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync, IOCompletion callback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendPrepare txID=" + txID);
+ }
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ localJournal.appendPrepareRecord(txID, transactionData, sync, callback);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion callback) throws Exception
+ {
+ this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync,
callback);
+ }
+
/**
* @param txID
* @param sync
@@ -261,6 +325,19 @@
localJournal.appendRollbackRecord(txID, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendRollback " + txID);
+ }
+ replicationManager.appendRollbackRecord(journalID, txID);
+ localJournal.appendRollbackRecord(txID, sync, callback);
+ }
+
/**
* @param id
* @param recordType
@@ -291,7 +368,34 @@
replicationManager.appendUpdateRecord(journalID, id, recordType, record);
localJournal.appendUpdateRecord(id, recordType, record, sync);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[],
boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception
+ {
+ this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync,
completionCallback);
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendUpdateRecord id = " + id + " , recordType = " +
recordType);
+ }
+ replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ localJournal.appendUpdateRecord(id, recordType, record, sync, completionCallback);
+ }
+
+
+
/**
* @param txID
* @param id
@@ -338,8 +442,8 @@
* @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List,
org.hornetq.core.journal.TransactionFailureCallback)
*/
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
- final List<PreparedTransactionInfo> preparedTransactions,
- final TransactionFailureCallback transactionFailure) throws
Exception
+ final List<PreparedTransactionInfo>
preparedTransactions,
+ final TransactionFailureCallback
transactionFailure) throws Exception
{
return localJournal.load(committedRecords, preparedTransactions,
transactionFailure);
}
Deleted: trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -1,105 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.replication.ReplicationContext;
-
-/**
- * A ReplicationToken
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationContextImpl implements ReplicationContext
-{
- private List<Runnable> tasks;
-
- private AtomicInteger pendings = new AtomicInteger(0);
-
- private volatile boolean complete = false;
-
- /**
- * @param executor
- */
- public ReplicationContextImpl()
- {
- super();
- }
-
- /** To be called by the replication manager, when new replication is added to the
queue */
- public void linedUp()
- {
- pendings.incrementAndGet();
- }
-
- /** You may have several actions to be done after a replication operation is
completed. */
- public void addReplicationAction(Runnable runnable)
- {
- if (complete)
- {
- // Sanity check, this shouldn't happen
- throw new IllegalStateException("The Replication Context is complete, and
no more tasks are accepted");
- }
-
- if (tasks == null)
- {
- // No need to use Concurrent, we only add from a single thread.
- // We don't add any more Runnables after it is complete
- tasks = new ArrayList<Runnable>();
- }
-
- tasks.add(runnable);
- }
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- public synchronized void replicated()
- {
- if (pendings.decrementAndGet() == 0 && complete)
- {
- flush();
- }
- }
-
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationToken#complete()
- */
- public synchronized void complete()
- {
- complete = true;
- if (pendings.get() == 0 && complete)
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (tasks != null)
- {
- for (Runnable run : tasks)
- {
- run.run();
- }
- tasks.clear();
- }
- }
-
-
-}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -17,7 +17,6 @@
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_SYNC;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -163,11 +162,6 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
- else if (packet.getType() == REPLICATION_SYNC)
- {
- //
https://jira.jboss.org/jira/browse/HORNETQ-218
- // Nothing to be done, we just needed a round trip to process events in
order
- }
else
{
log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
@@ -196,8 +190,10 @@
{
Configuration config = server.getConfiguration();
- storage = new JournalStorageManager(config,
server.getExecutorFactory().getExecutor());
+ storage = new JournalStorageManager(config, server.getExecutorFactory());
storage.start();
+
+ server.getManagementService().setStorageManager(storage);
bindingsJournal = storage.getBindingsJournal();
messagingJournal = storage.getMessageJournal();
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -13,6 +13,7 @@
package org.hornetq.core.replication.impl;
+import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -24,6 +25,9 @@
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
@@ -34,7 +38,6 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -44,9 +47,8 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.ReplicationContext;
import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.utils.ConcurrentHashSet;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -80,12 +82,10 @@
private final Object replicationLock = new Object();
- private final ThreadLocal<ReplicationContext> tlReplicationContext = new
ThreadLocal<ReplicationContext>();
+ private final Queue<OperationContext> pendingTokens = new
ConcurrentLinkedQueue<OperationContext>();
+
+ private final ExecutorFactory executorFactory;
- private final Queue<ReplicationContext> pendingTokens = new
ConcurrentLinkedQueue<ReplicationContext>();
-
- private final ConcurrentHashSet<ReplicationContext> activeContexts = new
ConcurrentHashSet<ReplicationContext>();
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -93,11 +93,12 @@
/**
* @param replicationConnectionManager
*/
- public ReplicationManagerImpl(final FailoverManager failoverManager, final int
backupWindowSize)
+ public ReplicationManagerImpl(final FailoverManager failoverManager, final
ExecutorFactory executorFactory, final int backupWindowSize)
{
super();
this.failoverManager = failoverManager;
this.backupWindowSize = backupWindowSize;
+ this.executorFactory = executorFactory;
}
// Public --------------------------------------------------------
@@ -278,14 +279,6 @@
sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
}
}
-
- public void sync()
- {
- if (enabled)
- {
- sendReplicatePacket(new ReplicationSyncContextMessage());
- }
- }
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long,
byte[])
@@ -351,9 +344,9 @@
log.warn(e.getMessage(), e);
}
}
-
+
public void beforeReconnect(HornetQException me)
- {
+ {
}
});
@@ -374,14 +367,14 @@
enabled = false;
- for (ReplicationContext ctx : activeContexts)
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+ while (!pendingTokens.isEmpty())
{
- ctx.complete();
- ctx.flush();
+ OperationContext ctx = pendingTokens.poll();
+ ctx.replicationDone();
}
-
- activeContexts.clear();
-
+
if (replicatingChannel != null)
{
replicatingChannel.close();
@@ -401,63 +394,47 @@
started = false;
}
- public ReplicationContext getContext()
- {
- ReplicationContext token = tlReplicationContext.get();
- if (token == null)
- {
- token = new ReplicationContextImpl();
- activeContexts.add(token);
- tlReplicationContext.set(token);
- }
- return token;
- }
- /* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
+ /* method for testcases only
+ * @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
- public void afterReplicated(final Runnable runnable)
+ public Set<OperationContext> getActiveTokens()
{
- getContext().addReplicationAction(runnable);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#completeToken()
- */
- public void closeContext()
- {
- final ReplicationContext token = tlReplicationContext.get();
- if (token != null)
+ LinkedHashSet<OperationContext> activeContexts = new
LinkedHashSet<OperationContext>();
+
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+
+ for (OperationContext ctx : pendingTokens)
{
- // Disassociate thread local
- tlReplicationContext.set(null);
- // Remove from pending tokens as soon as this is complete
- token.addReplicationAction(new Runnable()
- {
- public void run()
- {
- activeContexts.remove(token);
- }
- });
- token.complete();
+ activeContexts.add(ctx);
}
+
+ return activeContexts;
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
+ * @see
org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
*/
- public Set<ReplicationContext> getActiveTokens()
+ public void compareJournals(JournalLoadInformation[] journalInfo) throws
HornetQException
{
- return activeContexts;
+ replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
}
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
private void sendReplicatePacket(final Packet packet)
{
boolean runItNow = false;
- ReplicationContext repliToken = getContext();
- repliToken.linedUp();
+ OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
+ repliToken.replicationLineUp();
synchronized (replicationLock)
{
@@ -479,38 +456,22 @@
if (runItNow)
{
- repliToken.replicated();
+ repliToken.replicationDone();
}
}
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
- */
- public void compareJournals(JournalLoadInformation[] journalInfo) throws
HornetQException
- {
- replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
- }
-
private void replicated()
{
- ReplicationContext tokenPolled = pendingTokens.poll();
- if (tokenPolled == null)
+ OperationContext ctx = pendingTokens.poll();
+
+ if (ctx == null)
{
throw new IllegalStateException("Missing replication token on the
queue.");
}
- else
- {
- tokenPolled.replicated();
- }
+
+ ctx.replicationDone();
}
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
// Inner classes -------------------------------------------------
protected class ResponseHandler implements ChannelHandler
Modified: trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-23 23:28:33 UTC
(rev 8388)
+++ trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-24 02:53:23 UTC
(rev 8389)
@@ -42,5 +42,5 @@
void incrementDelayDeletionCount();
- void decrementDelayDeletionCount() throws Exception;
+ void decrementDelayDeletionCount();
}
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-23 23:28:33 UTC (rev
8388)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-24 02:53:23 UTC (rev
8389)
@@ -33,7 +33,7 @@
int incrementRefCount(MessageReference reference) throws Exception;
- int decrementRefCount(MessageReference reference) throws Exception;
+ int decrementRefCount(MessageReference reference);
int incrementDurableRefCount();
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -16,6 +16,7 @@
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -144,21 +145,21 @@
tx.commit();
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- storageManager.afterReplicated(new Runnable()
+
+ public void onError(int errorCode, String errorMessage)
{
- public void run()
- {
- execPrompter();
- }
- });
- storageManager.completeReplication();
- }
- else
- {
- execPrompter();
- }
+ log.warn("IO Error during redistribution, errorCode = " + errorCode
+ " message = " + errorMessage);
+ }
+
+ public void done()
+ {
+ execPrompter();
+ }
+ });
+
+ storageManager.completeOperations();
}
private void execPrompter()
Modified: trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -87,10 +87,7 @@
}
newList.add(groupBinding);
storageManager.addGrouping(groupBinding);
- if (storageManager.isReplicated())
- {
- storageManager.waitOnReplication(timeout);
- }
+ storageManager.waitOnOperations(timeout);
return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
}
else
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -61,6 +62,7 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
@@ -414,6 +416,12 @@
{
log.debug("Waiting for " + task);
}
+
+ if (replicationManager != null)
+ {
+ replicationManager.stop();
+ replicationManager = null;
+ }
threadPool.shutdown();
@@ -640,6 +648,10 @@
}
Channel channel = connection.getChannel(channelID, sendWindowSize);
+
+ Executor sessionExecutor = executorFactory.getExecutor();
+
+ storageManager.newContext(sessionExecutor);
final ServerSessionImpl session = new ServerSessionImpl(name,
username,
@@ -655,7 +667,7 @@
postOffice,
resourceManager,
securityStore,
-
executorFactory.getExecutor(),
+ sessionExecutor,
channel,
managementService,
// queueFactory,
@@ -664,7 +676,8 @@
sessions.put(name, session);
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session);
+ // The executor on the OperationContext here has to be the same as the session, or
we would have ordering issues on messages
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
storageManager.newContext(sessionExecutor), storageManager);
session.setHandler(handler);
@@ -893,7 +906,7 @@
{
if (configuration.isPersistenceEnabled())
{
- return new JournalStorageManager(configuration, threadPool,
replicationManager);
+ return new JournalStorageManager(configuration, this.executorFactory,
replicationManager);
}
else
{
@@ -922,6 +935,7 @@
replicationFailoverManager = createBackupConnection(backupConnector,
threadPool, scheduledPool);
replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
+ executorFactory,
configuration.getBackupWindowSize());
replicationManager.start();
}
@@ -1031,7 +1045,7 @@
configuration.getManagementClusterPassword(),
managementService);
- queueFactory = new QueueFactoryImpl(scheduledPool, addressSettingsRepository,
storageManager);
+ queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool,
addressSettingsRepository, storageManager);
pagingManager = createPagingManager();
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -14,6 +14,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.core.filter.Filter;
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -22,6 +22,7 @@
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -42,11 +43,16 @@
private PostOffice postOffice;
private final StorageManager storageManager;
+
+ private final ExecutorFactory executorFactory;
- public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
+ public QueueFactoryImpl(final ExecutorFactory executorFactory,
+ final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
final StorageManager storageManager)
{
+ this.executorFactory = executorFactory;
+
this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-23 23:28:33 UTC
(rev 8388)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-24 02:53:23 UTC
(rev 8389)
@@ -114,7 +114,7 @@
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
private final ScheduledExecutorService scheduledExecutor;
-
+
private final SimpleString address;
private Redistributor redistributor;
@@ -621,6 +621,8 @@
{
acknowledge(ref);
}
+
+ storageManager.completeOperations();
}
public void setExpiryAddress(final SimpleString expiryAddress)
@@ -643,40 +645,46 @@
return deleteMatchingReferences(null);
}
- public synchronized int deleteMatchingReferences(final Filter filter) throws
Exception
+ public int deleteMatchingReferences(final Filter filter) throws Exception
{
int count = 0;
-
- Transaction tx = new TransactionImpl(storageManager);
-
- Iterator<MessageReference> iter = messageReferences.iterator();
-
- while (iter.hasNext())
+
+ synchronized(this)
{
- MessageReference ref = iter.next();
-
- if (filter == null || filter.match(ref.getMessage()))
+
+ Transaction tx = new TransactionImpl(storageManager);
+
+ Iterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- count++;
+ MessageReference ref = iter.next();
+
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ count++;
+ }
}
- }
-
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
- for (MessageReference messageReference : cancelled)
- {
- if (filter == null || filter.match(messageReference.getMessage()))
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ for (MessageReference messageReference : cancelled)
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
+ if (filter == null || filter.match(messageReference.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, messageReference);
+ count++;
+ }
}
+
+ tx.commit();
}
+
+ storageManager.waitOnOperations(-1);
- tx.commit();
-
return count;
}
@@ -930,6 +938,7 @@
if (message.isDurable() && durable)
{
storageManager.updateDeliveryCount(reference);
+ storageManager.waitOnOperations();
}
AddressSettings addressSettings =
addressSettingsRepository.getMatch(address.toString());
@@ -939,6 +948,7 @@
if (maxDeliveries > 0 && reference.getDeliveryCount() >=
maxDeliveries)
{
sendToDeadLetterAddress(reference);
+ storageManager.waitOnOperations();
return false;
}
@@ -1381,7 +1391,7 @@
return status;
}
- private void removeExpiringReference(final MessageReference ref) throws Exception
+ private void removeExpiringReference(final MessageReference ref)
{
if (ref.getMessage().getExpiration() > 0)
{
@@ -1389,9 +1399,9 @@
}
}
- private void postAcknowledge(final MessageReference ref) throws Exception
+ private void postAcknowledge(final MessageReference ref)
{
- ServerMessage message = ref.getMessage();
+ final ServerMessage message = ref.getMessage();
QueueImpl queue = (QueueImpl)ref.getQueue();
@@ -1431,7 +1441,7 @@
message.decrementRefCount(ref);
}
- void postRollback(final LinkedList<MessageReference> refs) throws Exception
+ void postRollback(final LinkedList<MessageReference> refs)
{
synchronized (this)
{
@@ -1481,29 +1491,36 @@
{
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new
HashMap<QueueImpl, LinkedList<MessageReference>>();
for (MessageReference ref : refsToAck)
{
- if (ref.getQueue().checkDLQ(ref))
+ try
{
- LinkedList<MessageReference> toCancel =
queueMap.get(ref.getQueue());
-
- if (toCancel == null)
+ if (ref.getQueue().checkDLQ(ref))
{
- toCancel = new LinkedList<MessageReference>();
-
- queueMap.put((QueueImpl)ref.getQueue(), toCancel);
+ LinkedList<MessageReference> toCancel =
queueMap.get(ref.getQueue());
+
+ if (toCancel == null)
+ {
+ toCancel = new LinkedList<MessageReference>();
+
+ queueMap.put((QueueImpl)ref.getQueue(), toCancel);
+ }
+
+ toCancel.addFirst(ref);
}
-
- toCancel.addFirst(ref);
}
+ catch (Exception e)
+ {
+ log.warn("Error on checkDLQ", e);
+ }
}
for (Map.Entry<QueueImpl, LinkedList<MessageReference>> entry :
queueMap.entrySet())
@@ -1519,7 +1536,7 @@
}
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
for (MessageReference ref : refsToAck)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -119,7 +119,7 @@
return count;
}
- public int decrementRefCount(final MessageReference reference) throws Exception
+ public int decrementRefCount(final MessageReference reference)
{
int count = refCount.decrementAndGet();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -1,5 +1,5 @@
/*
- * Copyright 2009 Red Hat, Inc.
+x * Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -34,6 +34,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
@@ -185,7 +186,7 @@
private final HornetQServer server;
private final SimpleString managementAddress;
-
+
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
@@ -240,7 +241,7 @@
this.resourceManager = resourceManager;
this.securityStore = securityStore;
-
+
this.executor = executor;
if (!xa)
@@ -1457,7 +1458,7 @@
public void handleSend(final SessionSendMessage packet)
{
Packet response = null;
-
+
ServerMessage message = packet.getServerMessage();
try
@@ -1718,23 +1719,23 @@
final boolean flush,
final boolean closeChannel)
{
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- storageManager.afterReplicated(new Runnable()
+
+ public void onError(int errorCode, String errorMessage)
{
- public void run()
- {
- doSendResponse(confirmPacket, response, flush, closeChannel);
- }
+ log.warn("Error processing IOCallback code = " + errorCode + "
message = " + errorMessage);
- });
+ HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new
HornetQException(errorCode, errorMessage));
+
+ doSendResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+ }
- storageManager.completeReplication();
- }
- else
- {
- doSendResponse(confirmPacket, response, flush, closeChannel);
- }
+ public void done()
+ {
+ doSendResponse(confirmPacket, response, flush, closeChannel);
+ }
+ });
}
/**
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -45,6 +45,8 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
@@ -80,16 +82,26 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org>Clebert
Suconic</a>
*/
public class ServerSessionPacketHandler implements ChannelHandler
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
private final ServerSession session;
+
+ private final OperationContext sessionContext;
+
+ // Storagemanager here is used to set the Context
+ private final StorageManager storageManager;
- public ServerSessionPacketHandler(final ServerSession session)
+ public ServerSessionPacketHandler(final ServerSession session, OperationContext
sessionContext, StorageManager storageManager)
{
this.session = session;
+
+ this.storageManager = storageManager;
+
+ this.sessionContext = sessionContext;
}
public long getID()
@@ -101,6 +113,8 @@
{
byte type = packet.getType();
+ storageManager.setContext(sessionContext);
+
try
{
switch (type)
@@ -289,5 +303,10 @@
{
log.error("Caught unexpected exception", t);
}
+ finally
+ {
+ storageManager.completeOperations();
+ storageManager.clearContext();
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -29,9 +29,9 @@
void beforeRollback(Transaction tx) throws Exception;
- void afterPrepare(Transaction tx) throws Exception;
+ void afterPrepare(Transaction tx);
- void afterCommit(Transaction tx) throws Exception;
+ void afterCommit(Transaction tx);
- void afterRollback(Transaction tx) throws Exception;
+ void afterRollback(Transaction tx);
}
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -19,6 +19,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -98,7 +99,7 @@
{
this.containsPersistent = true;
}
-
+
public long getID()
{
return id;
@@ -146,14 +147,37 @@
storageManager.prepare(id, xid);
state = State.PREPARED;
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager
will have
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
+ public void onError(int errorCode, String errorMessage)
{
- operation.afterPrepare(this);
+ log.warn("IO Error completing the transaction, code = " +
errorCode + ", message = " + errorMessage);
}
- }
+
+ public void done()
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ try
+ {
+ operation.afterPrepare(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ //
https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After commit shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ });
}
}
@@ -181,15 +205,8 @@
if (xid != null)
{
- if (onePhase)
+ if (onePhase && state != State.ACTIVE || !onePhase && state
!= State.PREPARED)
{
- if (state == State.ACTIVE)
- {
- prepare();
- }
- }
- if (state != State.PREPARED)
- {
throw new IllegalStateException("Transaction is in invalid state
" + state);
}
}
@@ -209,13 +226,29 @@
}
}
- Runnable execAfterCommit = null;
+ if (containsPersistent || (xid != null && state == State.PREPARED))
+ {
+ storageManager.commit(id);
- if (operations != null)
+ state = State.COMMITTED;
+ }
+
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager
will have
+ // to execute this runnable in the correct order
+ // This also will only use a different thread if there are any IO pendings.
+ // If the IO finished early by the time we got here, we won't need an
executor
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- execAfterCommit = new Runnable()
+
+ public void onError(int errorCode, String errorMessage)
{
- public void run()
+ log.warn("IO Error completing the transaction, code = " +
errorCode + ", message = " + errorMessage);
+ }
+
+ public void done()
+ {
+ if (operations != null)
{
for (TransactionOperation operation : operations)
{
@@ -231,31 +264,9 @@
}
}
}
- };
- }
+ }
+ });
- if (containsPersistent || (xid != null && state == State.PREPARED))
- {
- storageManager.commit(id);
-
- state = State.COMMITTED;
-
- if (execAfterCommit != null)
- {
- if (storageManager.isReplicated())
- {
- storageManager.afterReplicated(execAfterCommit);
- }
- else
- {
- execAfterCommit.run();
- }
- }
- }
- else if (execAfterCommit != null)
- {
- execAfterCommit.run();
- }
}
}
@@ -290,13 +301,37 @@
state = State.ROLLEDBACK;
- if (operations != null)
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager
will have
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- for (TransactionOperation operation : operations)
+
+ public void onError(int errorCode, String errorMessage)
{
- operation.afterRollback(this);
+ log.warn("IO Error completing the transaction, code = " +
errorCode + ", message = " + errorMessage);
}
- }
+
+ public void done()
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ try
+ {
+ operation.afterRollback(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ //
https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After commit shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ });
}
}
Modified: trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -40,8 +40,20 @@
{
private static final Logger log = Logger.getLogger(QueueTest.class);
- private QueueFactory queueFactory = new FakeQueueFactory();
+ private FakeQueueFactory queueFactory = new FakeQueueFactory();
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ queueFactory = new FakeQueueFactory();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ queueFactory.stop();
+ super.tearDown();
+ }
+
/*
* Concurrent set consumer not busy, busy then, call deliver while messages are being
added and consumed
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -179,6 +179,14 @@
clientSession.rollback();
}
+ long timeout = System.currentTimeMillis() + 5000;
+
+ // DLA transfer is asynchronous fired on the rollback
+ while (System.currentTimeMillis() < timeout &&
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount() != 0)
+ {
+ Thread.sleep(1);
+ }
+
assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -433,7 +433,7 @@
assertTrue(latch.await(10, TimeUnit.SECONDS));
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
assertEquals(dummyMessageHandler.list.size(), 50);
int i = 0;
for (ClientMessage message : dummyMessageHandler.list)
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -13,6 +13,14 @@
package org.hornetq.tests.integration.client;
+import java.util.HashMap;
+
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.integration.transports.netty.TransportConstants;
+
/**
* A NettyConsumerWindowSizeTest
*
@@ -32,7 +40,39 @@
return true;
}
+ protected ClientSessionFactory createNettyFactory()
+ {
+ HashMap<String, Object> parameters = new HashMap<String, Object>();
+
+ parameters.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
+
+ TransportConfiguration config = new TransportConfiguration(NETTY_CONNECTOR_FACTORY,
parameters);
+
+ return new ClientSessionFactoryImpl(config);
+
+ //return super.createNettyFactory();
+ }
+ protected Configuration createDefaultConfig(final boolean netty)
+ {
+ if (netty)
+ {
+
+ HashMap<String, Object> parameters = new HashMap<String, Object>();
+
+ parameters.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
+
+ return createDefaultConfig(parameters, INVM_ACCEPTOR_FACTORY,
NETTY_ACCEPTOR_FACTORY);
+ }
+ else
+ {
+ new Exception("This test wasn't supposed to use
InVM").printStackTrace();
+ return super.createDefaultConfig(false);
+ }
+ }
+
+
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
Added: trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java
(rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A OrderTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OrderTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ super.tearDown();
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSimpleOrderNoStorage() throws Exception
+ {
+ doTestSimpleOrder(false);
+ }
+
+ public void testSimpleOrderPersistence() throws Exception
+ {
+ doTestSimpleOrder(true);
+ }
+
+ public void doTestSimpleOrder(final boolean persistent) throws Exception
+ {
+ server = createServer(persistent, true);
+ server.start();
+
+ ClientSessionFactory sf = createNettyFactory();
+
+ sf.setBlockOnNonPersistentSend(false);
+ sf.setBlockOnPersistentSend(false);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ try
+ {
+ session.createQueue("queue", "queue", true);
+
+ ClientProducer prod = session.createProducer("queue");
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = session.createClientMessage(i % 2 == 0);
+ msg.setBody(session.createBuffer(new byte[1024]));
+ msg.putIntProperty("id", i);
+ prod.send(msg);
+ }
+
+ session.close();
+
+ boolean started = false;
+
+ for (int start = 0; start < 2; start++)
+ {
+
+ if (persistent && start == 1)
+ {
+ started = true;
+ server.stop();
+ server.start();
+ }
+
+ session = sf.createSession(true, true);
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer("queue");
+
+ for (int i = 0; i < 100; i++)
+ {
+ if (!started || started && i % 2 == 0)
+ {
+ ClientMessage msg = cons.receive(10000);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+ }
+
+ cons.close();
+
+ cons = session.createConsumer("queue");
+
+ for (int i = 0; i < 100; i++)
+ {
+ if (!started || started && i % 2 == 0)
+ {
+ ClientMessage msg = cons.receive(10000);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+ }
+
+ session.close();
+ }
+
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -158,15 +158,18 @@
for (int i = 0; i < numberOfMessages; i++)
{
+ System.out.println("Message " + i + " of " +
numberOfMessages);
ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull(message2);
- assertEquals(i, ((Integer)message2.getObjectProperty(new
SimpleString("id"))).intValue());
+ assertEquals(i, message2.getIntProperty("id").intValue());
message2.acknowledge();
assertNotNull(message2);
+
+ session.commit();
try
{
@@ -968,9 +971,6 @@
assertNull(consumerPaged.receiveImmediate());
-
assertFalse(server.getPostOffice().getPagingManager().getPageStore(PAGED_ADDRESS).isPaging());
-
assertFalse(server.getPostOffice().getPagingManager().getPageStore(NON_PAGED_ADDRESS).isPaging());
-
session.close();
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -169,7 +169,6 @@
assertEquals(0, sf.numConnections());
}
-
/** It doesn't fail, but it restart both servers, live and backup, and the data
should be received after the restart,
* and the servers should be able to connect without any problems. */
public void testRestartServers() throws Exception
@@ -1670,6 +1669,73 @@
assertEquals(0, sf.numConnections());
}
+ public void testSimpleSendAfterFailover() throws Exception
+ {
+ ClientSessionFactoryInternal sf = getSessionFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener extends BaseListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ session.addFailureListener(new MyListener());
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 100;
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ fail(session, latch);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(i % 2 == 0);
+
+ setBody(i, message);
+
+ System.out.println("Durable = " + message.isDurable());
+
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertNotNull(message);
+
+ assertMessageBody(i, message);
+
+ assertEquals(i, message.getIntProperty("counter").intValue());
+
+ message.acknowledge();
+ }
+
+ session.close();
+
+ assertEquals(0, sf.numSessions());
+
+ assertEquals(0, sf.numConnections());
+ }
+
public void testForceBlockingReturn() throws Exception
{
ClientSessionFactoryInternal sf = this.getSessionFactory();
Modified:
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -237,7 +237,7 @@
producer = session.createProducer(ADDRESS);
- session.commit(xid, true);
+ session.commit(xid, false);
xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.largemessage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hornetq.core.config.Configuration;
@@ -21,6 +23,8 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
/**
* A ServerLargeMessageTest
@@ -36,12 +40,32 @@
// Attributes ----------------------------------------------------
+ ExecutorService executor;
+
+ ExecutorFactory execFactory;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ executor = Executors.newCachedThreadPool();
+
+ execFactory = new OrderedExecutorFactory(executor);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ executor.shutdown();
+
+ super.tearDown();
+ }
+
public void testLargeMessageCopy() throws Exception
{
clearData();
@@ -52,7 +76,7 @@
configuration.setJournalType(JournalType.ASYNCIO);
- final JournalStorageManager journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ final JournalStorageManager journal = new JournalStorageManager(configuration,
execFactory);
journal.start();
LargeServerMessage msg = journal.createLargeMessage();
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -18,6 +18,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hornetq.core.config.Configuration;
@@ -30,6 +32,8 @@
import org.hornetq.core.server.Queue;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
/**
* A DeleteMessagesRestartTest
@@ -48,11 +52,32 @@
// Attributes ----------------------------------------------------
+ ExecutorService executor;
+
+ ExecutorFactory execFactory;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ executor = Executors.newCachedThreadPool();
+
+ this.execFactory = new OrderedExecutorFactory(executor);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ executor.shutdown();
+
+ super.tearDown();
+ }
+
public void testRestartStorageManager() throws Exception
{
@@ -67,7 +92,8 @@
PostOffice postOffice = new FakePostOffice();
- final JournalStorageManager journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ final JournalStorageManager journal = new JournalStorageManager(configuration,
execFactory);
+
try
{
Modified:
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -52,17 +52,6 @@
// Public --------------------------------------------------------
- public void _test() throws Exception
- {
- for (int i = 0; i < 100; i++)
- {
- System.out.println("<<<<<< " + i + "
>>>>>>>");
- testTxMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
- tearDown();
- setUp();
- }
- }
-
public void testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup()
throws Exception
{
doTestMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup(false);
@@ -92,7 +81,7 @@
}
session.createQueue(address, queue, true);
ClientProducer producer = session.createProducer(address);
- boolean durable = true;
+ boolean durable = false;
for (int i = 0; i < NUM; i++)
{
ClientMessage msg = session.createClientMessage(durable);
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -26,6 +26,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.ClientSessionFactory;
@@ -37,6 +38,8 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -49,7 +52,10 @@
import org.hornetq.core.paging.impl.PagedMessageImpl;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
@@ -66,6 +72,7 @@
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -85,6 +92,8 @@
private ThreadFactory tFactory;
private ExecutorService executor;
+
+ private ExecutorFactory factory;
private ScheduledExecutorService scheduledExecutor;
@@ -110,6 +119,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
manager.stop();
@@ -136,6 +146,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
try
@@ -178,6 +189,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
@@ -185,6 +197,7 @@
try
{
ReplicationManagerImpl manager2 = new
ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager2.start();
@@ -219,6 +232,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
try
@@ -237,7 +251,7 @@
server.stop();
}
}
-
+
public void testSendPackets() throws Exception
{
@@ -253,7 +267,10 @@
try
{
+ StorageManager storage = getStorage();
+
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
@@ -272,20 +289,8 @@
replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
replicatedJournal.appendRollbackRecord(3, false);
- assertEquals(1, manager.getActiveTokens().size());
+ blockOnReplication(storage, manager);
- blockOnReplication(manager);
-
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
ServerMessage msg = new ServerMessageImpl();
@@ -302,7 +307,7 @@
manager.pageWrite(pgmsg, 3);
manager.pageWrite(pgmsg, 4);
- blockOnReplication(manager);
+ blockOnReplication(storage, manager);
PagingManager pagingManager = createPageManager(server.getStorageManager(),
server.getConfiguration(),
@@ -321,7 +326,7 @@
manager.pageDeleted(dummy, 5);
manager.pageDeleted(dummy, 6);
- blockOnReplication(manager);
+ blockOnReplication(storage, manager);
ServerMessageImpl serverMsg = new ServerMessageImpl();
serverMsg.setMessageID(500);
@@ -336,7 +341,7 @@
manager.largeMessageDelete(500);
- blockOnReplication(manager);
+ blockOnReplication(storage, manager);
store.start();
@@ -371,13 +376,14 @@
try
{
+ StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
- Thread.sleep(100);
TestInterceptor.value.set(false);
for (int i = 0; i < 500; i++)
@@ -386,16 +392,19 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ storage.afterCompleteOperations(new IOAsyncTask()
{
- public void run()
+
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
});
- manager.closeContext();
-
server.stop();
assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -405,26 +414,124 @@
server.stop();
}
}
+
+ public void testExceptionSettingActionBefore() throws Exception
+ {
+ OperationContext ctx = OperationContextImpl.getContext(factory);
+
+ ctx.lineUp();
+
+ String msg = "I'm an exception";
+
+ ctx.onError(5, msg);
+
+ final AtomicInteger lastError = new AtomicInteger(0);
+
+ final List<String> msgsResult = new ArrayList<String>();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ lastError.set(errorCode);
+ msgsResult.add(errorMessage);
+ latch.countDown();
+ }
+
+ public void done()
+ {
+ }
+ });
+
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ assertEquals(5, lastError.get());
+
+ assertEquals(1, msgsResult.size());
+
+ assertEquals(msg, msgsResult.get(0));
+
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ // Adding the Task after the exception should still throw an exception
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ lastError.set(errorCode);
+ msgsResult.add(errorMessage);
+ latch2.countDown();
+ }
+
+ public void done()
+ {
+ }
+ });
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+ assertEquals(2, msgsResult.size());
+ assertEquals(msg, msgsResult.get(0));
+
+ assertEquals(msg, msgsResult.get(1));
+
+ // Clearing any exception from the Context, so we can use the context again
+ ctx.complete();
+
+
+ final CountDownLatch latch3 = new CountDownLatch(1);
+
+ ctx.executeOnCompletion(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ latch3.countDown();
+ }
+ });
+
+
+ assertTrue(latch2.await(5, TimeUnit.SECONDS));
+
+
+
+
+ }
+
/**
+ * @return
+ */
+ private JournalStorageManager getStorage()
+ {
+ return new JournalStorageManager(createDefaultConfig(), factory);
+ }
+
+ /**
* @param manager
* @return
*/
- private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+ private void blockOnReplication(StorageManager storage, ReplicationManagerImpl
manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ storage.afterCompleteOperations(new IOAsyncTask()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
-
});
- manager.closeContext();
-
assertTrue(latch.await(30, TimeUnit.SECONDS));
}
@@ -435,6 +542,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
fail("Exception expected");
@@ -460,7 +568,9 @@
try
{
+ StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
@@ -469,32 +579,21 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ storage.afterCompleteOperations(new IOAsyncTask()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
-
});
- assertEquals(1, manager.getActiveTokens().size());
-
- manager.closeContext();
-
assertTrue(latch.await(1, TimeUnit.SECONDS));
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
@@ -521,63 +620,52 @@
try
{
+ StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
int numberOfAdds = 200;
-
+
final CountDownLatch latch = new CountDownLatch(numberOfAdds);
+
+ OperationContext ctx = storage.getContext();
for (int i = 0; i < numberOfAdds; i++)
{
final int nAdd = i;
-
+
if (i % 2 == 0)
{
replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
}
- else
- {
- manager.sync();
- }
-
- manager.afterReplicated(new Runnable()
+ ctx.executeOnCompletion(new IOAsyncTask()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
+ System.out.println("Add " + nAdd);
executions.add(nAdd);
latch.countDown();
}
-
});
+ }
- manager.closeContext();
- }
-
assertTrue(latch.await(10, TimeUnit.SECONDS));
-
for (int i = 0; i < numberOfAdds; i++)
{
assertEquals(i, executions.get(i).intValue());
}
-
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
@@ -622,9 +710,26 @@
executor = Executors.newCachedThreadPool(tFactory);
scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
+
+ factory = new OrderedExecutorFactory(executor);
+ }
+ protected void tearDown() throws Exception
+ {
+
+ executor.shutdown();
+
+ scheduledExecutor.shutdown();
+
+ tFactory = null;
+
+ scheduledExecutor = null;
+
+ super.tearDown();
+
}
+
private FailoverManagerImpl createFailoverManager()
{
return createFailoverManager(null);
@@ -651,22 +756,6 @@
scheduledExecutor,
interceptors);
}
-
- protected void tearDown() throws Exception
- {
-
- executor.shutdown();
-
- scheduledExecutor.shutdown();
-
- tFactory = null;
-
- scheduledExecutor = null;
-
- super.tearDown();
-
- }
-
protected PagingManager createPageManager(StorageManager storageManager,
Configuration configuration,
ExecutorFactory executorFactory,
@@ -906,5 +995,87 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[],
boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendAddRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion
completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync, IOCompletion callback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion callback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[],
boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ byte[] record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.Journal#sync(org.hornetq.core.journal.IOCompletion)
+ */
+ public void sync(IOCompletion callback)
+ {
+ }
+
}
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -12,8 +12,6 @@
*/
package org.hornetq.tests.integration.scheduling;
-import java.util.Calendar;
-
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -583,7 +581,7 @@
session = sessionFactory.createSession(true, false, false);
}
- session.commit(xid, true);
+ session.commit(xid, false);
ClientConsumer consumer = session.createConsumer(atestq);
session.start();
@@ -598,7 +596,7 @@
message2.acknowledge();
session.end(xid2, XAResource.TMSUCCESS);
session.prepare(xid2);
- session.commit(xid2, true);
+ session.commit(xid2, false);
consumer.close();
// Make sure no more messages
consumer = session.createConsumer(atestq);
@@ -683,7 +681,7 @@
{
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
- session.commit(xid, true);
+ session.commit(xid, false);
} else
{
session.commit();
@@ -758,7 +756,7 @@
{
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
- session.commit(xid, true);
+ session.commit(xid, false);
}
session.close();
Modified: trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/integration/server/LVQRecoveryTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -76,7 +76,7 @@
clientSessionXa.close();
restartServer();
- clientSessionXa.commit(xid, true);
+ clientSessionXa.commit(xid, false);
ClientConsumer consumer = clientSession.createConsumer(qName1);
clientSession.start();
ClientMessage m = consumer.receive(1000);
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -295,7 +295,7 @@
assertEqualsByteArrays(xids[0].getBranchQualifier(), xid.getBranchQualifier());
assertEqualsByteArrays(xids[0].getGlobalTransactionId(),
xid.getGlobalTransactionId());
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.close();
@@ -426,7 +426,7 @@
assertEquals(xids.length, 0);
if (commit)
{
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
}
else
{
@@ -455,7 +455,7 @@
if (i == 2)
{
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
}
recreateClients();
@@ -504,7 +504,7 @@
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
assertNotNull(m);
@@ -607,7 +607,7 @@
assertEqualsByteArrays(xids[0].getGlobalTransactionId(),
xid.getGlobalTransactionId());
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
assertNotNull(m);
@@ -668,8 +668,8 @@
assertEqualXids(xids, xid, xid2);
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
- clientSession.commit(xid, true);
- clientSession.commit(xid2, true);
+ clientSession.commit(xid, false);
+ clientSession.commit(xid2, false);
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
assertNotNull(m);
@@ -795,7 +795,7 @@
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
clientSession.rollback(xid);
- clientSession.commit(xid2, true);
+ clientSession.commit(xid2, false);
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
assertNotNull(m);
@@ -858,7 +858,7 @@
assertEqualsByteArrays(xids[0].getGlobalTransactionId(),
xid.getGlobalTransactionId());
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
assertNotNull(m);
@@ -938,7 +938,7 @@
assertEqualsByteArrays(xids[0].getGlobalTransactionId(),
xid.getGlobalTransactionId());
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.start();
m = clientConsumer.receiveImmediate();
assertNull(m);
@@ -1100,7 +1100,7 @@
assertEqualXids(xids, xid, xid2);
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.start();
m = clientConsumer.receiveImmediate();
assertNull(m);
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2009-11-23 23:28:33
UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2009-11-24 02:53:23
UTC (rev 8389)
@@ -158,7 +158,7 @@
clientSession = sessionFactory.createSession(true, false, false);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage m = clientConsumer.receive(1000);
@@ -216,7 +216,7 @@
clientSession = sessionFactory.createSession(true, false, false);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.start();
clientConsumer = clientSession.createConsumer(atestq);
m = clientConsumer.receiveImmediate();
@@ -729,7 +729,7 @@
}
else
{
- session.commit(xid, true);
+ session.commit(xid, false);
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -354,7 +354,7 @@
CountDownLatch latch = new CountDownLatch(1);
messagingService.getResourceManager().getTransaction(xid).addOperation(new
RollbackCompleteOperation(latch));
assertFalse(latch.await(2600, TimeUnit.MILLISECONDS));
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
clientSession.setTransactionTimeout(0);
clientConsumer.close();
@@ -434,7 +434,7 @@
messagingService.getResourceManager().getTransaction(xid).addOperation(new
RollbackCompleteOperation(latch));
assertFalse(latch.await(2600, TimeUnit.MILLISECONDS));
clientSession.prepare(xid);
- clientSession.commit(xid, true);
+ clientSession.commit(xid, false);
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientConsumer consumer = clientSession2.createConsumer(atestq);
clientSession2.start();
@@ -538,15 +538,15 @@
{
}
- public void afterPrepare(Transaction tx) throws Exception
+ public void afterPrepare(Transaction tx)
{
}
- public void afterCommit(Transaction tx) throws Exception
+ public void afterCommit(Transaction tx)
{
}
- public void afterRollback(Transaction tx) throws Exception
+ public void afterRollback(Transaction tx)
{
latch.countDown();
}
Deleted:
trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -1,280 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.performance.persistence;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.config.impl.FileConfiguration;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
-import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.SimpleString;
-
-/**
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
- *
- */
-public class StorageManagerTimingTest extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(StorageManagerTimingTest.class);
-
- protected void tearDown() throws Exception
- {
- assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
-
- super.tearDown();
- }
-
- public void testAIO() throws Exception
- {
- // just to do some initial loading.. ignore this rate
- internalTestStorage(JournalType.ASYNCIO, 1000, 1, 1);
-
- double rate = internalTestStorage(JournalType.ASYNCIO, 60000, 1, 1)[0];
- printRates("Rate of AIO, 60000 inserts / commits on every insert",
rate);
-
- rate = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 1)[0];
- printRates("Rate of AIO, 30000 inserts / single commit at the end",
rate);
-
- rate = internalTestStorage(JournalType.ASYNCIO, 30000, 5, 1)[0];
- printRates("Rate of AIO, 30000 inserts / commit every 5 recodds", rate);
-
- rate = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 1)[0];
- printRates("Rate of AIO, 30000 inserts / single commit at the end
(again)", rate);
-
- }
-
- public void testAIOMultiThread() throws Exception
- {
- double[] rates = internalTestStorage(JournalType.ASYNCIO, 10000, -1, 1);
- rates = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 5);
-
- printRates("Rate of AIO, 30000 inserts / single commit at the end",
rates);
-
- rates = internalTestStorage(JournalType.ASYNCIO, 5000, 1, 5);
-
- printRates("Rate of AIO, 30000 inserts / commit on every insert",
rates);
- }
-
- public void testNIO() throws Exception
- {
- // just to do some initial loading.. ignore this rate
- internalTestStorage(JournalType.NIO, 1000, 1, 1);
- double rate = internalTestStorage(JournalType.NIO, 1000, 1, 1)[0];
- printRates("Rate of NIO, 1000 inserts, 1000 commits", rate);
-
- rate = internalTestStorage(JournalType.NIO, 30000, -1, 1)[0];
- printRates("Rate of NIO, 30000 inserts / single commit at the end",
rate);
-
- rate = internalTestStorage(JournalType.NIO, 30000, 5, 1)[0];
- printRates("Rate of NIO, 30000 inserts / commit every 5 records", rate);
- }
-
- public void testNIOMultiThread() throws Exception
- {
-
- double[] rates = internalTestStorage(JournalType.NIO, 5000, -1, 5);
-
- printRates("Rate of NIO, 5000 inserts / single commit at the end",
rates);
-
- rates = internalTestStorage(JournalType.NIO, 5000, 1, 5);
-
- printRates("Rate of NIO, 5000 inserts / commit on every insert", rates);
-
- }
-
- public double[] internalTestStorage(final JournalType journalType,
- final long numberOfMessages,
- final int transInterval,
- final int numberOfThreads) throws Exception
- {
- FileConfiguration configuration = new FileConfiguration();
-
- configuration.start();
-
- deleteDirectory(new File(configuration.getBindingsDirectory()));
- deleteDirectory(new File(configuration.getJournalDirectory()));
-
- configuration.setJournalType(journalType);
-
- PostOffice postOffice = new FakePostOffice();
-
- final JournalStorageManager journal = new JournalStorageManager(configuration,
-
Executors.newCachedThreadPool());
- journal.start();
-
- HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
-
- journal.loadMessageJournal(postOffice, null, null, queues, null);
-
- final byte[] bytes = new byte[900];
-
- for (int i = 0; i < bytes.length; i++)
- {
- bytes[i] = (byte)('a' + (i % 20));
- }
-
- final AtomicLong transactionGenerator = new AtomicLong(1);
-
- class LocalThread extends Thread
- {
- int id;
-
- int commits = 1;
-
- Exception e;
-
- long totalTime = 0;
-
- public LocalThread(int id)
- {
- super("LocalThread:" + id);
- this.id = id;
- }
-
- public void run()
- {
- try
- {
- long start = System.currentTimeMillis();
-
- long trans = transactionGenerator.incrementAndGet();
- boolean commitPending = false;
- for (long i = 1; i <= numberOfMessages; i++)
- {
-
- final SimpleString address = new SimpleString("Destination "
+ i);
-
- ServerMessageImpl implMsg = new ServerMessageImpl(/* type */(byte)1, /*
durable */
- true, /* expiration */
- 0,
- /* timestamp */0, /* priority */
- (byte)0, ChannelBuffers.wrappedBuffer(new byte[1024]));
-
- implMsg.putStringProperty(new SimpleString("Key"), new
SimpleString("This String is worthless!"));
-
- implMsg.setMessageID(i);
- implMsg.setBody(ChannelBuffers.wrappedBuffer(bytes));
-
- implMsg.setDestination(address);
-
- journal.storeMessageTransactional(trans, implMsg);
-
- commitPending = true;
-
- if (transInterval > 0 && i % transInterval == 0)
- {
- journal.commit(trans);
- commits++;
- trans = transactionGenerator.incrementAndGet();
- commitPending = false;
- }
- }
-
- if (commitPending)
- journal.commit(trans);
-
- long end = System.currentTimeMillis();
-
- totalTime = end - start;
- }
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- this.e = e;
- }
- }
- }
-
- try
- {
- LocalThread[] threads = new LocalThread[numberOfThreads];
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threads[i] = new LocalThread(i);
- }
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threads[i].start();
- }
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threads[i].join();
- }
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- if (threads[i].e != null)
- {
- throw threads[i].e;
- }
- }
-
- double rates[] = new double[numberOfThreads];
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- rates[i] = (numberOfMessages + threads[i].commits) * 1000 /
threads[i].totalTime;
- }
-
- return rates;
- }
- finally
- {
- journal.stop();
- }
-
- }
-
- private void printRates(String msg, double rate)
- {
- printRates(msg, new double[] { rate });
- }
-
- private void printRates(String msg, double[] rates)
- {
- double rate = 0;
-
-
log.info("*************************************************************************");
- log.info(" " + msg + " ");
-
- double totalRate = 0;
- for (int i = 0; i < rates.length; i++)
- {
- rate = rates[i];
- totalRate += rate;
- if (rates.length > 1)
- {
- log.info(" Thread " + i + ": = " + rate + "
inserts/sec (including commits)");
- }
- }
-
- log.info(" Total rate : = " + totalRate + " inserts/sec
(including commits)");
-
log.info("*************************************************************************");
- }
-}
Modified: trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -41,6 +41,8 @@
private static final Logger log = Logger.getLogger(QueueImplTest.class);
private ScheduledExecutorService scheduledExecutor;
+
+ //private ExecutorService executor;
public void setUp() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -15,6 +15,8 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -84,20 +86,36 @@
protected static class CountDownCallback implements AIOCallback
{
private final CountDownLatch latch;
+
+ private final List<Integer> outputList;
+
+ private final int order;
+
+ private final AtomicInteger errors;
- public CountDownCallback(final CountDownLatch latch)
+ public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors,
final List<Integer> outputList, final int order)
{
this.latch = latch;
+
+ this.outputList = outputList;
+
+ this.order = order;
+
+ this.errors = errors;
}
volatile boolean doneCalled = false;
- volatile boolean errorCalled = false;
+ volatile int errorCalled = 0;
final AtomicInteger timesDoneCalled = new AtomicInteger(0);
public void done()
{
+ if (outputList != null)
+ {
+ outputList.add(order);
+ }
doneCalled = true;
timesDoneCalled.incrementAndGet();
if (latch != null)
@@ -108,7 +126,15 @@
public void onError(final int errorCode, final String errorMessage)
{
- errorCalled = true;
+ errorCalled++;
+ if (outputList != null)
+ {
+ outputList.add(order);
+ }
+ if (errors != null)
+ {
+ errors.incrementAndGet();
+ }
if (latch != null)
{
// even thought an error happened, we need to inform the latch,
@@ -116,6 +142,16 @@
latch.countDown();
}
}
+
+ public static void checkResults(final int numberOfElements, final
ArrayList<Integer> result)
+ {
+ assertEquals(numberOfElements, result.size());
+ int i = 0;
+ for (Integer resultI : result)
+ {
+ assertEquals(i++, resultI.intValue());
+ }
+ }
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -136,6 +136,11 @@
int numberOfLines = 1000;
int size = 1024;
+
+ ArrayList<Integer> listResult1 = new ArrayList<Integer>();
+ ArrayList<Integer> listResult2 = new ArrayList<Integer>();
+
+ AtomicInteger errors = new AtomicInteger(0);
ByteBuffer buffer = null;
try
@@ -154,41 +159,43 @@
for (int i = 0; i < numberOfLines; i++)
{
- list.add(new CountDownCallback(latchDone));
- list2.add(new CountDownCallback(latchDone2));
+ list.add(new CountDownCallback(latchDone, errors, listResult1, i));
+ list2.add(new CountDownCallback(latchDone2, errors, listResult2, i));
}
- long valueInitial = System.currentTimeMillis();
-
int counter = 0;
+
Iterator<CountDownCallback> iter2 = list2.iterator();
- for (CountDownCallback tmp : list)
+ for (CountDownCallback cb1 : list)
{
- CountDownCallback tmp2 = iter2.next();
+ CountDownCallback cb2 = iter2.next();
- controller.write(counter * size, size, buffer, tmp);
- controller.write(counter * size, size, buffer, tmp2);
+ controller.write(counter * size, size, buffer, cb1);
+ controller2.write(counter * size, size, buffer, cb2);
++counter;
}
latchDone.await();
latchDone2.await();
+
+ CountDownCallback.checkResults(numberOfLines, listResult1);
+ CountDownCallback.checkResults(numberOfLines, listResult2);
for (CountDownCallback callback : list)
{
assertEquals(1, callback.timesDoneCalled.get());
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
}
for (CountDownCallback callback : list2)
{
assertEquals(1, callback.timesDoneCalled.get());
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
}
+
+ assertEquals(0, errors.get());
controller.close();
}
@@ -358,7 +365,7 @@
controller.setBufferCallback(bufferCallback);
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
+ ArrayList<Integer> result = new ArrayList<Integer>();
for (int i = 0; i < NUMBER_LINES; i++)
{
ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -367,6 +374,7 @@
{
buffer.put((byte)(j % Byte.MAX_VALUE));
}
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -379,7 +387,7 @@
controller.close();
closed = true;
- assertEquals(NUMBER_LINES, buffers.size());
+ CountDownCallback.checkResults(NUMBER_LINES, result);
// Make sure all the buffers are unique
ByteBuffer lineOne = null;
@@ -439,7 +447,6 @@
controller.setBufferCallback(bufferCallback);
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
buffer = AsynchronousFileImpl.newBuffer(SIZE);
buffer.rewind();
@@ -448,8 +455,11 @@
buffer.put((byte)(j % Byte.MAX_VALUE));
}
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
for (int i = 0; i < NUMBER_LINES; i++)
{
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -462,6 +472,8 @@
controller.close();
closed = true;
+ CountDownCallback.checkResults(NUMBER_LINES, result);
+
assertEquals(NUMBER_LINES, buffers.size());
// Make sure all the buffers are unique
@@ -517,7 +529,9 @@
{
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
+ AtomicInteger errors = new AtomicInteger(0);
for (int i = 0; i < NUMBER_LINES; i++)
{
@@ -531,12 +545,15 @@
buffer.put(getSamplebyte(j));
}
+ CountDownCallback aio = new CountDownCallback(latch, errors, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
latch.await();
- assertFalse(aio.errorCalled);
- assertEquals(NUMBER_LINES, aio.timesDoneCalled.get());
+
+ assertEquals(0, errors.get());
+
+ CountDownCallback.checkResults(NUMBER_LINES, result);
}
// If you call close you're supposed to wait events to finish before
@@ -557,12 +574,13 @@
AsynchronousFileImpl.clearBuffer(readBuffer);
CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
+ AtomicInteger errors = new AtomicInteger(0);
+ CountDownCallback aio = new CountDownCallback(latch, errors, null, 0);
controller.read(i * SIZE, SIZE, readBuffer, aio);
latch.await();
- assertFalse(aio.errorCalled);
+ assertEquals(0, errors.get());
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
@@ -634,7 +652,7 @@
}
buffer.put((byte)'\n');
- CountDownCallback aio = new CountDownCallback(readLatch);
+ CountDownCallback aio = new CountDownCallback(readLatch, null, null, 0);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -663,10 +681,10 @@
newBuffer.put((byte)'\n');
CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
+ CountDownCallback aio = new CountDownCallback(latch, null, null, 0);
controller.read(i * SIZE, SIZE, buffer, aio);
latch.await();
- assertFalse(aio.errorCalled);
+ assertEquals(0, aio.errorCalled);
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
@@ -720,9 +738,11 @@
ArrayList<CountDownCallback> list = new
ArrayList<CountDownCallback>();
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
for (int i = 0; i < numberOfLines; i++)
{
- list.add(new CountDownCallback(latchDone));
+ list.add(new CountDownCallback(latchDone, null, result, i));
}
long valueInitial = System.currentTimeMillis();
@@ -743,6 +763,9 @@
latchDone.await();
long timeTotal = System.currentTimeMillis() - valueInitial;
+
+ CountDownCallback.checkResults(numberOfLines, result);
+
debug("After completions time = " + timeTotal +
" for " +
numberOfLines +
@@ -759,7 +782,7 @@
{
assertEquals(1, tmp.timesDoneCalled.get());
assertTrue(tmp.doneCalled);
- assertFalse(tmp.errorCalled);
+ assertEquals(0, tmp.errorCalled);
}
controller.close();
@@ -799,11 +822,11 @@
for (int i = 0; i < NUMBER_LINES; i++)
{
CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null,
0);
controller.write(i * 512, 512, buffer, aioBlock);
latchDone.await();
assertTrue(aioBlock.doneCalled);
- assertFalse(aioBlock.errorCalled);
+ assertEquals(0, aioBlock.errorCalled);
}
long timeTotal = System.currentTimeMillis() - startTime;
@@ -850,12 +873,12 @@
CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0);
controller.write(11, 512, buffer, aioBlock);
latchDone.await();
- assertTrue(aioBlock.errorCalled);
+ assertTrue(aioBlock.errorCalled != 0);
assertFalse(aioBlock.doneCalled);
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -40,7 +40,7 @@
* */
public class MultiThreadAsynchronousFileTest extends AIOTestBase
{
-
+
public static TestSuite suite()
{
return createAIOTestSuite(MultiThreadAsynchronousFileTest.class);
@@ -56,37 +56,33 @@
static final int NUMBER_OF_LINES = 1000;
- // Executor exec
-
ExecutorService executor;
-
+
ExecutorService pollerExecutor;
-
private static void debug(final String msg)
{
log.debug(msg);
}
-
-
protected void setUp() throws Exception
{
super.setUp();
- pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
false));
+ pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
+ false));
executor = Executors.newSingleThreadExecutor();
}
-
+
protected void tearDown() throws Exception
{
executor.shutdown();
pollerExecutor.shutdown();
super.tearDown();
}
-
+
public void testMultipleASynchronousWrites() throws Throwable
{
- executeTest(false);
+ executeTest(false);
}
public void testMultipleSynchronousWrites() throws Throwable
@@ -132,15 +128,15 @@
long endTime = System.currentTimeMillis();
debug((sync ? "Sync result:" : "Async result:") + "
Records/Second = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES *
- 1000 /
- (endTime - startTime) +
- " total time = " +
- (endTime - startTime) +
- " total number of records = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES);
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES *
+ 1000 /
+ (endTime - startTime) +
+ " total time = " +
+ (endTime - startTime) +
+ " total number of records = " +
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES);
}
finally
{
@@ -180,9 +176,8 @@
{
super.run();
-
ByteBuffer buffer = null;
-
+
synchronized (MultiThreadAsynchronousFileTest.class)
{
buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -222,7 +217,7 @@
{
latchFinishThread = new CountDownLatch(1);
}
- CountDownCallback callback = new CountDownCallback(latchFinishThread);
+ CountDownCallback callback = new CountDownCallback(latchFinishThread,
null, null, 0);
if (!sync)
{
list.add(callback);
@@ -232,7 +227,7 @@
{
latchFinishThread.await();
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
}
if (!sync)
@@ -243,13 +238,13 @@
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.core.journal.impl;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -57,6 +56,16 @@
public void testAsynchronousCommit() throws Exception
{
+ doAsynchronousTest(true);
+ }
+
+ public void testAsynchronousRollback() throws Exception
+ {
+ doAsynchronousTest(false);
+ }
+
+ public void doAsynchronousTest(final boolean isCommit) throws Exception
+ {
final int JOURNAL_SIZE = 20000;
setupJournal(JOURNAL_SIZE, 100, 5);
@@ -81,7 +90,14 @@
latch.countDown();
factory.setHoldCallbacks(false, null);
- journalImpl.appendCommitRecord(1l, true);
+ if (isCommit)
+ {
+ journalImpl.appendCommitRecord(1l, true);
+ }
+ else
+ {
+ journalImpl.appendRollbackRecord(1l, true);
+ }
}
catch (Exception e)
{
@@ -97,6 +113,8 @@
assertTrue(latch.await(5, TimeUnit.SECONDS));
Thread.yield();
+
+ Thread.sleep(100);
assertTrue(t.isAlive());
@@ -109,145 +127,7 @@
throw t.e;
}
}
-
- public void testAsynchronousRollbackWithError() throws Exception
- {
- final int JOURNAL_SIZE = 20000;
-
- setupJournal(JOURNAL_SIZE, 100, 5);
-
- final CountDownLatch latch = new CountDownLatch(11);
-
- factory.setHoldCallbacks(true, new
FakeSequentialFileFactory.ListenerHoldCallback()
- {
-
- public void callbackAdded(final ByteBuffer bytes)
- {
- latch.countDown();
- }
- });
-
- class LocalThread extends Thread
- {
- Exception e;
-
- @Override
- public void run()
- {
- try
- {
- for (int i = 0; i < 10; i++)
- {
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new
SimpleEncoding(1, (byte)0));
- }
-
- journalImpl.appendRollbackRecord(1l, true);
- }
- catch (Exception e)
- {
- this.e = e;
- }
- }
- };
-
- LocalThread t = new LocalThread();
- t.start();
-
- latch.await();
-
- Thread.yield();
-
- assertTrue(t.isAlive());
-
- factory.setCallbackAsError(0);
-
- factory.flushCallback(0);
-
- Thread.yield();
-
- assertTrue(t.isAlive());
-
- factory.flushAllCallbacks();
-
- t.join();
-
- assertNotNull(t.e);
- }
-
- public void testAsynchronousCommitWithError() throws Exception
- {
- final int JOURNAL_SIZE = 20000;
-
- setupJournal(JOURNAL_SIZE, 100, 5);
-
- final CountDownLatch latch = new CountDownLatch(11);
-
- factory.setHoldCallbacks(true, new
FakeSequentialFileFactory.ListenerHoldCallback()
- {
-
- public void callbackAdded(final ByteBuffer bytes)
- {
- latch.countDown();
- }
- });
-
- class LocalThread extends Thread
- {
- Exception e;
-
- @Override
- public void run()
- {
- try
- {
- for (int i = 0; i < 10; i++)
- {
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new
SimpleEncoding(1, (byte)0));
- }
-
- journalImpl.appendCommitRecord(1l, true);
- }
- catch (Exception e)
- {
- this.e = e;
- }
- }
- };
-
- LocalThread t = new LocalThread();
- t.start();
-
- latch.await();
-
- Thread.yield();
-
- assertTrue(t.isAlive());
-
- factory.setCallbackAsError(0);
-
- factory.flushCallback(0);
-
- Thread.yield();
-
- assertTrue(t.isAlive());
-
- factory.flushAllCallbacks();
-
- t.join();
-
- assertNotNull(t.e);
-
- try
- {
- journalImpl.appendRollbackRecord(1l, false);
- fail("Supposed to throw an exception");
- }
- catch (Exception e)
- {
-
- }
- }
-
+
// If a callback error already arrived, we should just throw the exception
// right away
public void testPreviousError() throws Exception
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -19,9 +19,11 @@
import java.util.UUID;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -213,30 +215,30 @@
String s1 = "aardvark";
byte[] bytes1 = s1.getBytes("UTF-8");
- ByteBuffer bb1 = factory.wrapBuffer(bytes1);
+ HornetQBuffer bb1 = wrapBuffer(bytes1);
String s2 = "hippopotamus";
byte[] bytes2 = s2.getBytes("UTF-8");
- ByteBuffer bb2 = factory.wrapBuffer(bytes2);
+ HornetQBuffer bb2 = wrapBuffer(bytes2);
String s3 = "echidna";
byte[] bytes3 = s3.getBytes("UTF-8");
- ByteBuffer bb3 = factory.wrapBuffer(bytes3);
+ HornetQBuffer bb3 = wrapBuffer(bytes3);
long initialPos = sf.position();
- sf.writeDirect(bb1, true);
+ sf.write(bb1, true);
long bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
initialPos = sf.position();
- sf.writeDirect(bb2, true);
+ sf.write(bb2, true);
bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
initialPos = sf.position();
- sf.writeDirect(bb3, true);
+ sf.write(bb3, true);
bytesWritten = sf.position() - initialPos;
assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
@@ -286,33 +288,30 @@
String s1 = "orange";
byte[] bytes1 = s1.getBytes("UTF-8");
- ByteBuffer bb1 = factory.wrapBuffer(bytes1);
byte[] bytes2 = s1.getBytes("UTF-8");
- ByteBuffer bb2 = factory.wrapBuffer(bytes2);
String s3 = "lemon";
byte[] bytes3 = s3.getBytes("UTF-8");
- ByteBuffer bb3 = factory.wrapBuffer(bytes3);
-
+
long initialPos = sf.position();
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bytes1), true);
long bytesWritten = sf.position() - initialPos;
- assertEquals(bb1.limit(), bytesWritten);
+ assertEquals(calculateRecordSize(bytes1.length,sf.getAlignment()),
bytesWritten);
initialPos = sf.position();
- sf.writeDirect(bb2, true);
+ sf.write(wrapBuffer(bytes2), true);
bytesWritten = sf.position() - initialPos;
- assertEquals(bb2.limit(), bytesWritten);
+ assertEquals(calculateRecordSize(bytes2.length,sf.getAlignment()),
bytesWritten);
initialPos = sf.position();
- sf.writeDirect(bb3, true);
+ sf.write(wrapBuffer(bytes3), true);
bytesWritten = sf.position() - initialPos;
- assertEquals(bb3.limit(), bytesWritten);
+ assertEquals(calculateRecordSize(bytes3.length,sf.getAlignment()),
bytesWritten);
byte[] rbytes1 = new byte[bytes1.length];
@@ -324,7 +323,7 @@
ByteBuffer rb2 = factory.newBuffer(rbytes2.length);
ByteBuffer rb3 = factory.newBuffer(rbytes3.length);
- sf.position(bb1.limit() + bb2.limit());
+ sf.position(calculateRecordSize(bytes1.length, sf.getAlignment()) +
calculateRecordSize(bytes2.length, sf.getAlignment()));
int bytesRead = sf.read(rb3);
assertEquals(rb3.limit(), bytesRead);
@@ -370,39 +369,45 @@
String s1 = "cheesecake";
byte[] bytes1 = s1.getBytes("UTF-8");
- ByteBuffer bb1 = factory.wrapBuffer(bytes1);
long initialPos = sf.position();
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bytes1), true);
long bytesWritten = sf.position() - initialPos;
- assertEquals(bb1.limit(), bytesWritten);
+ assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
sf.close();
try
{
- bb1 = factory.wrapBuffer(bytes1);
-
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bytes1), true);
fail("Should throw exception");
}
catch (Exception e)
{
- // OK
}
sf.open();
- sf.writeDirect(bb1, true);
+ sf.write(wrapBuffer(bytes1), true);
sf.close();
}
// Private ---------------------------------
+
+ private HornetQBuffer wrapBuffer(ByteBuffer buffer)
+ {
+ return ChannelBuffers.wrappedBuffer(buffer);
+ }
+ private HornetQBuffer wrapBuffer(byte[] bytes)
+ {
+ return ChannelBuffers.wrappedBuffer(bytes);
+ }
+
protected void checkFill(final SequentialFile file, final int pos, final int size,
final byte fillChar) throws Exception
{
file.fill(pos, size, fillChar);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -18,7 +18,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.impl.TimedBuffer;
import org.hornetq.core.journal.impl.TimedBufferObserver;
import org.hornetq.tests.util.UnitTestCase;
@@ -42,7 +42,7 @@
// Public --------------------------------------------------------
- IOCompletion dummyCallback = new IOCompletion()
+ IOAsyncTask dummyCallback = new IOAsyncTask()
{
public void done()
@@ -64,7 +64,7 @@
final AtomicInteger flushTimes = new AtomicInteger(0);
class TestObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final boolean sync, final
List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean sync, final
List<IOAsyncTask> callbacks)
{
buffers.add(buffer);
flushTimes.incrementAndGet();
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -20,7 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.core.asyncio.BufferCallback;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.TimedBuffer;
@@ -241,11 +241,11 @@
final ByteBuffer bytes;
- final IOCompletion callback;
+ final IOAsyncTask callback;
volatile boolean sendError;
- CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final
IOCompletion callback)
+ CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final
IOAsyncTask callback)
{
this.file = file;
this.bytes = bytes;
@@ -399,7 +399,7 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws
Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws
Exception
{
if (!open)
{
@@ -439,7 +439,7 @@
return data.position();
}
- public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync,
final IOCompletion callback)
+ public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync,
final IOAsyncTask callback)
{
if (!open)
{
@@ -605,7 +605,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.remoting.spi.HornetQBuffer,
boolean, org.hornetq.core.journal.IOCallback)
*/
- public void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws
Exception
+ public void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws
Exception
{
writeDirect(ByteBuffer.wrap(bytes.array()), sync, callback);
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -29,6 +29,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -43,6 +44,7 @@
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -1155,7 +1157,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
*/
- public void afterReplicated(Runnable run)
+ public void afterCompleteOperations(Runnable run)
{
}
@@ -1163,7 +1165,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
}
@@ -1221,7 +1223,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnReplication(long timeout) throws Exception
+ public void waitOnOperations(long timeout) throws Exception
{
}
@@ -1232,6 +1234,50 @@
{
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
+ */
+ public void afterCompleteOperations(IOAsyncTask run)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
+ */
+ public void waitOnOperations() throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#getContext()
+ */
+ public OperationContext getContext()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#newContext(java.util.concurrent.Executor)
+ */
+ public OperationContext newContext(Executor executor)
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#clearContext()
+ */
+ public void clearContext()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
+ */
+ public void setContext(OperationContext context)
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -1131,7 +1131,7 @@
return null;
}
- public int decrementRefCount(MessageReference reference) throws Exception
+ public int decrementRefCount(MessageReference reference)
{
// TODO Auto-generated method stub
return 0;
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -17,6 +17,8 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -37,6 +39,8 @@
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -57,11 +61,23 @@
// Constructors --------------------------------------------------
+ ExecutorService executor;
+
+ ExecutorFactory factory;
+
@Override
protected void tearDown() throws Exception
{
super.tearDown();
+ executor.shutdown();
}
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ executor = Executors.newSingleThreadExecutor();
+ factory = new OrderedExecutorFactory(executor);
+ }
// Public --------------------------------------------------------
@@ -86,7 +102,7 @@
ScheduledExecutorService scheduledThreadPool =
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
- journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
@@ -110,7 +126,7 @@
journal.stop();
- journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
@@ -138,7 +154,7 @@
mapDups.clear();
- journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -493,4 +493,13 @@
return false;
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference,
java.util.concurrent.Executor)
+ */
+ public boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-23
23:28:33 UTC (rev 8388)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -43,7 +43,19 @@
{
// The tests ----------------------------------------------------------------
- private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+ private ScheduledExecutorService scheduledExecutor;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ scheduledExecutor.shutdown();
+ super.tearDown();
+ }
private static final SimpleString queue1 = new SimpleString("queue1");
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-11-23
23:28:33 UTC (rev 8388)
+++
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-11-24
02:53:23 UTC (rev 8389)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.server.impl.fakes;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -32,8 +33,8 @@
*/
public class FakeQueueFactory implements QueueFactory
{
- private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
-
+ private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+
private PostOffice postOffice;
public Queue createQueue(long persistenceID, final SimpleString address, SimpleString
name, Filter filter,
@@ -47,5 +48,10 @@
this.postOffice = postOffice;
}
+
+ public void stop() throws Exception
+ {
+ scheduledExecutor.shutdown();
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-23 23:28:33 UTC (rev
8388)
+++ trunk/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-24 02:53:23 UTC (rev
8389)
@@ -50,6 +50,8 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -647,6 +649,8 @@
@Override
protected void tearDown() throws Exception
{
+ OperationContextImpl.clearContext();
+
deleteDirectory(new File(getTestDir()));
assertEquals(0, InVMRegistry.instance.size());