Author: clebert.suconic(a)jboss.com
Date: 2009-11-21 11:48:36 -0500 (Sat, 21 Nov 2009)
New Revision: 8361
Added:
branches/ClebertCallback/src/main/org/hornetq/core/journal/IOAsyncTask.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/SimpleClientTest.java
Modified:
branches/ClebertCallback/native/bin/libHornetQAIO64.so
branches/ClebertCallback/native/src/JNICallbackAdapter.cpp
branches/ClebertCallback/native/src/JNICallbackAdapter.h
branches/ClebertCallback/native/src/JNI_AsynchronousFileImpl.cpp
branches/ClebertCallback/native/src/Version.h
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/IOCompletion.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/DummyCallback.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/paging/PagingStore.java
branches/ClebertCallback/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationContext.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java
branches/ClebertCallback/src/main/org/hornetq/core/server/ServerMessage.java
branches/ClebertCallback/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
branches/ClebertCallback/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/transaction/TransactionOperation.java
branches/ClebertCallback/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/ClebertCallback/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
Updating my branch with trunk
Modified: branches/ClebertCallback/native/bin/libHornetQAIO64.so
===================================================================
(Binary files differ)
Modified: branches/ClebertCallback/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/ClebertCallback/native/src/JNICallbackAdapter.cpp 2009-11-21 15:48:01 UTC
(rev 8360)
+++ branches/ClebertCallback/native/src/JNICallbackAdapter.cpp 2009-11-21 16:48:36 UTC
(rev 8361)
@@ -18,13 +18,20 @@
jobject nullObj = NULL;
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _callback,
jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence,
jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) :
CallbackAdapter()
{
controller = _controller;
+
+ sequence = _sequence;
+
callback = _callback;
+
fileController = _fileController;
+
bufferReference = _bufferReference;
+
isRead = _isRead;
+
}
JNICallbackAdapter::~JNICallbackAdapter()
@@ -33,15 +40,19 @@
void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
{
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback,
isRead ? nullObj : bufferReference);
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback,
sequence, isRead ? nullObj : bufferReference);
+
release(threadContext);
}
void JNICallbackAdapter::onError(THREAD_CONTEXT threadContext, long errorCode,
std::string error)
{
controller->log(threadContext, 0, "Libaio event generated errors, callback
object was informed about it");
+
jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data());
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error,
callback, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
+
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error,
callback, sequence, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
+
release(threadContext);
}
Modified: branches/ClebertCallback/native/src/JNICallbackAdapter.h
===================================================================
--- branches/ClebertCallback/native/src/JNICallbackAdapter.h 2009-11-21 15:48:01 UTC (rev
8360)
+++ branches/ClebertCallback/native/src/JNICallbackAdapter.h 2009-11-21 16:48:36 UTC (rev
8361)
@@ -24,10 +24,17 @@
class JNICallbackAdapter : public CallbackAdapter
{
private:
+
AIOController * controller;
+
jobject callback;
+
jobject fileController;
+
jobject bufferReference;
+
+ jlong sequence;
+
// Is this a read operation
short isRead;
@@ -43,7 +50,7 @@
public:
// _ob must be a global reference (use NewGlobalRef before calling the
constructor)
- JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject
_fileController, jobject _bufferReference, short _isRead);
+ JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback,
jobject _fileController, jobject _bufferReference, short _isRead);
virtual ~JNICallbackAdapter();
void done(THREAD_CONTEXT threadContext);
Modified: branches/ClebertCallback/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- branches/ClebertCallback/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-21 15:48:01
UTC (rev 8360)
+++ branches/ClebertCallback/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-21 16:48:36
UTC (rev 8361)
@@ -48,10 +48,10 @@
std::string fileName = convertJavaString(env, jstrFileName);
controller = new AIOController(fileName, (int) maxIO);
- controller->done =
env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
+ controller->done =
env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V");
if (!controller->done) return 0;
- controller->error = env->GetMethodID(clazz, "callbackError",
"(Lorg/hornetq/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;ILjava/lang/String;)V");
+ controller->error = env->GetMethodID(clazz, "callbackError",
"(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V");
if (!controller->error) return 0;
jclass loggerClass = env->GetObjectClass(logger);
@@ -97,7 +97,7 @@
return;
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller,
env->NewGlobalRef(callback), env->NewGlobalRef(objThis),
env->NewGlobalRef(jbuffer), true);
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, -1,
env->NewGlobalRef(callback), env->NewGlobalRef(objThis),
env->NewGlobalRef(jbuffer), true);
controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
}
@@ -166,7 +166,7 @@
}
JNIEXPORT void JNICALL Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_write
- (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size,
jobject jbuffer, jobject callback)
+ (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong sequence, jlong position,
jlong size, jobject jbuffer, jobject callback)
{
try
{
@@ -180,7 +180,7 @@
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller,
env->NewGlobalRef(callback), env->NewGlobalRef(objThis),
env->NewGlobalRef(jbuffer), false);
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, sequence,
env->NewGlobalRef(callback), env->NewGlobalRef(objThis),
env->NewGlobalRef(jbuffer), false);
controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
}
Modified: branches/ClebertCallback/native/src/Version.h
===================================================================
--- branches/ClebertCallback/native/src/Version.h 2009-11-21 15:48:01 UTC (rev 8360)
+++ branches/ClebertCallback/native/src/Version.h 2009-11-21 16:48:36 UTC (rev 8361)
@@ -1,5 +1,5 @@
#ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 25
+#define _VERSION_NATIVE_AIO 26
#endif
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -14,11 +14,14 @@
package org.hornetq.core.asyncio.impl;
import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.asyncio.AIOCallback;
@@ -46,8 +49,21 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 25;
+ private static int EXPECTED_NATIVE_VERSION = 26;
+ /** Used to determine the next writing sequence */
+ private AtomicLong nextWritingSequence = new AtomicLong(0);
+
+ /** Used to determine the next reading sequence, i.e. the sequence of the next callback to be delivered.
+ * This is accessed from a single thread (the Poller Thread) */
+ private long nextReadSequence = 0;
+
+ /**
+ * AIO can't guarantee ordering over callbacks.
+ * We use this PriorityQueue to hold values until they are in order
+ */
+ private PriorityQueue<CallbackHolder> pendingCallbacks = new
PriorityQueue<CallbackHolder>();
+
public static void addMax(final int io)
{
totalMaxIO.addAndGet(io);
@@ -124,6 +140,10 @@
private String fileName;
+ /** Used while inside the callbackDone and callbackError
+ **/
+ private final Lock callbackLock = new ReentrantLock();
+
private final VariableLatch pollerLatch = new VariableLatch();
private volatile Runnable poller;
@@ -131,7 +151,7 @@
private int maxIO;
private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
-
+
private final VariableLatch pendingWrites = new VariableLatch();
private Semaphore writeSemaphore;
@@ -189,10 +209,10 @@
if (e.getCode() == HornetQException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
{
ex = new HornetQException(e.getCode(),
- "Can't initialize AIO. Currently AIO
in use = " + totalMaxIO.get() +
- ", trying to allocate more
" +
- maxIO,
- e);
+ "Can't initialize AIO. Currently AIO in
use = " + totalMaxIO.get() +
+ ", trying to allocate more "
+
+ maxIO,
+ e);
}
else
{
@@ -202,6 +222,8 @@
}
opened = true;
addMax(this.maxIO);
+ nextWritingSequence.set(0);
+ nextReadSequence = 0;
}
finally
{
@@ -222,7 +244,7 @@
{
log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.fileName);
}
-
+
while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
log.warn("Couldn't get lock after 60 seconds on closing
AsynchronousFileImpl::" + this.fileName);
@@ -263,7 +285,7 @@
{
startPoller();
}
-
+
pendingWrites.up();
if (writeExecutor != null)
@@ -274,17 +296,23 @@
{
writeSemaphore.acquireUninterruptibly();
+ long sequence = nextWritingSequence.getAndIncrement();
+
try
{
- write(handler, position, size, directByteBuffer, aioCallback);
+ write(handler, sequence, position, size, directByteBuffer,
aioCallback);
}
catch (HornetQException e)
{
- callbackError(aioCallback, directByteBuffer, e.getCode(),
e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, e.getCode(),
e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, directByteBuffer,
HornetQException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback,
+ sequence,
+ directByteBuffer,
+ HornetQException.INTERNAL_ERROR,
+ e.getMessage());
}
}
});
@@ -293,17 +321,19 @@
{
writeSemaphore.acquireUninterruptibly();
+ long sequence = nextWritingSequence.getAndIncrement();
+
try
{
- write(handler, position, size, directByteBuffer, aioCallback);
+ write(handler, sequence, position, size, directByteBuffer, aioCallback);
}
catch (HornetQException e)
{
- callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, e.getCode(),
e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, directByteBuffer, HornetQException.INTERNAL_ERROR,
e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer,
HornetQException.INTERNAL_ERROR, e.getMessage());
}
}
@@ -397,9 +427,9 @@
resetBuffer(buffer, buffer.limit());
buffer.position(0);
}
-
+
// Protected
-------------------------------------------------------------------------
-
+
protected void finalize()
{
if (opened)
@@ -410,32 +440,109 @@
// Private
---------------------------------------------------------------------------
- /** The JNI layer will call this method, so we could use it to unlock readWriteLocks
held in the java layer */
+ /** Called by the JNI layer when an asynchronous operation completes. */
@SuppressWarnings("unused")
- // Called by the JNI layer.. just ignore the
- // warning
- private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
+ private void callbackDone(final AIOCallback callback, final long sequence, final
ByteBuffer buffer)
{
writeSemaphore.release();
+
pendingWrites.down();
- callback.done();
-
- // The buffer is not sent on callback for read operations
- if (bufferCallback != null && buffer != null)
+
+ callbackLock.lock();
+
+ try
{
- bufferCallback.bufferDone(buffer);
+
+ if (sequence == -1)
+ {
+ callback.done();
+ }
+ else
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.done();
+ flushCallbacks();
+ }
+ else
+ {
+ // System.out.println("Buffering callback");
+ pendingCallbacks.add(new CallbackHolder(sequence, callback));
+ }
+ }
+
+ // The buffer is not sent on callback for read operations
+ if (bufferCallback != null && buffer != null)
+ {
+ bufferCallback.bufferDone(buffer);
+ }
}
+ finally
+ {
+ callbackLock.unlock();
+ }
}
+ private void flushCallbacks()
+ {
+ while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence ==
nextReadSequence)
+ {
+ CallbackHolder holder = pendingCallbacks.poll();
+ if (holder.isError())
+ {
+ ErrorCallback error = (ErrorCallback) holder;
+ holder.callback.onError(error.errorCode, error.message);
+ }
+ else
+ {
+ holder.callback.done();
+ }
+ nextReadSequence++;
+ }
+ }
+
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final ByteBuffer buffer, final
int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback,
+ final long sequence,
+ final ByteBuffer buffer,
+ final int errorCode,
+ final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
+
writeSemaphore.release();
+
pendingWrites.down();
- callback.onError(errorCode, errorMessage);
+ callbackLock.lock();
+
+ try
+ {
+ if (sequence == -1)
+ {
+ callback.onError(errorCode, errorMessage);
+ }
+ else
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.onError(errorCode, errorMessage);
+ flushCallbacks();
+ }
+ else
+ {
+ pendingCallbacks.add(new ErrorCallback(sequence, callback, errorCode,
errorMessage));
+ }
+ }
+ }
+ finally
+ {
+ callbackLock.unlock();
+ }
+
// The buffer is not sent on callback for read operations
if (bufferCallback != null && buffer != null)
{
@@ -504,10 +611,10 @@
private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
public static native void destroyBuffer(ByteBuffer buffer);
-
+
/** Instead of passing the nanoSeconds through the stack call every time, we set it
statically inside the native method */
public static native void setNanoSleepInterval(int nanoseconds);
-
+
public static native void nanoSleep();
private static native ByteBuffer newNativeBuffer(long size);
@@ -516,7 +623,12 @@
private native long size0(long handle) throws HornetQException;
- private native void write(long handle, long position, long size, ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
+ private native void write(long handle,
+ long sequence,
+ long position,
+ long size,
+ ByteBuffer buffer,
+ AIOCallback aioPackage) throws HornetQException;
private native void read(long handle, long position, long size, ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
@@ -529,11 +641,63 @@
/** A native method that does nothing, and just validate if the ELF dependencies are
loaded and on the correct platform as this binary format */
private static native int getNativeVersion();
- /** Poll asynchrounous events from internal queues */
+ /** Poll asynchronous events from internal queues */
private static native void internalPollEvents(long handler);
// Inner classes
---------------------------------------------------------------------
+ private static class CallbackHolder implements Comparable<CallbackHolder>
+ {
+ final long sequence;
+
+ final AIOCallback callback;
+
+ public boolean isError()
+ {
+ return false;
+ }
+
+ public CallbackHolder(final long sequence, final AIOCallback callback)
+ {
+ this.sequence = sequence;
+ this.callback = callback;
+ }
+
+ public int compareTo(CallbackHolder o)
+ {
+ // It shouldn't be equals in any case
+ if (sequence <= o.sequence)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+ }
+
+ private static class ErrorCallback extends CallbackHolder
+ {
+ final int errorCode;
+
+ final String message;
+
+ public boolean isError()
+ {
+ return true;
+ }
+
+ public ErrorCallback(final long sequence, final AIOCallback callback, int
errorCode, String message)
+ {
+ super(sequence, callback);
+
+ this.errorCode = errorCode;
+
+ this.message = message;
+ }
+ }
+
private class PollerRunnable implements Runnable
{
PollerRunnable()
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -1024,6 +1024,7 @@
}
catch (HornetQException e)
{
+ log.warn(e.getMessage(), e);
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
Added: branches/ClebertCallback/src/main/org/hornetq/core/journal/IOAsyncTask.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/IOAsyncTask.java
(rev 0)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/IOAsyncTask.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+import org.hornetq.core.asyncio.AIOCallback;
+
+/**
+ *
+ * This class is just a direct extension of AIOCallback.
+ * It exists just to avoid a direct dependency on org.hornetq.core.asyncio.AIOCallback from the
journal.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ *
+ */
+public interface IOAsyncTask extends AIOCallback
+{
+}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/IOCompletion.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -13,17 +13,14 @@
package org.hornetq.core.journal;
-import org.hornetq.core.asyncio.AIOCallback;
-
/**
- *
- * This class is just a direct extension of AIOCallback.
- * Just to avoid the direct dependency of org.hornetq.core.asynciio.AIOCallback from the
journal.
- *
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
+ * A IOCompletion
*
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
*/
-public interface IOCompletion extends AIOCallback
+public interface IOCompletion extends IOAsyncTask
{
- void waitCompletion() throws Exception;
+ void lineUp();
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java 2009-11-21
15:48:01 UTC (rev 8360)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/Journal.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -19,8 +19,10 @@
/**
*
- * A Journal
+ * Most methods on the journal provide a blocking version where you select the sync mode
and a non blocking mode where you pass a completion callback as a parameter.
*
+ * Notice also that even on the callback methods it's possible to pass the sync mode.
That will only make sense on the NIO operations.
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
@@ -31,14 +33,24 @@
void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws
Exception;
+ void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync)
throws Exception;
+ void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws
Exception;
+ void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception;
+
void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception;
+ void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean
sync, IOCompletion completionCallback) throws Exception;
+
void appendDeleteRecord(long id, boolean sync) throws Exception;
+ void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws
Exception;
+
// Transactional operations
void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record)
throws Exception;
@@ -57,6 +69,8 @@
void appendCommitRecord(long txID, boolean sync) throws Exception;
+ void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception;
+
/**
*
* <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
@@ -70,10 +84,16 @@
*/
void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync)
throws Exception;
+ void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync,
IOCompletion callback) throws Exception;
+
void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws
Exception;
+ void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion
callback) throws Exception;
+
void appendRollbackRecord(long txID, boolean sync) throws Exception;
+ void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws
Exception;
+
// Load
JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
Modified: branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -56,17 +56,17 @@
void delete() throws Exception;
- void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws
Exception;
+ void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception;
void write(HornetQBuffer bytes, boolean sync) throws Exception;
/** Write directly to the file without using any buffer */
- void writeDirect(ByteBuffer bytes, boolean sync, IOCompletion callback);
+ void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
- int read(ByteBuffer bytes, IOCompletion callback) throws Exception;
+ int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
int read(ByteBuffer bytes) throws Exception;
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -15,21 +15,17 @@
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.asyncio.AIOCallback;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.journal.IOCompletion;
-import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
/**
*
@@ -50,14 +46,12 @@
private final BufferCallback bufferCallback;
- /** A context switch on AIO would make it to synchronize the disk before
- switching to the new thread, what would cause
- serious performance problems. Because of that we make all the writes on
- AIO using a single thread. */
- private final Executor executor;
-
/** The pool for Thread pollers */
private final Executor pollerExecutor;
+
+ /** Context switch on AIO could fire unnecessary flushes, so we use a single thread
for write */
+ private final Executor writerExecutor;
+
public AIOSequentialFile(final SequentialFileFactory factory,
final int bufferSize,
@@ -66,13 +60,13 @@
final String fileName,
final int maxIO,
final BufferCallback bufferCallback,
- final Executor executor,
+ final Executor writerExecutor,
final Executor pollerExecutor)
{
super(directory, new File(directory + "/" + fileName), factory);
this.maxIO = maxIO;
+ this.writerExecutor = writerExecutor;
this.bufferCallback = bufferCallback;
- this.executor = executor;
this.pollerExecutor = pollerExecutor;
}
@@ -99,7 +93,7 @@
+ /**
+ * Creates another AIOSequentialFile for the same directory and file name as this one,
+ * reusing this file's factory, maxIO, bufferCallback and both executors.
+ */
public SequentialFile copy()
{
- return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(),
maxIO, bufferCallback, executor, pollerExecutor);
+ // bufferSize and the argument after it are passed as -1
+ return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(),
maxIO, bufferCallback, writerExecutor, pollerExecutor);
}
public synchronized void close() throws Exception
@@ -114,7 +108,7 @@
final CountDownLatch donelatch = new CountDownLatch(1);
- executor.execute(new Runnable()
+ writerExecutor.execute(new Runnable()
{
public void run()
{
@@ -202,7 +196,7 @@
public synchronized void open(final int currentMaxIO) throws Exception
{
opened = true;
- aioFile = newFile();
+ aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
position.set(0);
aioFile.setBufferCallback(bufferCallback);
@@ -214,7 +208,7 @@
aioFile.setBufferCallback(callback);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
int bytesToRead = bytes.limit();
@@ -229,7 +223,7 @@
public int read(final ByteBuffer bytes) throws Exception
{
- IOCompletion waitCompletion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
int bytesRead = read(bytes, waitCompletion);
@@ -268,20 +262,12 @@
// Protected methods
//
-----------------------------------------------------------------------------------------------------
- /**
- * An extension point for tests
- */
- protected AsynchronousFile newFile()
- {
- return new AsynchronousFileImpl(executor, pollerExecutor);
- }
-
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
{
if (sync)
{
- IOCompletion completion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
writeDirect(bytes, true, completion);
@@ -298,7 +284,7 @@
*
* @param sync Not used on AIO
* */
- public void writeDirect(final ByteBuffer bytes, final boolean sync, IOCompletion
callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, IOAsyncTask
callback)
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -36,15 +36,19 @@
public class AIOSequentialFileFactory extends AbstractSequentialFactory
{
- // Timeout used to wait executors to shutdown
- private static final int EXECUTOR_TIMEOUT = 60;
-
private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
private static final boolean trace = log.isTraceEnabled();
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
+ /** A single AIO write executor for every AIO File.
+ * This is used only for AIO & instant operations. We only need one
executor-thread for the entire journal as we always have only one active file.
+ * And even if we had multiple files at a given moment, this should still be ok, as
we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
+ private ExecutorService writeExecutor;
+
+ private ExecutorService pollerExecutor;
+
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
@@ -53,13 +57,6 @@
log.trace(message);
}
- /** A single AIO write executor for every AIO File.
- * This is used only for AIO & instant operations. We only need one
executor-thread for the entire journal as we always have only one active file.
- * And even if we had multiple files at a given moment, this should still be ok, as
we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
- private ExecutorService writeExecutor;
-
- private ExecutorService pollerExecutor;
-
public AIOSequentialFileFactory(final String journalDir)
{
this(journalDir,
@@ -163,8 +160,6 @@
@Override
public void stop()
{
- super.stop();
-
buffersControl.stop();
writeExecutor.shutdown();
@@ -192,6 +187,8 @@
catch (InterruptedException e)
{
}
+
+ super.stop();
}
@Override
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -19,10 +19,14 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
/**
*
@@ -34,17 +38,27 @@
*/
public abstract class AbstractSequentialFactory implements SequentialFileFactory
{
+
+ // Timeout, in seconds, to wait for the executors to shut down
+ protected static final int EXECUTOR_TIMEOUT = 60;
+
private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
+ /**
+ *
+ * We can't execute callbacks directly from any of the IO modules. They have to run
on another thread,
+ * so we use an executor for this.
+ * */
+ protected ExecutorService callbacksExecutor;
+
protected final String journalDir;
protected final TimedBuffer timedBuffer;
-
+
protected final int bufferSize;
protected final long bufferTimeout;
-
public AbstractSequentialFactory(final String journalDir,
final boolean buffered,
final int bufferSize,
@@ -71,6 +85,22 @@
{
timedBuffer.stop();
}
+
+ if (callbacksExecutor != null)
+ {
+ callbacksExecutor.shutdown();
+
+ try
+ {
+ if (!callbacksExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
}
public void start()
@@ -79,6 +109,17 @@
{
timedBuffer.start();
}
+
+ if (isSupportsCallbacks())
+ {
+ callbacksExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-callbacks" + System.identityHashCode(this),
+
true));
+ }
+ else
+ {
+ callbacksExecutor = null;
+ }
+
}
/* (non-Javadoc)
@@ -99,7 +140,7 @@
}
}
}
-
+
public void flush()
{
if (timedBuffer != null)
@@ -117,7 +158,6 @@
}
}
-
public void releaseBuffer(ByteBuffer buffer)
{
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -16,9 +16,10 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -42,7 +43,7 @@
private File file;
private final String directory;
-
+
protected final SequentialFileFactory factory;
protected long fileSize = 0;
@@ -159,7 +160,7 @@
}
- public void write(final HornetQBuffer bytes, final boolean sync, final IOCompletion
callback) throws Exception
+ public void write(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask
callback) throws Exception
{
if (timedBuffer != null)
{
@@ -178,7 +179,7 @@
{
if (sync)
{
- IOCompletion completion = SimpleWaitIOCallback.getInstance();
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
write(bytes, true, completion);
@@ -203,18 +204,18 @@
// Inner classes -------------------------------------------------
- protected static class DelegateCallback implements IOCompletion
+ protected static class DelegateCallback implements IOAsyncTask
{
- final List<IOCompletion> delegates;
+ final List<IOAsyncTask> delegates;
- DelegateCallback(final List<IOCompletion> delegates)
+ DelegateCallback(final List<IOAsyncTask> delegates)
{
this.delegates = delegates;
}
public void done()
{
- for (IOCompletion callback : delegates)
+ for (IOAsyncTask callback : delegates)
{
try
{
@@ -229,7 +230,7 @@
public void onError(final int errorCode, final String errorMessage)
{
- for (IOCompletion callback : delegates)
+ for (IOAsyncTask callback : delegates)
{
try
{
@@ -249,7 +250,7 @@
protected class LocalBufferObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final
List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final
List<IOAsyncTask> callbacks)
{
buffer.flip();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -14,7 +14,6 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -24,13 +23,13 @@
*
*
*/
-public class DummyCallback implements IOCompletion
+class DummyCallback extends SyncIOCompletion
{
private static DummyCallback instance = new DummyCallback();
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
+ /** Returns the single shared DummyCallback held in the static {@code instance} field. */
- public static IOCompletion getInstance()
+ public static DummyCallback getInstance()
{
return instance;
}
@@ -47,5 +46,12 @@
+ /** No-op: the body is empty, so this returns immediately. */
public void waitCompletion() throws Exception
{
}
+
+ /* (non-Javadoc)
+ * No-op for the dummy callback: the body is empty.
+ * @see org.hornetq.core.journal.IOCompletion#lineUp()
+ */
+ public void lineUp()
+ {
+ }
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -44,6 +44,7 @@
import org.hornetq.core.buffers.ChannelBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -84,7 +85,7 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = false;
+ private static final boolean trace = log.isTraceEnabled();
/** This is to be set to true at DEBUG & development only */
private static final boolean LOAD_TRACE = false;
@@ -95,6 +96,7 @@
private static final void trace(final String message)
{
+ // Forwards the message to this class's logger at trace level.
log.trace(message);
+ // NOTE(review): the commented-out println below is debugging residue; consider dropping it.
+ //System.out.println("JournalImpl::" + message);
}
// The sizes of primitive types
@@ -845,15 +847,40 @@
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ /**
+ * Convenience overload for a raw byte[] payload: wraps {@code record} in a ByteArrayEncoding
+ * and delegates to the EncodingSupport-based overload, passing id, recordType, sync and
+ * callback through unchanged.
+ */
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record,
final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
+ /**
+ * Blocking variant: obtains a callback from getSyncCallback(sync), appends the record through
+ * the callback-taking overload and then, if a callback was returned, waits for its completion.
+ */
public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync) throws Exception
{
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendAddRecord(id, recordType, record, sync, callback);
+
+ // We only wait on explicit callbacks
+ // (getSyncCallback may hand back null, in which case there is nothing to wait on)
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ if (LOAD_TRACE)
+ {
+ trace("appendAddRecord id = " + id + ", recordType = " +
recordType);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
- IOCompletion callback = null;
-
compactingLock.readLock().lock();
try
@@ -864,8 +891,6 @@
writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled
later
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -882,11 +907,6 @@
{
compactingLock.readLock().unlock();
}
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
}
public void appendUpdateRecord(final long id, final byte recordType, final byte[]
record, final boolean sync) throws Exception
@@ -894,15 +914,40 @@
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ /**
+ * Convenience overload for a raw byte[] payload: wraps {@code record} in a ByteArrayEncoding
+ * and delegates to the EncodingSupport-based overload, passing id, recordType, sync and
+ * callback through unchanged.
+ */
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[]
record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
+ /**
+ * Blocking variant: obtains a callback from getSyncCallback(sync), appends the update through
+ * the callback-taking overload and then, if a callback was returned, waits for its completion.
+ */
public void appendUpdateRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync) throws Exception
{
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendUpdateRecord(id, recordType, record, sync, callback);
+
+ // We only wait on explicit callbacks
+ // (getSyncCallback may hand back null, in which case there is nothing to wait on)
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendUpdateRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ if (LOAD_TRACE)
+ {
+ trace("appendUpdateRecord id = " + id + ", recordType = " +
recordType);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
- IOCompletion callback = null;
-
compactingLock.readLock().lock();
try
@@ -924,8 +969,6 @@
writeUpdateRecord(-1, id, recordType, record, size, bb);
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -951,24 +994,40 @@
{
compactingLock.readLock().unlock();
}
+ }
+
+ /**
+ * Blocking variant: obtains a callback from getSyncCallback(sync), appends the delete through
+ * the callback-taking overload and then, if a callback was returned, waits for its completion.
+ */
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+ {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendDeleteRecord(id, sync, callback);
+
+ // We only wait on explicit callbacks
+ // (getSyncCallback may hand back null, in which case there is nothing to wait on)
if (callback != null)
{
callback.waitCompletion();
}
}
-
- public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+
+ public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion
callback) throws Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendDeleteRecord id = " + id);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
compactingLock.readLock().lock();
- IOCompletion callback = null;
-
try
{
@@ -988,8 +1047,6 @@
writeDeleteRecord(-1, id, size, bb);
- callback = getSyncCallback(sync);
-
lockAppend.lock();
try
{
@@ -1016,11 +1073,6 @@
{
compactingLock.readLock().unlock();
}
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
}
public void appendAddRecordTransactional(final long txID, final long id, final byte
recordType, final byte[] record) throws Exception
@@ -1034,6 +1086,10 @@
final byte recordType,
final EncodingSupport record) throws
Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendAddRecordTransactional txID " + txID + ", id =
" + id + ", recordType = " + recordType);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1083,6 +1139,10 @@
final byte recordType,
final EncodingSupport record) throws
Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendUpdateRecordTransactional txID " + txID + ", id =
" + id + ", recordType = " + recordType);
+ }
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1126,6 +1186,11 @@
public void appendDeleteRecordTransactional(final long txID, final long id, final
EncodingSupport record) throws Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendDeleteRecordTransactional txID " + txID + ", id =
" + id);
+ }
+
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1165,6 +1230,12 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
+
+
+ /**
+ * Convenience overload for raw byte[] transaction data: wraps {@code transactionData} in a
+ * ByteArrayEncoding and delegates to the EncodingSupport-based overload, passing txID, sync
+ * and completion through unchanged.
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion completion) throws Exception
+ {
+ appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync,
completion);
+ }
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
@@ -1174,6 +1245,18 @@
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
}
+ /**
+ * Blocking variant: obtains a callback from getSyncCallback(sync), appends the prepare record
+ * through the callback-taking overload and then, if a callback was returned, waits for it.
+ */
+ public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendPrepareRecord(txID, transactionData, sync, syncCompletion);
+
+ // Nothing to wait on when no callback was handed back
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
/**
*
* <p>If the system crashed after a prepare was called, it should store
information that is required to bring the transaction
@@ -1187,22 +1270,27 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync) throws Exception
+ public void appendPrepareRecord(final long txID, final EncodingSupport
transactionData, final boolean sync, IOCompletion callback) throws Exception
{
+ if (LOAD_TRACE)
+ {
+ trace("appendPrepareRecord txID " + txID);
+ }
+
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
compactingLock.readLock().lock();
JournalTransaction tx = getTransactionInfo(txID);
- if (sync)
- {
- tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
- }
-
try
{
@@ -1214,7 +1302,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
tx.prepare(usedFile);
}
@@ -1228,11 +1316,23 @@
{
compactingLock.readLock().unlock();
}
-
- // We should wait this outside of the lock, to increase throughput
- tx.waitCompletion();
}
+
+
+
+ /**
+ * Blocking variant: obtains a callback from getSyncCallback(sync), appends the commit record
+ * through the callback-taking overload and then, if a callback was returned, waits for it.
+ */
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendCommitRecord(txID, sync, syncCompletion);
+
+ // Nothing to wait on when no callback was handed back
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements
the transaction has on each file.</p>
* <p>For example, a transaction was spread along 3 journal files with 10
pendingTransactions on each file.
@@ -1250,13 +1350,20 @@
*
* @see JournalImpl#writeTransaction(byte, long,
org.hornetq.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
*/
- public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+
+
+ public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion
callback) throws Exception
{
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
compactingLock.readLock().lock();
JournalTransaction tx = transactions.remove(txID);
@@ -1266,7 +1373,10 @@
if (tx == null)
{
- throw new IllegalStateException("Cannot find tx with id " + txID);
+ log.warn("Commit being called on an empty transaction, ignoring call. ID
= " + txID);
+ // Commit being called on an empty transaction
+ callback.done();
+ return;
}
ChannelBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
@@ -1282,7 +1392,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
tx.commit(usedFile);
}
@@ -1296,21 +1406,34 @@
{
compactingLock.readLock().unlock();
}
+ }
- if (sync)
+
+ /**
+ * Blocking variant: obtains a callback from getSyncCallback(sync), appends the rollback record
+ * through the callback-taking overload and then, if a callback was returned, waits for it.
+ */
+ public void appendRollbackRecord(final long txID, final boolean sync) throws
Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendRollbackRecord(txID, sync, syncCompletion);
+
+ // Nothing to wait on when no callback was handed back
+ if (syncCompletion != null)
{
- // We should wait this outside of the lock, to increase throuput
- tx.waitCompletion();
+ syncCompletion.waitCompletion();
}
+
}
-
- public void appendRollbackRecord(final long txID, final boolean sync) throws
Exception
+
+ public void appendRollbackRecord(final long txID, final boolean sync, final
IOCompletion callback) throws Exception
{
if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+ if (callback != null)
+ {
+ callback.lineUp();
+ }
+
compactingLock.readLock().lock();
JournalTransaction tx = null;
@@ -1331,7 +1454,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, sync, tx, callback);
tx.rollback(usedFile);
}
@@ -1345,14 +1468,6 @@
{
compactingLock.readLock().unlock();
}
-
- // We should wait this outside of the lock, to increase throuput
-
- if (sync)
- {
- tx.waitCompletion();
- }
-
}
public int getAlignment() throws Exception
@@ -2833,7 +2948,7 @@
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
- IOCompletion callback) throws Exception
+ final IOAsyncTask parameterCallback) throws
Exception
{
try
{
@@ -2841,6 +2956,8 @@
{
throw new IllegalStateException("The journal is not loaded " +
state);
}
+
+ final IOAsyncTask callback;
int size = bb.capacity();
@@ -2874,25 +2991,29 @@
if (tx != null)
{
- if (callback != null)
- {
- // sanity check, it should not happen.
- throw new IllegalArgumentException("Invalid callback parameter. Use
of tx is mutually exclusive with the callback");
- }
-
// The callback of a transaction has to be taken inside the lock,
// when we guarantee the currentFile will not be changed,
// since we individualize the callback per file
if (fileFactory.isSupportsCallbacks())
{
- callback = tx.getCallback(currentFile);
+ // Set the delegated callback as a parameter
+ TransactionCallback txcallback = tx.getCallback(currentFile);
+ if (parameterCallback != null)
+ {
+ txcallback.setDelegateCompletion(parameterCallback);
+ }
+ callback = txcallback;
}
+ else
+ {
+ callback = null;
+ }
if (sync)
{
- // 99 % of the times this will be already synced, as previous files should
be closed already.
- // This is to have 100% guarantee the transaction will be persisted and no
loss of information would
- // happen
+ // In an edge case the transaction could still have pending data from
previous files.
+ // This shouldn't cause any blocking issues, as this is here to
guarantee we cover all possibilities
+ // on guaranteeing the data is on the disk
tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
@@ -2903,6 +3024,10 @@
tx.fillNumberOfRecords(currentFile, bb);
}
}
+ else
+ {
+ callback = parameterCallback;
+ }
// Adding fileID
bb.writerIndex(DataConstants.SIZE_BYTE);
@@ -3233,13 +3358,13 @@
return tx;
}
- private IOCompletion getSyncCallback(final boolean sync)
+ private SyncIOCompletion getSyncCallback(final boolean sync)
{
if (fileFactory.isSupportsCallbacks())
{
if (sync)
{
- return SimpleWaitIOCallback.getInstance();
+ return new SimpleWaitIOCallback();
}
else
{
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -18,8 +18,9 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.Executor;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -133,7 +134,7 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
try
{
@@ -197,7 +198,7 @@
return new NIOSequentialFile(factory, getFile());
}
- public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCompletion
callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask
callback)
{
if (callback == null)
{
@@ -226,7 +227,7 @@
* @throws IOException
* @throws Exception
*/
- private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOCompletion callback) throws Exception
+ private void internalWrite(final ByteBuffer bytes, final boolean sync, final
IOAsyncTask callback) throws Exception
{
position.addAndGet(bytes.limit());
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -14,9 +14,9 @@
package org.hornetq.core.journal.impl;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
/**
@@ -26,7 +26,7 @@
*
*
*/
-public class SimpleWaitIOCallback implements IOCompletion
+public class SimpleWaitIOCallback extends SyncIOCompletion
{
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
@@ -37,12 +37,6 @@
private volatile int errorCode = 0;
- public static IOCompletion getInstance()
- {
- return new SimpleWaitIOCallback();
- }
-
-
public void done()
{
latch.countDown();
@@ -68,4 +62,16 @@
}
return;
}
+
+ /**
+ * Waits on the latch for at most {@code timeout} milliseconds.
+ *
+ * @param timeout maximum time to wait, in milliseconds
+ * @return whatever {@code latch.await} reports (for a CountDownLatch: true if the count
+ * reached zero, false if the timeout elapsed first)
+ * NOTE(review): this path only reports the latch result and does not look at errorCode;
+ * confirm callers check for a reported error separately.
+ */
+ public boolean waitCompletion(final long timeout) throws Exception
+ {
+ return latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /* (non-Javadoc)
+ * No-op for this callback: the body is empty.
+ * @see org.hornetq.core.journal.IOCompletion#lineUp()
+ */
+ public void lineUp()
+ {
+ }
}
Added:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java
(rev 0)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/SyncIOCompletion.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.journal.IOCompletion;
+
+/**
+ * Internal class used to manage explicit syncs on the Journal through callbacks.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class SyncIOCompletion implements IOCompletion
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Called by code that holds this completion when it wants to wait on it;
+ * what waiting means is left to each subclass.
+ */
+ public abstract void waitCompletion() throws Exception;
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
@@ -22,7 +23,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.VariableLatch;
@@ -56,7 +57,7 @@
private int bufferLimit = 0;
- private List<IOCompletion> callbacks;
+ private List<IOAsyncTask> callbacks;
private final Lock lock = new ReentrantReadWriteLock().writeLock();
@@ -106,7 +107,7 @@
buffer.clear();
bufferLimit = 0;
- callbacks = new ArrayList<IOCompletion>();
+ callbacks = new ArrayList<IOAsyncTask>();
this.flushOnSync = flushOnSync;
latchTimer.up();
this.timeout = timeout;
@@ -225,7 +226,7 @@
}
}
- public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOCompletion callback)
+ public synchronized void addBytes(final byte[] bytes, final boolean sync, final
IOAsyncTask callback)
{
if (buffer.writerIndex() == 0)
{
@@ -280,7 +281,7 @@
bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
- callbacks = new ArrayList<IOCompletion>();
+ callbacks = new LinkedList<IOAsyncTask>();
active = false;
pendingSync = false;
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -17,7 +17,7 @@
import java.nio.ByteBuffer;
import java.util.List;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
/**
* A TimedBufferObserver
@@ -39,7 +39,7 @@
// Public --------------------------------------------------------
- public void flushBuffer(ByteBuffer buffer, boolean syncRequested,
List<IOCompletion> callbacks);
+ public void flushBuffer(ByteBuffer buffer, boolean syncRequested,
List<IOAsyncTask> callbacks);
/** Return the number of remaining bytes that still fit on the observer (file) */
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TransactionCallback.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TransactionCallback.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -14,7 +14,7 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.utils.VariableLatch;
/**
@@ -24,22 +24,34 @@
*
*
*/
-public class TransactionCallback implements IOCompletion
+public class TransactionCallback implements IOAsyncTask
{
private final VariableLatch countLatch = new VariableLatch();
private volatile String errorMessage = null;
private volatile int errorCode = 0;
+
+ private volatile int up = 0;
+
+ private volatile int done = 0;
+
+ private volatile IOAsyncTask delegateCompletion;
public void countUp()
{
+ up++;
countLatch.up();
}
public void done()
{
countLatch.down();
+ if (++done == up && delegateCompletion != null)
+ {
+ delegateCompletion.done();
+ delegateCompletion = null;
+ }
}
public void waitCompletion() throws InterruptedException
@@ -59,9 +71,30 @@
this.errorCode = errorCode;
countLatch.down();
+
+ if (delegateCompletion != null)
+ {
+ delegateCompletion.onError(errorCode, errorMessage);
+ }
}
/**
+ * @return the delegateCompletion
+ */
+ public IOAsyncTask getDelegateCompletion()
+ {
+ return delegateCompletion;
+ }
+
+ /**
+ * @param delegateCompletion the delegateCompletion to set
+ */
+ public void setDelegateCompletion(IOAsyncTask delegateCompletion)
+ {
+ this.delegateCompletion = delegateCompletion;
+ }
+
+ /**
* @return the errorMessage
*/
public String getErrorMessage()
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -438,6 +438,7 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.commit();
+ server.getStorageManager().waitOnOperations(-1);
long recordID = server.getStorageManager().storeHeuristicCompletion(xid,
true);
resourceManager.putHeuristicCompletion(recordID, xid, true);
return true;
@@ -456,6 +457,7 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.rollback();
+ server.getStorageManager().completeOperations();
long recordID = server.getStorageManager().storeHeuristicCompletion(xid,
false);
resourceManager.putHeuristicCompletion(recordID, xid, false);
return true;
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -21,7 +21,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.NotificationBroadcasterSupport;
@@ -39,6 +41,7 @@
import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AcceptorControl;
import org.hornetq.core.management.BridgeControl;
@@ -56,6 +59,7 @@
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.spi.Acceptor;
@@ -736,6 +740,29 @@
}
}
}
+
+ // TODO: Talk to Andy and Jeff about a better way to sync this...
+ System.out.println("Waiting");
+ final CountDownLatch latch = new CountDownLatch(1);
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+ {
+
+ public void done()
+ {
+ System.out.println("Done on management");
+ latch.countDown();
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ });
+
+ OperationContextImpl.getContext().complete();
+
+ latch.await(5, TimeUnit.SECONDS);
+ System.out.println("Done");
}
public void enableNotifications(boolean enabled)
Modified: branches/ClebertCallback/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/paging/PagingStore.java 2009-11-21
15:48:01 UTC (rev 8360)
+++ branches/ClebertCallback/src/main/org/hornetq/core/paging/PagingStore.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -67,14 +67,14 @@
* @param message
* @throws Exception
*/
- void addSize(ServerMessage message, boolean add) throws Exception;
+ void addSize(ServerMessage message, boolean add);
/**
*
* @param reference
* @throws Exception
*/
- void addSize(MessageReference reference, boolean add) throws Exception;
+ void addSize(MessageReference reference, boolean add);
/**
*
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -127,14 +127,16 @@
// Static --------------------------------------------------------
- private static final boolean isTrace = log.isTraceEnabled();
+ //private static final boolean isTrace = log.isTraceEnabled();
+ private static final boolean isTrace = true;
// This is just a debug tool method.
// During debugs you could make log.trace as log.info, and change the
// variable isTrace above
private static void trace(final String message)
{
- log.trace(message);
+ System.out.println("PagingStoreImpl::" + message);
+ // log.trace(message);
}
// Constructors --------------------------------------------------
@@ -279,7 +281,7 @@
checkReleaseProducerFlowControlCredits(-credits);
}
- public void addSize(final ServerMessage message, final boolean add) throws Exception
+ public void addSize(final ServerMessage message, final boolean add)
{
long size = message.getMemoryEstimate();
@@ -297,7 +299,7 @@
}
}
- public void addSize(final MessageReference reference, final boolean add) throws
Exception
+ public void addSize(final MessageReference reference, final boolean add)
{
long size = MessageReferenceImpl.getMemoryEstimate();
@@ -477,7 +479,7 @@
}
}
- public boolean startPaging() throws Exception
+ public boolean startPaging()
{
if (!running)
{
@@ -508,7 +510,17 @@
{
if (currentPage == null)
{
- openNewPage();
+ try
+ {
+ openNewPage();
+ }
+ catch (Exception e)
+ {
+ // If not possible to starting page due to an IO error, we will just
consider it non paging.
+ // This shouldn't happen anyway
+ log.warn("IO Error, impossible to start paging", e);
+ return false;
+ }
return true;
}
@@ -699,7 +711,7 @@
}
}
- private void addSize(final long size) throws Exception
+ private void addSize(final long size)
{
if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
{
@@ -996,10 +1008,10 @@
}
depageTransaction.commit();
+
+ // TODO: If we implement ordering on AIO, we won't need to block here
+ storageManager.waitOnOperations();
- // StorageManager does the check: if (replicated) -> do the proper cleanup
already
- storageManager.completeReplication();
-
if (isTrace)
{
trace("Depage committed, running = " + running);
Added:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
(rev 0)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
+
+
+/**
+ * This represents a set of operations (journal IO completions and replication)
+ * that are tracked together.
+ * When the entire set is done, the {@link IOAsyncTask}s registered through
+ * {@link #executeOnCompletion(IOAsyncTask)} can be executed.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationContext extends IOCompletion
+{
+
+ /**
+ * @return true if this context has replication operations
+ * (OperationContextImpl answers true once at least one was lined up)
+ */
+ boolean hasReplication();
+
+ /**
+ * Registers a task to be notified about the completion of the operations of this context.
+ * OperationContextImpl calls the task right away when nothing is pending.
+ *
+ * @param runnable the task to be notified
+ */
+ void executeOnCompletion(IOAsyncTask runnable);
+
+ /** Accounts for one more replication operation that is now pending. */
+ void replicationLineUp();
+
+ /** Accounts for one replication operation that was confirmed. */
+ void replicationDone();
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+ /** Is this a special operation to sync replication. */
+ boolean isSync();
+
+}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -18,6 +18,7 @@
import javax.transaction.xa.Xid;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -56,14 +57,19 @@
boolean isReplicated();
- void afterReplicated(Runnable run);
+ void afterCompleteOperations(IOAsyncTask run);
- /** Block until the replication is done.
+ /** Block until the operations are done.
* @throws Exception */
- void waitOnReplication(long timeout) throws Exception;
+ void waitOnOperations(long timeout) throws Exception;
- void completeReplication();
+ /** Block until the operations are done.
+ * @throws Exception */
+ void waitOnOperations() throws Exception;
+ /** To close the OperationsContext */
+ void completeOperations();
+
UUID getPersistentID();
void setPersistentID(UUID id) throws Exception;
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -176,7 +176,7 @@
this.delayDeletionCount.incrementAndGet();
}
- public synchronized void decrementDelayDeletionCount() throws Exception
+ public synchronized void decrementDelayDeletionCount()
{
int count = this.delayDeletionCount.decrementAndGet();
@@ -191,7 +191,7 @@
return new DecodingContext();
}
- private void checkDelete() throws Exception
+ private void checkDelete()
{
if (getRefCount() <= 0)
{
@@ -220,7 +220,7 @@
}
@Override
- public synchronized int decrementRefCount(MessageReference reference) throws
Exception
+ public synchronized int decrementRefCount(MessageReference reference)
{
int currentRefCount = super.decrementRefCount(reference);
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -26,9 +26,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
@@ -37,6 +35,8 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -47,6 +47,7 @@
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.SimpleWaitIOCallback;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -291,12 +292,16 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
if (replicator != null)
{
replicator.closeContext();
}
+ else
+ {
+ OperationContextImpl.getContext().complete();
+ }
}
public boolean isReplicated()
@@ -304,21 +309,26 @@
return replicator != null;
}
+
+ public void waitOnOperations() throws Exception
+ {
+ waitOnOperations(-1);
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
*/
- public void waitOnReplication(final long timeout) throws Exception
+ public void waitOnOperations(final long timeout) throws Exception
{
- final CountDownLatch latch = new CountDownLatch(1);
- afterReplicated(new Runnable()
+ SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
+ afterCompleteOperations(waitCallback);
+ completeOperations();
+ if (timeout <= 0)
{
- public void run()
- {
- latch.countDown();
- }
- });
- completeReplication();
- if (!latch.await(timeout, TimeUnit.MILLISECONDS))
+ waitCallback.waitCompletion();
+ }
+ else
+ if (!waitCallback.waitCompletion(timeout))
{
throw new IllegalStateException("no response received from
replication");
}
@@ -363,13 +373,9 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
- public void afterReplicated(Runnable run)
+ public void afterCompleteOperations(IOAsyncTask run)
{
- if (replicator == null)
- {
- throw new IllegalStateException("StorageManager is not replicated");
- }
- replicator.afterReplicated(run);
+ OperationContextImpl.getContext().executeOnCompletion(run);
}
public UUID getPersistentID()
@@ -452,27 +458,27 @@
messageJournal.appendAddRecord(message.getMessageID(),
ADD_LARGE_MESSAGE,
new
LargeMessageEncoding((LargeServerMessage)message),
- false);
+ false, getIOContext());
}
else
{
- messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message,
false);
+ messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message,
false, getIOContext());
}
}
public void storeReference(final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID),
syncNonTransactional);
+ messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID),
syncNonTransactional, getIOContext());
}
public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
{
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new
RefEncoding(queueID), syncNonTransactional);
+ messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new
RefEncoding(queueID), syncNonTransactional, getIOContext());
}
public void deleteMessage(final long messageID) throws Exception
{
- messageJournal.appendDeleteRecord(messageID, syncNonTransactional);
+ messageJournal.appendDeleteRecord(messageID, syncNonTransactional,
getIOContext());
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
@@ -483,19 +489,19 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
encoding,
- syncNonTransactional);
+ syncNonTransactional, getIOContext());
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final
long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding,
syncNonTransactional);
+ messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding,
syncNonTransactional, getIOContext());
}
public void deleteDuplicateID(long recordID) throws Exception
{
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getIOContext());
}
public void sync()
@@ -559,13 +565,13 @@
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
{
long id = generateUniqueID();
- messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true);
+ messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true, getIOContext());
return id;
}
public void deleteHeuristicCompletion(long id) throws Exception
{
- messageJournal.appendDeleteRecord(id, true);
+ messageJournal.appendDeleteRecord(id, true, getIOContext());
}
public void deletePageTransactional(final long txID, final long recordID) throws
Exception
@@ -591,17 +597,17 @@
public void prepare(final long txID, final Xid xid) throws Exception
{
- messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional);
+ messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional,
getIOContext());
}
public void commit(final long txID) throws Exception
{
- messageJournal.appendCommitRecord(txID, syncTransactional);
+ messageJournal.appendCommitRecord(txID, syncTransactional, getIOContext());
}
public void rollback(final long txID) throws Exception
{
- messageJournal.appendRollbackRecord(txID, syncTransactional);
+ messageJournal.appendRollbackRecord(txID, syncTransactional, getIOContext());
}
public void storeDuplicateIDTransactional(final long txID,
@@ -639,7 +645,7 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
UPDATE_DELIVERY_COUNT,
updateInfo,
- syncNonTransactional);
+ syncNonTransactional, getIOContext());
}
private static final class AddMessageRecord
@@ -1323,7 +1329,7 @@
return info;
}
-
+
// Public
-----------------------------------------------------------------------------------
public Journal getMessageJournal()
@@ -1384,6 +1390,11 @@
}
// Private
----------------------------------------------------------------------------------
+
+ private IOCompletion getIOContext()
+ {
+ return OperationContextImpl.getContext();
+ }
private void checkAndCreateDir(final String dir, final boolean create)
{
@@ -1874,11 +1885,11 @@
}
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
Added:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
(rev 0)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -0,0 +1,179 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.persistence.OperationContext;
+
+/**
+ * Default {@link OperationContext} implementation.
+ * <p>
+ * It counts the store and replication operations that were lined up and the ones that were
+ * already confirmed, and notifies the {@link IOAsyncTask}s registered through
+ * {@link #executeOnCompletion(IOAsyncTask)} once the operations lined up before them are done.
+ * <p>
+ * The context of a thread is kept on a {@link ThreadLocal}: {@link #getContext()} creates it
+ * on demand and {@link #complete()} detaches it from the calling thread.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class OperationContextImpl implements OperationContext
+{
+   private static final ThreadLocal<OperationContext> tlContext = new ThreadLocal<OperationContext>();
+
+   /**
+    * @return the context bound to the calling thread; a new one is created and bound if there is none
+    */
+   public static OperationContext getContext()
+   {
+      OperationContext token = tlContext.get();
+      if (token == null)
+      {
+         token = new OperationContextImpl();
+         tlContext.set(token);
+      }
+      return token;
+   }
+
+   /** Tasks still waiting to be notified. Created by the first executeOnCompletion call; guarded by "this". */
+   private List<TaskHolder> tasks;
+
+   /** Number of store operations lined up so far. */
+   private volatile int storeLineUp = 0;
+
+   /** Number of replication operations lined up so far. */
+   private volatile int replicationLineUp = 0;
+
+   /** Value of storeLineUp when the first task was registered; MAX_VALUE until then. */
+   private int minimalStore = Integer.MAX_VALUE;
+
+   /** Value of replicationLineUp when the first task was registered; MAX_VALUE until then. */
+   private int minimalReplicated = Integer.MAX_VALUE;
+
+   /** Number of store operations confirmed through done(). */
+   private int stored = 0;
+
+   /** Number of replication operations confirmed through replicationDone(). */
+   private int replicated = 0;
+
+   public OperationContextImpl()
+   {
+      super();
+   }
+
+   /**
+    * Accounts for one more store operation, to be confirmed later through {@link #done()}.
+    * NOTE(review): the increment of the volatile counter is not atomic; presumably only the
+    * thread owning the context lines operations up - confirm against the callers.
+    */
+   public void lineUp()
+   {
+      storeLineUp++;
+   }
+
+   /**
+    * Accounts for one more replication operation, to be confirmed later through
+    * {@link #replicationDone()}. The same single-writer assumption of {@link #lineUp()} applies.
+    */
+   public void replicationLineUp()
+   {
+      replicationLineUp++;
+   }
+
+   /** Confirms one replication operation and notifies the tasks that were waiting for it. */
+   public synchronized void replicationDone()
+   {
+      replicated++;
+      checkTasks();
+   }
+
+   /** @return true if at least one replication operation was lined up on this context */
+   public boolean hasReplication()
+   {
+      return replicationLineUp > 0;
+   }
+
+   /**
+    * Registers a task to be notified when the operations lined up so far are done.
+    * If every lined up store and replication operation was already confirmed, the task is
+    * notified immediately on the calling thread; otherwise it is queued.
+    *
+    * @param completion the task to be notified
+    */
+   public synchronized void executeOnCompletion(IOAsyncTask completion)
+   {
+      if (tasks == null)
+      {
+         tasks = new LinkedList<TaskHolder>();
+         // From now on checkTasks has something to look at as soon as these counts are reached
+         minimalReplicated = replicationLineUp;
+         minimalStore = storeLineUp;
+      }
+
+      if (replicationLineUp == replicated && storeLineUp == stored)
+      {
+         completion.done();
+      }
+      else
+      {
+         tasks.add(new TaskHolder(completion));
+      }
+   }
+
+   /** Confirms one store operation and notifies the tasks that were waiting for it. */
+   public synchronized void done()
+   {
+      stored++;
+      checkTasks();
+   }
+
+   /**
+    * Notifies and removes every queued task whose lined up counts were reached.
+    * Must be called with the monitor of this object held.
+    */
+   private void checkTasks()
+   {
+      // The minimal counts stay at MAX_VALUE while no task was ever registered (tasks == null)
+      if (stored >= minimalStore && replicated >= minimalReplicated)
+      {
+         Iterator<TaskHolder> iter = tasks.iterator();
+         while (iter.hasNext())
+         {
+            TaskHolder holder = iter.next();
+            if (!holder.executed && stored >= holder.storeLined && replicated >= holder.replicationLined)
+            {
+               holder.executed = true;
+               holder.task.done();
+               iter.remove();
+            }
+         }
+      }
+   }
+
+   /**
+    * Detaches the context bound to the calling thread, so the next {@link #getContext()} on
+    * that thread creates a new one.
+    *
+    * @see OperationContext#complete()
+    */
+   public void complete()
+   {
+      // remove() also drops the thread's map entry, instead of keeping it with a null value
+      tlContext.remove();
+   }
+
+   public boolean isSync()
+   {
+      return false;
+   }
+
+   /**
+    * Propagates the error to every task that is still queued.
+    * Synchronized as every other access to the task list: done(), replicationDone() and
+    * executeOnCompletion() may change the list from another thread while it is iterated here.
+    *
+    * @param errorCode the error code to be passed to the tasks
+    * @param errorMessage the error message to be passed to the tasks
+    */
+   public synchronized void onError(int errorCode, String errorMessage)
+   {
+      if (tasks != null)
+      {
+         for (TaskHolder run : tasks)
+         {
+            run.task.onError(errorCode, errorMessage);
+         }
+      }
+   }
+
+   /** A queued task along with the line up counts that were current when it was registered. */
+   class TaskHolder
+   {
+      int storeLined;
+
+      int replicationLined;
+
+      boolean executed;
+
+      IOAsyncTask task;
+
+      TaskHolder(IOAsyncTask task)
+      {
+         this.storeLined = storeLineUp;
+         this.replicationLined = replicationLineUp;
+         this.task = task;
+      }
+   }
+
+}
Added:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
(rev 0)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.persistence.OperationContext;
+
+/**
+ * An {@link OperationContext} that wraps another context and presents it as a sync operation:
+ * {@link #isSync()} always answers <code>true</code>, every other call is forwarded unchanged
+ * to the wrapped context.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ */
+public class SyncOperation implements OperationContext
+{
+
+   // Attributes ----------------------------------------------------
+
+   /** The wrapped context, target of every forwarded call. */
+   OperationContext ctx;
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * @param ctx the context to be wrapped
+    */
+   public SyncOperation (OperationContext ctx)
+   {
+      this.ctx = ctx;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * The only method that is not forwarded.
+    *
+    * @return always true
+    */
+   public boolean isSync()
+   {
+      return true;
+   }
+
+   // Store operations: forwarded to the wrapped context
+
+   /** Forwards to the lineUp of the wrapped context. */
+   public void lineUp()
+   {
+      ctx.lineUp();
+   }
+
+   /** Forwards to the done of the wrapped context. */
+   public void done()
+   {
+      ctx.done();
+   }
+
+   /**
+    * Forwards the error to the wrapped context.
+    *
+    * @param errorCode passed along as received
+    * @param errorMessage passed along as received
+    */
+   public void onError(int errorCode, String errorMessage)
+   {
+      ctx.onError(errorCode, errorMessage);
+   }
+
+   // Replication operations: forwarded to the wrapped context
+
+   /** Forwards to the replicationLineUp of the wrapped context. */
+   public void replicationLineUp()
+   {
+      ctx.replicationLineUp();
+   }
+
+   /** Forwards to the replicationDone of the wrapped context. */
+   public void replicationDone()
+   {
+      ctx.replicationDone();
+   }
+
+   /**
+    * @return whatever the wrapped context answers for hasReplication
+    */
+   public boolean hasReplication()
+   {
+      return ctx.hasReplication();
+   }
+
+   // Completion: forwarded to the wrapped context
+
+   /**
+    * Registers the task on the wrapped context.
+    *
+    * @param runnable the task, passed along as received
+    */
+   public void executeOnCompletion(IOAsyncTask runnable)
+   {
+      ctx.executeOnCompletion(runnable);
+   }
+
+   /** Forwards to the complete of the wrapped context. */
+   public void complete()
+   {
+      ctx.complete();
+   }
+
+}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -138,7 +138,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#decrementDelayDeletionCount()
*/
- public void decrementDelayDeletionCount() throws Exception
+ public void decrementDelayDeletionCount()
{
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -20,6 +20,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -282,14 +283,6 @@
}
/* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
- */
- public void afterReplicated(Runnable run)
- {
- run.run();
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#isReplicated()
*/
public boolean isReplicated()
@@ -300,7 +293,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
}
@@ -336,7 +329,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnReplication(long timeout) throws Exception
+ public void waitOnOperations(long timeout) throws Exception
{
}
@@ -348,4 +341,19 @@
throw new IllegalStateException("Null Persistence should never be used as
replicated");
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
+ */
+ public void afterCompleteOperations(IOAsyncTask run)
+ {
+ run.done();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
+ */
+ public void waitOnOperations() throws Exception
+ {
+ }
+
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Executor;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
@@ -58,11 +59,14 @@
private final StorageManager storageManager;
private final boolean persist;
+
+ private final Executor executor;
public DuplicateIDCacheImpl(final SimpleString address,
final int size,
final StorageManager storageManager,
- final boolean persist)
+ final boolean persist,
+ final Executor executor)
{
this.address = address;
@@ -73,6 +77,8 @@
this.storageManager = storageManager;
this.persist = persist;
+
+ this.executor = executor;
}
public void load(final List<Pair<byte[], Long>> theIds) throws Exception
@@ -138,7 +144,7 @@
storageManager.storeDuplicateID(address, duplID, recordID);
}
- addToCacheInMemory(duplID, recordID);
+ addToCacheInMemory(duplID, recordID, null);
}
else
{
@@ -155,11 +161,12 @@
}
}
- private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID)
throws Exception
+
+ private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID,
final Executor journalExecutor) throws Exception
{
cache.add(new ByteArrayHolder(duplID));
- Pair<ByteArrayHolder, Long> id;
+ final Pair<ByteArrayHolder, Long> id;
if (pos < ids.size())
{
@@ -173,7 +180,28 @@
// reclaimed
id.a = new ByteArrayHolder(duplID);
- storageManager.deleteDuplicateID(id.b);
+ if (journalExecutor != null)
+ {
+ // We can't execute any IO inside the Journal callback, so taking it
outside
+ journalExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ storageManager.deleteDuplicateID(id.b);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on deleting duplicate cache");
+ }
+ }
+ });
+ }
+ else
+ {
+ storageManager.deleteDuplicateID(id.b);
+ }
id.b = recordID;
}
@@ -205,12 +233,18 @@
this.recordID = recordID;
}
- private void process() throws Exception
+ private void process()
{
if (!done)
{
- addToCacheInMemory(duplID, recordID);
-
+ try
+ {
+ addToCacheInMemory(duplID, recordID, executor);
+ }
+ catch (Exception shouldNotHappen)
+ {
+ // when an executor is passed to addToCacheInMemory, the delete runs on it, so no exception is expected
here
+ }
done = true;
}
}
@@ -227,17 +261,17 @@
{
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
process();
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
process();
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -28,6 +28,7 @@
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
@@ -742,7 +743,9 @@
if (cache == null)
{
- cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager,
persistIDCache);
+ // TODO: What's the right executor?
+ // Is there another way
+ cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager,
persistIDCache, redistributorExecutorFactory.getExecutor());
DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
@@ -931,20 +934,18 @@
}
else
{
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- storageManager.afterReplicated(new Runnable()
+ public void onError(int errorCode, String errorMessage)
{
- public void run()
- {
- addReferences(refs);
- }
- });
- }
- else
- {
- addReferences(refs);
- }
+ log.warn("It wasn't possible to add references due to an IO error
code " + errorCode + " message = " + errorMessage);
+ }
+
+ public void done()
+ {
+ addReferences(refs);
+ }
+ });
}
}
@@ -1120,11 +1121,11 @@
}
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
@@ -1231,11 +1232,11 @@
}
}
- public void afterPrepare(Transaction tx) throws Exception
+ public void afterPrepare(Transaction tx)
{
}
- public void afterRollback(Transaction tx) throws Exception
+ public void afterRollback(Transaction tx)
{
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -243,6 +243,8 @@
log.warn("Connection failure has been detected: " + me.getMessage() +
" [code=" + me.getCode() + "]");
+ System.out.println("Fail on RemotingConnectio");
+
// Then call the listeners
callFailureListeners(me);
@@ -399,6 +401,7 @@
for (final FailureListener listener : listenersClone)
{
+ System.out.println("Calling failure listener: " +
listener.getClass().getName());
try
{
listener.connectionFailed(me);
@@ -419,6 +422,8 @@
for (final CloseListener listener : listenersClone)
{
+ System.out.println("Calling listener -> " + listener);
+ System.out.println("Calling listener " +
listener.getClass().getName());
try
{
listener.connectionClosed();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationContext.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication;
-
-
-/**
- * This represents a set of operations done as part of replication.
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface ReplicationContext
-{
- /** To be called by the replication manager, when new replication is added to the
queue */
- void linedUp();
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- void replicated();
-
- void addReplicationAction(Runnable runnable);
-
- /** To be called when there are no more operations pending */
- void complete();
-
- /** Flush all pending callbacks on the Context */
- void flush();
-
-}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -19,6 +19,7 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.utils.SimpleString;
@@ -49,13 +50,10 @@
void appendRollbackRecord(byte journalID, long txID) throws Exception;
- /** Add an action to be executed after the pending replications */
- void afterReplicated(Runnable runnable);
-
void closeContext();
/** A list of tokens that are still waiting for replications to be completed */
- Set<ReplicationContext> getActiveTokens();
+ Set<OperationContext> getActiveTokens();
/**
* @param storeName
@@ -87,7 +85,7 @@
* @throws HornetQException
*/
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
-
+
void sync();
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -16,6 +16,7 @@
import java.util.List;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -27,7 +28,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
-
/**
* Used by the {@link JournalStorageManager} to replicate journal calls.
*
@@ -46,7 +46,7 @@
// Attributes ----------------------------------------------------
private static final boolean trace = false;
-
+
private static void trace(String message)
{
System.out.println("ReplicatedJournal::" + message);
@@ -58,9 +58,7 @@
private final byte journalID;
- public ReplicatedJournal(final byte journaID,
- final Journal localJournal,
- final ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journaID, final Journal localJournal, final
ReplicationManager replicationManager)
{
super();
journalID = journaID;
@@ -69,11 +67,11 @@
}
// Static --------------------------------------------------------
-
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
/**
* @param id
* @param recordType
@@ -87,6 +85,21 @@
this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception
+ {
+ if (trace)
+ {
+ trace("Append record id = " + id + " recordType = " +
recordType);
+ }
+ replicationManager.appendAddRecord(journalID, id, recordType, record);
+ localJournal.appendAddRecord(id, recordType, record, sync);
+ }
+
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception
+ {
+ this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync,
completionCallback);
+ }
+
/**
* @param id
* @param recordType
@@ -95,14 +108,18 @@
* @throws Exception
* @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean)
*/
- public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record, final boolean sync) throws Exception
+ public void appendAddRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ IOCompletion completionCallback) throws Exception
{
if (trace)
{
trace("Append record id = " + id + " recordType = " +
recordType);
}
replicationManager.appendAddRecord(journalID, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync);
+ localJournal.appendAddRecord(id, recordType, record, sync, completionCallback);
}
/**
@@ -155,6 +172,19 @@
localJournal.appendCommitRecord(txID, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception
+ {
+ if (trace)
+ {
+ trace("AppendCommit " + txID);
+ }
+ replicationManager.appendCommitRecord(journalID, txID);
+ localJournal.appendCommitRecord(txID, sync, callback);
+ }
+
/**
* @param id
* @param sync
@@ -171,6 +201,19 @@
localJournal.appendDeleteRecord(id, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback)
throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendDelete " + id);
+ }
+ replicationManager.appendDeleteRecord(journalID, id);
+ localJournal.appendDeleteRecord(id, sync, completionCallback);
+ }
+
/**
* @param txID
* @param id
@@ -245,6 +288,27 @@
localJournal.appendPrepareRecord(txID, transactionData, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync, IOCompletion callback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendPrepare txID=" + txID);
+ }
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ localJournal.appendPrepareRecord(txID, transactionData, sync, callback);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion callback) throws Exception
+ {
+ this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync,
callback);
+ }
+
/**
* @param txID
* @param sync
@@ -261,6 +325,19 @@
localJournal.appendRollbackRecord(txID, sync);
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendRollback " + txID);
+ }
+ replicationManager.appendRollbackRecord(journalID, txID);
+ localJournal.appendRollbackRecord(txID, sync, callback);
+ }
+
/**
* @param id
* @param recordType
@@ -291,7 +368,34 @@
replicationManager.appendUpdateRecord(journalID, id, recordType, record);
localJournal.appendUpdateRecord(id, recordType, record, sync);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[],
boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception
+ {
+ this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync,
completionCallback);
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ if (trace)
+ {
+ trace("AppendUpdateRecord id = " + id + " , recordType = " +
recordType);
+ }
+ replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ localJournal.appendUpdateRecord(id, recordType, record, sync, completionCallback);
+ }
+
+
+
/**
* @param txID
* @param id
@@ -338,8 +442,8 @@
* @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List,
org.hornetq.core.journal.TransactionFailureCallback)
*/
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
- final List<PreparedTransactionInfo> preparedTransactions,
- final TransactionFailureCallback transactionFailure) throws
Exception
+ final List<PreparedTransactionInfo>
preparedTransactions,
+ final TransactionFailureCallback
transactionFailure) throws Exception
{
return localJournal.load(committedRecords, preparedTransactions,
transactionFailure);
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -1,105 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.replication.ReplicationContext;
-
-/**
- * A ReplicationToken
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationContextImpl implements ReplicationContext
-{
- private List<Runnable> tasks;
-
- private AtomicInteger pendings = new AtomicInteger(0);
-
- private volatile boolean complete = false;
-
- /**
- * @param executor
- */
- public ReplicationContextImpl()
- {
- super();
- }
-
- /** To be called by the replication manager, when new replication is added to the
queue */
- public void linedUp()
- {
- pendings.incrementAndGet();
- }
-
- /** You may have several actions to be done after a replication operation is
completed. */
- public void addReplicationAction(Runnable runnable)
- {
- if (complete)
- {
- // Sanity check, this shouldn't happen
- throw new IllegalStateException("The Replication Context is complete, and
no more tasks are accepted");
- }
-
- if (tasks == null)
- {
- // No need to use Concurrent, we only add from a single thread.
- // We don't add any more Runnables after it is complete
- tasks = new ArrayList<Runnable>();
- }
-
- tasks.add(runnable);
- }
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- public synchronized void replicated()
- {
- if (pendings.decrementAndGet() == 0 && complete)
- {
- flush();
- }
- }
-
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationToken#complete()
- */
- public synchronized void complete()
- {
- complete = true;
- if (pendings.get() == 0 && complete)
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (tasks != null)
- {
- for (Runnable run : tasks)
- {
- run.run();
- }
- tasks.clear();
- }
- }
-
-
-}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -13,6 +13,9 @@
package org.hornetq.core.replication.impl;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -24,6 +27,9 @@
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.SyncOperation;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
@@ -34,7 +40,6 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationSyncContextMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -44,9 +49,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.ReplicationContext;
import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.SimpleString;
/**
@@ -80,12 +83,8 @@
private final Object replicationLock = new Object();
- private final ThreadLocal<ReplicationContext> tlReplicationContext = new
ThreadLocal<ReplicationContext>();
+ private final Queue<OperationContext> pendingTokens = new
ConcurrentLinkedQueue<OperationContext>();
- private final Queue<ReplicationContext> pendingTokens = new
ConcurrentLinkedQueue<ReplicationContext>();
-
- private final ConcurrentHashSet<ReplicationContext> activeContexts = new
ConcurrentHashSet<ReplicationContext>();
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -278,14 +277,6 @@
sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
}
}
-
- public void sync()
- {
- if (enabled)
- {
- sendReplicatePacket(new ReplicationSyncContextMessage());
- }
- }
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long,
byte[])
@@ -351,9 +342,9 @@
log.warn(e.getMessage(), e);
}
}
-
+
public void beforeReconnect(HornetQException me)
- {
+ {
}
});
@@ -374,14 +365,14 @@
enabled = false;
- for (ReplicationContext ctx : activeContexts)
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+ while (!pendingTokens.isEmpty())
{
- ctx.complete();
- ctx.flush();
+ OperationContext ctx = pendingTokens.poll();
+ ctx.replicationDone();
}
-
- activeContexts.clear();
-
+
if (replicatingChannel != null)
{
replicatingChannel.close();
@@ -401,63 +392,70 @@
started = false;
}
- public ReplicationContext getContext()
- {
- ReplicationContext token = tlReplicationContext.get();
- if (token == null)
- {
- token = new ReplicationContextImpl();
- activeContexts.add(token);
- tlReplicationContext.set(token);
- }
- return token;
- }
-
/* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
- */
- public void afterReplicated(final Runnable runnable)
- {
- getContext().addReplicationAction(runnable);
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#completeToken()
*/
public void closeContext()
{
- final ReplicationContext token = tlReplicationContext.get();
-
+ final OperationContext token = getContext();
+
if (token != null)
{
- // Disassociate thread local
- tlReplicationContext.set(null);
// Remove from pending tokens as soon as this is complete
- token.addReplicationAction(new Runnable()
+ if (!token.hasReplication())
{
- public void run()
- {
- activeContexts.remove(token);
- }
- });
+ sync(token);
+ }
token.complete();
}
}
- /* (non-Javadoc)
+
+ /* method for testcases only
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
- public Set<ReplicationContext> getActiveTokens()
+ public Set<OperationContext> getActiveTokens()
{
+
+ LinkedHashSet<OperationContext> activeContexts = new
LinkedHashSet<OperationContext>();
+
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+
+ for (OperationContext ctx : pendingTokens)
+ {
+ activeContexts.add(ctx);
+ }
+
return activeContexts;
+
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
+ */
+ public void compareJournals(JournalLoadInformation[] journalInfo) throws
HornetQException
+ {
+ replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
+ }
+
+ public void sync()
+ {
+ sync(OperationContextImpl.getContext());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
private void sendReplicatePacket(final Packet packet)
{
boolean runItNow = false;
- ReplicationContext repliToken = getContext();
- repliToken.linedUp();
+ OperationContext repliToken = getContext();
+ repliToken.replicationLineUp();
synchronized (replicationLock)
{
@@ -479,38 +477,98 @@
if (runItNow)
{
- repliToken.replicated();
+ repliToken.replicationDone();
}
}
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
- */
- public void compareJournals(JournalLoadInformation[] journalInfo) throws
HornetQException
+
+ private void replicated()
{
- replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
+ List<OperationContext> tokensToExecute = getTokens();
+
+ for (OperationContext ctx : tokensToExecute)
+ {
+ ctx.replicationDone();
+ }
}
-
- private void replicated()
+ private void sync(OperationContext context)
{
- ReplicationContext tokenPolled = pendingTokens.poll();
- if (tokenPolled == null)
+ boolean executeNow = false;
+ synchronized (replicationLock)
{
- throw new IllegalStateException("Missing replication token on the
queue.");
+ context.replicationLineUp();
+ if (pendingTokens.isEmpty())
+ {
+ // this means the list is empty and we should process it now
+ executeNow = true;
+ }
+ else
+ {
+ // adding the sync to be executed in order
+ // as soon as the responses are back from the backup
+ this.pendingTokens.add(new SyncOperation(context));
+ }
}
- else
+ if (executeNow)
{
- tokenPolled.replicated();
+ context.replicationDone();
}
}
- // Package protected ---------------------------------------------
+
+ public OperationContext getContext()
+ {
+ return OperationContextImpl.getContext();
+ }
- // Protected -----------------------------------------------------
+ /**
+ * This method will first get all the sync tokens (that won't go to the backup
node)
+ * Then it will get the round trip tokens.
+ * Finally, it collects any sync tokens that follow in the queue, to avoid a case where they
are left waiting because no more replication is done due to inactivity.
+ * @return
+ */
+ private List<OperationContext> getTokens()
+ {
+ List<OperationContext> retList = new LinkedList<OperationContext>();
- // Private -------------------------------------------------------
+ OperationContext tokenPolled = null;
+ // First, poll all the non-replicated (sync) tokens up to and including the first one that is
replicated
+ do
+ {
+ tokenPolled = pendingTokens.poll();
+
+ if (tokenPolled == null)
+ {
+ throw new IllegalStateException("Missing replication token on the
queue.");
+ }
+
+ retList.add(tokenPolled);
+
+ }
+ while (tokenPolled.isSync());
+
+ // This is to avoid a situation where we won't have more replicated packets
+ // We need to make sure we process any pending sync packet up to the next non empty
packet
+ synchronized (replicationLock)
+ {
+ while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
+ {
+ tokenPolled = pendingTokens.poll();
+ if (!tokenPolled.isSync())
+ {
+ throw new IllegalStateException("Replicatoin context is not a
roundtrip token as expected");
+ }
+
+ retList.add(tokenPolled);
+
+ }
+ }
+
+ return retList;
+ }
+
+
// Inner classes -------------------------------------------------
protected class ResponseHandler implements ChannelHandler
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -42,5 +42,5 @@
void incrementDelayDeletionCount();
- void decrementDelayDeletionCount() throws Exception;
+ void decrementDelayDeletionCount();
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java 2009-11-21
15:48:01 UTC (rev 8360)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -125,6 +125,10 @@
Collection<Consumer> getConsumers();
+ /** We can't execute IO operation when inside the IOCallback /
TransactionCallback.
+ * This method will perform IO operations in a second thread */
+ boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception;
+
boolean checkDLQ(MessageReference ref) throws Exception;
void lockDelivery();
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -33,7 +33,7 @@
int incrementRefCount(MessageReference reference) throws Exception;
- int decrementRefCount(MessageReference reference) throws Exception;
+ int decrementRefCount(MessageReference reference);
int incrementDurableRefCount();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -16,6 +16,7 @@
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -144,21 +145,21 @@
tx.commit();
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- storageManager.afterReplicated(new Runnable()
+
+ public void onError(int errorCode, String errorMessage)
{
- public void run()
- {
- execPrompter();
- }
- });
- storageManager.completeReplication();
- }
- else
- {
- execPrompter();
- }
+ log.warn("IO Error during redistribution, errorCode = " + errorCode
+ " message = " + errorMessage);
+ }
+
+ public void done()
+ {
+ execPrompter();
+ }
+ });
+
+ storageManager.completeOperations();
}
private void execPrompter()
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -87,10 +87,7 @@
}
newList.add(groupBinding);
storageManager.addGrouping(groupBinding);
- if (storageManager.isReplicated())
- {
- storageManager.waitOnReplication(timeout);
- }
+ storageManager.waitOnOperations(timeout);
return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
}
else
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -414,6 +414,12 @@
{
log.debug("Waiting for " + task);
}
+
+ if (replicationManager != null)
+ {
+ replicationManager.stop();
+ replicationManager = null;
+ }
threadPool.shutdown();
@@ -1031,7 +1037,7 @@
configuration.getManagementClusterPassword(),
managementService);
- queueFactory = new QueueFactoryImpl(scheduledPool, addressSettingsRepository,
storageManager);
+ queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool,
addressSettingsRepository, storageManager);
pagingManager = createPagingManager();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -14,6 +14,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.core.filter.Filter;
@@ -50,6 +51,7 @@
final Filter filter,
final boolean durable,
final boolean temporary,
+ final Executor executor,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -61,6 +63,7 @@
filter,
durable,
temporary,
+ executor,
scheduledExecutor,
postOffice,
storageManager,
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -22,6 +22,7 @@
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -42,11 +43,16 @@
private PostOffice postOffice;
private final StorageManager storageManager;
+
+ private final ExecutorFactory executorFactory;
- public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
+ public QueueFactoryImpl(final ExecutorFactory executorFactory,
+ final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
final StorageManager storageManager)
{
+ this.executorFactory = executorFactory;
+
this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
@@ -77,6 +83,7 @@
filter,
durable,
temporary,
+ executorFactory.getExecutor(),
scheduledExecutor,
postOffice,
storageManager,
@@ -90,6 +97,7 @@
filter,
durable,
temporary,
+ executorFactory.getExecutor(),
scheduledExecutor,
postOffice,
storageManager,
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -114,6 +114,9 @@
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
private final ScheduledExecutorService scheduledExecutor;
+
+ /** We can't perform any operation on the journal while inside the Transactional operations. */
+ private final Executor journalExecutor;
private final SimpleString address;
@@ -139,6 +142,7 @@
final Filter filter,
final boolean durable,
final boolean temporary,
+ final Executor executor,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -163,6 +167,8 @@
this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
+
+ this.journalExecutor = executor;
direct = true;
@@ -621,6 +627,8 @@
{
acknowledge(ref);
}
+
+ storageManager.completeOperations();
}
public void setExpiryAddress(final SimpleString expiryAddress)
@@ -643,40 +651,46 @@
return deleteMatchingReferences(null);
}
- public synchronized int deleteMatchingReferences(final Filter filter) throws
Exception
+ public int deleteMatchingReferences(final Filter filter) throws Exception
{
int count = 0;
-
- Transaction tx = new TransactionImpl(storageManager);
-
- Iterator<MessageReference> iter = messageReferences.iterator();
-
- while (iter.hasNext())
+
+ synchronized(this)
{
- MessageReference ref = iter.next();
-
- if (filter == null || filter.match(ref.getMessage()))
+
+ Transaction tx = new TransactionImpl(storageManager);
+
+ Iterator<MessageReference> iter = messageReferences.iterator();
+
+ while (iter.hasNext())
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, ref);
- iter.remove();
- count++;
+ MessageReference ref = iter.next();
+
+ if (filter == null || filter.match(ref.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, ref);
+ iter.remove();
+ count++;
+ }
}
- }
-
- List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
- for (MessageReference messageReference : cancelled)
- {
- if (filter == null || filter.match(messageReference.getMessage()))
+
+ List<MessageReference> cancelled = scheduledDeliveryHandler.cancel();
+ for (MessageReference messageReference : cancelled)
{
- deliveringCount.incrementAndGet();
- acknowledge(tx, messageReference);
- count++;
+ if (filter == null || filter.match(messageReference.getMessage()))
+ {
+ deliveringCount.incrementAndGet();
+ acknowledge(tx, messageReference);
+ count++;
+ }
}
+
+ tx.commit();
}
+
+ storageManager.waitOnOperations(-1);
- tx.commit();
-
return count;
}
@@ -925,11 +939,37 @@
public boolean checkDLQ(final MessageReference reference) throws Exception
{
+ return checkDLQ(reference, null);
+ }
+
+ public boolean checkDLQ(final MessageReference reference, Executor ioExecutor) throws
Exception
+ {
ServerMessage message = reference.getMessage();
if (message.isDurable() && durable)
{
- storageManager.updateDeliveryCount(reference);
+ if (ioExecutor != null)
+ {
+ ioExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ storageManager.updateDeliveryCount(reference);
+ storageManager.completeOperations();
+ }
+ catch (Exception e)
+ {
+ log.warn("Can't update delivery count on checkDLQ",
e);
+ }
+ }
+ });
+ }
+ else
+ {
+ storageManager.updateDeliveryCount(reference);
+ }
}
AddressSettings addressSettings =
addressSettingsRepository.getMatch(address.toString());
@@ -938,7 +978,28 @@
if (maxDeliveries > 0 && reference.getDeliveryCount() >=
maxDeliveries)
{
- sendToDeadLetterAddress(reference);
+ if (ioExecutor != null)
+ {
+ ioExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ sendToDeadLetterAddress(reference);
+ storageManager.completeOperations();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on DLQ send", e);
+ }
+ }
+ });
+ }
+ else
+ {
+ sendToDeadLetterAddress(reference);
+ }
return false;
}
@@ -950,7 +1011,28 @@
{
reference.setScheduledDeliveryTime(System.currentTimeMillis() +
redeliveryDelay);
- storageManager.updateScheduledDeliveryTime(reference);
+ if (ioExecutor != null)
+ {
+ ioExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ storageManager.updateScheduledDeliveryTime(reference);
+ storageManager.completeOperations();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on DLQ send", e);
+ }
+ }
+ });
+ }
+ else
+ {
+ storageManager.updateScheduledDeliveryTime(reference);
+ }
}
deliveringCount.decrementAndGet();
@@ -1381,7 +1463,7 @@
return status;
}
- private void removeExpiringReference(final MessageReference ref) throws Exception
+ private void removeExpiringReference(final MessageReference ref)
{
if (ref.getMessage().getExpiration() > 0)
{
@@ -1389,9 +1471,9 @@
}
}
- private void postAcknowledge(final MessageReference ref) throws Exception
+ private void postAcknowledge(final MessageReference ref)
{
- ServerMessage message = ref.getMessage();
+ final ServerMessage message = ref.getMessage();
QueueImpl queue = (QueueImpl)ref.getQueue();
@@ -1413,14 +1495,23 @@
// also note that when this happens as part of a transaction it's the tx commit of the ack that is important
// not this
- try
+
+ // and this has to happen in a different thread
+
+ journalExecutor.execute(new Runnable()
{
- storageManager.deleteMessage(message.getMessageID());
- }
- catch (Exception e)
- {
- log.warn("Unable to remove message id = " +
message.getMessageID() + " please remove manually");
- }
+ public void run()
+ {
+ try
+ {
+ storageManager.deleteMessage(message.getMessageID());
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to remove message id = " +
message.getMessageID() + " please remove manually");
+ }
+ }
+ });
}
}
@@ -1431,7 +1522,7 @@
message.decrementRefCount(ref);
}
- void postRollback(final LinkedList<MessageReference> refs) throws Exception
+ void postRollback(final LinkedList<MessageReference> refs)
{
synchronized (this)
{
@@ -1481,29 +1572,38 @@
{
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new
HashMap<QueueImpl, LinkedList<MessageReference>>();
for (MessageReference ref : refsToAck)
{
- if (ref.getQueue().checkDLQ(ref))
+ try
{
- LinkedList<MessageReference> toCancel =
queueMap.get(ref.getQueue());
-
- if (toCancel == null)
+ if (ref.getQueue().checkDLQ(ref, journalExecutor))
{
- toCancel = new LinkedList<MessageReference>();
-
- queueMap.put((QueueImpl)ref.getQueue(), toCancel);
+ LinkedList<MessageReference> toCancel =
queueMap.get(ref.getQueue());
+
+ if (toCancel == null)
+ {
+ toCancel = new LinkedList<MessageReference>();
+
+ queueMap.put((QueueImpl)ref.getQueue(), toCancel);
+ }
+
+ toCancel.addFirst(ref);
}
-
- toCancel.addFirst(ref);
}
+ catch (Exception e)
+ {
+ // checkDLQ here will be using an executor, this shouldn't happen
+ // don't you just hate checked exceptions in java?
+ log.warn("Error on checkDLQ", e);
+ }
}
for (Map.Entry<QueueImpl, LinkedList<MessageReference>> entry :
queueMap.entrySet())
@@ -1519,7 +1619,7 @@
}
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
for (MessageReference ref : refsToAck)
{
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -119,7 +119,7 @@
return count;
}
- public int decrementRefCount(final MessageReference reference) throws Exception
+ public int decrementRefCount(final MessageReference reference)
{
int count = refCount.decrementAndGet();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -1,5 +1,5 @@
/*
- * Copyright 2009 Red Hat, Inc.
+ * Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -34,11 +34,13 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.postoffice.Bindings;
@@ -765,6 +767,7 @@
public void handleRollback(final RollbackMessage packet)
{
+ new Exception("Rollback").printStackTrace();
Packet response = null;
try
@@ -1077,6 +1080,7 @@
public void handleXARollback(final SessionXARollbackMessage packet)
{
+ System.out.println("XARollback");
Packet response = null;
Xid xid = packet.getXid();
@@ -1457,7 +1461,7 @@
public void handleSend(final SessionSendMessage packet)
{
Packet response = null;
-
+
ServerMessage message = packet.getServerMessage();
try
@@ -1718,23 +1722,25 @@
final boolean flush,
final boolean closeChannel)
{
- if (storageManager.isReplicated())
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- storageManager.afterReplicated(new Runnable()
+
+ public void onError(int errorCode, String errorMessage)
{
- public void run()
- {
- doSendResponse(confirmPacket, response, flush, closeChannel);
- }
+ log.warn("Error processing IOCallback code = " + errorCode + "
message = " + errorMessage);
- });
+ HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new
HornetQException(errorCode, errorMessage));
+
+ doSendResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+ }
- storageManager.completeReplication();
- }
- else
- {
- doSendResponse(confirmPacket, response, flush, closeChannel);
- }
+ public void done()
+ {
+ doSendResponse(confirmPacket, response, flush, closeChannel);
+ }
+ });
+
+ storageManager.completeOperations();
}
/**
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -29,9 +29,9 @@
void beforeRollback(Transaction tx) throws Exception;
- void afterPrepare(Transaction tx) throws Exception;
+ void afterPrepare(Transaction tx);
- void afterCommit(Transaction tx) throws Exception;
+ void afterCommit(Transaction tx);
- void afterRollback(Transaction tx) throws Exception;
+ void afterRollback(Transaction tx);
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -19,6 +19,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -98,7 +99,7 @@
{
this.containsPersistent = true;
}
-
+
public long getID()
{
return id;
@@ -146,14 +147,38 @@
storageManager.prepare(id, xid);
state = State.PREPARED;
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager will have
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
+ public void onError(int errorCode, String errorMessage)
{
- operation.afterPrepare(this);
+ log.warn("IO Error completing the transaction, code = " +
errorCode + ", message = " + errorMessage);
}
- }
+
+ public void done()
+ {
+ if (operations != null)
+ {
+ System.out.println("Prepare was executed fine");
+ for (TransactionOperation operation : operations)
+ {
+ try
+ {
+ operation.afterPrepare(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ // https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After prepare shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ });
}
}
@@ -185,7 +210,11 @@
{
if (state == State.ACTIVE)
{
- prepare();
+ // Why do we need a prepare record on the onePhase optimization?
+ // Why can't we just go straight to commit, if we are doing one phase anyway?
+ state = State.PREPARED;
+// System.out.println("Adding Prepare");
+// prepare();
}
}
if (state != State.PREPARED)
@@ -209,13 +238,29 @@
}
}
- Runnable execAfterCommit = null;
+ if (containsPersistent || (xid != null && state == State.PREPARED))
+ {
+ System.out.println("Adding commit");
+ storageManager.commit(id);
- if (operations != null)
+ state = State.COMMITTED;
+ }
+
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager will have
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- execAfterCommit = new Runnable()
+
+ public void onError(int errorCode, String errorMessage)
{
- public void run()
+ log.warn("IO Error completing the transaction, code = " +
errorCode + ", message = " + errorMessage);
+ }
+
+ public void done()
+ {
+ System.out.println("Commit was executed fine");
+ if (operations != null)
{
for (TransactionOperation operation : operations)
{
@@ -231,31 +276,9 @@
}
}
}
- };
- }
+ }
+ });
- if (containsPersistent || (xid != null && state == State.PREPARED))
- {
- storageManager.commit(id);
-
- state = State.COMMITTED;
-
- if (execAfterCommit != null)
- {
- if (storageManager.isReplicated())
- {
- storageManager.afterReplicated(execAfterCommit);
- }
- else
- {
- execAfterCommit.run();
- }
- }
- }
- else if (execAfterCommit != null)
- {
- execAfterCommit.run();
- }
}
}
@@ -290,13 +313,38 @@
state = State.ROLLEDBACK;
- if (operations != null)
+ // We use the Callback even for non persistence
+ // If we are using non-persistence with replication, the replication manager will have
+ // to execute this runnable in the correct order
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- for (TransactionOperation operation : operations)
+
+ public void onError(int errorCode, String errorMessage)
{
- operation.afterRollback(this);
+ log.warn("IO Error completing the transaction, code = " +
errorCode + ", message = " + errorMessage);
}
- }
+
+ public void done()
+ {
+ if (operations != null)
+ {
+ System.out.println("Rollback was executed fine");
+ for (TransactionOperation operation : operations)
+ {
+ try
+ {
+ operation.afterRollback(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ // https://jira.jboss.org/jira/browse/HORNETQ-188
+ // After rollback shouldn't throw an exception
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ });
}
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -40,8 +40,20 @@
{
private static final Logger log = Logger.getLogger(QueueTest.class);
- private QueueFactory queueFactory = new FakeQueueFactory();
+ private FakeQueueFactory queueFactory = new FakeQueueFactory();
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ queueFactory = new FakeQueueFactory();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ queueFactory.stop();
+ super.tearDown();
+ }
+
/*
* Concurrent set consumer not busy, busy then, call deliver while messages are being
added and consumed
*/
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/DeadLetterAddressTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -179,6 +179,14 @@
clientSession.rollback();
}
+ long timeout = System.currentTimeMillis() + 5000;
+
+ // The DLA transfer is fired asynchronously on the rollback
+ while (System.currentTimeMillis() < timeout &&
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount() != 0)
+ {
+ Thread.sleep(1);
+ }
+
assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
ClientMessage m = clientConsumer.receiveImmediate();
assertNull(m);
Added:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
(rev 0)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * An OrderTest: checks that messages sent to a queue are received in the order they were sent.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OrderTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private HornetQServer server;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ server = createServer(true, true);
+ server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
+ server.start();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ super.tearDown();
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testLoop() throws Exception
+ {
+ for (int i = 0 ; i < 50; i ++)
+ {
+ testSimpleOrder();
+ tearDown();
+ setUp();
+ }
+ }
+
+ public void testSimpleOrder() throws Exception
+ {
+ ClientSessionFactory sf = createNettyFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ try
+ {
+ session.createQueue("queue", "queue", true);
+
+ ClientProducer prod = session.createProducer("queue");
+
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = session.createClientMessage(i == 0);
+ msg.setBody(session.createBuffer(new byte[1024]));
+ msg.putIntProperty("id", i);
+ prod.send(msg);
+ }
+
+ session.close();
+
+ boolean started = false;
+
+ for (int start = 0; start < 3; start++)
+ {
+
+
+ if (start == 20)
+ {
+ started = true;
+ server.stop();
+ server.start();
+ }
+
+ session = sf.createSession(true, true);
+
+ session.start();
+
+// fail(session);
+
+ ClientConsumer cons = session.createConsumer("queue");
+
+ for (int i = 0; i < 100; i++)
+ {
+ if (!started || started && i % 2 == 0)
+ {
+ ClientMessage msg = cons.receive(10000);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+ }
+
+ cons.close();
+
+ cons = session.createConsumer("queue");
+
+ for (int i = 0; i < 100; i++)
+ {
+ if (!started || started && i % 2 == 0)
+ {
+ ClientMessage msg = cons.receive(10000);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+ }
+
+ session.close();
+ }
+
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+
+ }
+
+
+ private void fail(ClientSession session) throws InterruptedException
+ {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener implements SessionFailureListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch.countDown();
+ }
+
+ public void beforeReconnect(HornetQException exception)
+ {
+ }
+ }
+
+ MyListener listener = new MyListener();
+ session.addFailureListener(listener);
+
+
+
+ RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+ // Simulate failure on connection
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ // Wait to be informed of failure
+
+ boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+ assertTrue(ok);
+
+ session.removeFailureListener(listener);
+ }
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -158,15 +158,18 @@
for (int i = 0; i < numberOfMessages; i++)
{
+ System.out.println("Message " + i + " of " +
numberOfMessages);
ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull(message2);
- assertEquals(i, ((Integer)message2.getObjectProperty(new
SimpleString("id"))).intValue());
+ assertEquals(i, message2.getIntProperty("id").intValue());
message2.acknowledge();
assertNotNull(message2);
+
+ session.commit();
try
{
@@ -968,9 +971,6 @@
assertNull(consumerPaged.receiveImmediate());
-
assertFalse(server.getPostOffice().getPagingManager().getPageStore(PAGED_ADDRESS).isPaging());
-
assertFalse(server.getPostOffice().getPagingManager().getPageStore(NON_PAGED_ADDRESS).isPaging());
-
session.close();
}
Added:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/SimpleClientTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/SimpleClientTest.java
(rev 0)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/SimpleClientTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * The most basic test possible. Useful to validate basic functionality.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * TODO: Don't commit this test on the main branch
+ */
+public class SimpleClientTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSimpleTestNoTransacted() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory factory = createInVMFactory();
+
+ ClientSession session = null;
+
+ try
+ {
+ session = factory.createSession(false, true, true);
+
+ session.createQueue("A", "A", true);
+ ClientProducer producer = session.createProducer("A");
+ producer.send(session.createClientMessage(true));
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer("A");
+
+ ClientMessage msg = cons.receive(10000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ }
+ finally
+ {
+ session.close();
+ factory.close();
+ server.stop();
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -169,7 +169,6 @@
assertEquals(0, sf.numConnections());
}
-
/** It doesn't fail, but it restart both servers, live and backup, and the data
should be received after the restart,
* and the servers should be able to connect without any problems. */
public void testRestartServers() throws Exception
@@ -1670,6 +1669,73 @@
assertEquals(0, sf.numConnections());
}
+   /**
+    * Verifies that a session can still send and receive after its connection
+    * has failed: a producer and a consumer are created first, the failure is
+    * triggered, and only then are 100 messages sent and read back in order.
+    * <p>
+    * All sends and acknowledgements are configured as blocking on the factory.
+    *
+    * @throws Exception if any client operation fails
+    */
+   public void testSimpleSendAfterFailover() throws Exception
+   {
+      ClientSessionFactoryInternal sf = getSessionFactory();
+
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnPersistentSend(true);
+      sf.setBlockOnAcknowledge(true);
+
+      ClientSession session = sf.createSession(true, true, 0);
+
+      session.createQueue(ADDRESS, ADDRESS, null, true);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      // Releases the latch as soon as the session reports a connection failure.
+      class MyListener extends BaseListener
+      {
+         public void connectionFailed(HornetQException me)
+         {
+            latch.countDown();
+         }
+      }
+
+      session.addFailureListener(new MyListener());
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 100;
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      // NOTE(review): presumably forces the connection failure and waits on the latch - confirm in the helper
+      fail(session, latch);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         // Even-numbered messages are created with the durable flag set, odd ones without it.
+         ClientMessage message = session.createClientMessage(i % 2 == 0);
+
+         setBody(i, message);
+
+         message.putIntProperty("counter", i);
+
+         producer.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+
+         assertNotNull(message);
+
+         assertMessageBody(i, message);
+
+         // The counter property must match the send order.
+         assertEquals(i, message.getIntProperty("counter").intValue());
+
+         message.acknowledge();
+      }
+
+      session.close();
+
+      assertEquals(0, sf.numSessions());
+
+      assertEquals(0, sf.numConnections());
+   }
+
public void testForceBlockingReturn() throws Exception
{
ClientSessionFactoryInternal sf = this.getSessionFactory();
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -80,7 +80,7 @@
ClientSessionFactory csf = new
ClientSessionFactoryImpl(getConnectorTransportConfiguration(true));
csf.setBlockOnNonPersistentSend(false);
- csf.setBlockOnPersistentSend(false);
+ csf.setBlockOnPersistentSend(true);
ClientSession session = null;
if (transactional)
{
@@ -92,7 +92,7 @@
}
session.createQueue(address, queue, true);
ClientProducer producer = session.createProducer(address);
- boolean durable = true;
+ boolean durable = false;
for (int i = 0; i < NUM; i++)
{
ClientMessage msg = session.createClientMessage(durable);
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -37,6 +37,8 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.LoaderCallback;
@@ -50,6 +52,7 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
@@ -272,20 +275,8 @@
replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
replicatedJournal.appendRollbackRecord(3, false);
- assertEquals(1, manager.getActiveTokens().size());
-
blockOnReplication(manager);
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
ServerMessage msg = new ServerMessageImpl();
@@ -386,10 +377,15 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
- public void run()
+
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
});
@@ -413,14 +409,17 @@
private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
-
});
manager.closeContext();
@@ -469,32 +468,23 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
latch.countDown();
}
-
});
- assertEquals(1, manager.getActiveTokens().size());
-
manager.closeContext();
assertTrue(latch.await(1, TimeUnit.SECONDS));
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
@@ -528,13 +518,13 @@
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
int numberOfAdds = 200;
-
+
final CountDownLatch latch = new CountDownLatch(numberOfAdds);
-
+
for (int i = 0; i < numberOfAdds; i++)
{
final int nAdd = i;
-
+
if (i % 2 == 0)
{
replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
@@ -544,40 +534,30 @@
manager.sync();
}
-
- manager.afterReplicated(new Runnable()
+ OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
{
- public void run()
+ public void onError(int errorCode, String errorMessage)
{
+ }
+
+ public void done()
+ {
executions.add(nAdd);
latch.countDown();
}
-
});
manager.closeContext();
}
-
+
assertTrue(latch.await(10, TimeUnit.SECONDS));
-
for (int i = 0; i < numberOfAdds; i++)
{
assertEquals(i, executions.get(i).intValue());
}
-
- for (int i = 0; i < 100; i++)
- {
- // This is asynchronous. Have to wait completion
- if (manager.getActiveTokens().size() == 0)
- {
- break;
- }
- Thread.sleep(1);
- }
-
assertEquals(0, manager.getActiveTokens().size());
manager.stop();
}
@@ -906,5 +886,80 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[],
boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback) throws Exception
+ {
+ // Empty stub: the arguments are ignored and completionCallback is never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendAddRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ // Empty stub: the arguments are ignored and completionCallback is never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
+ {
+ // Empty stub: the arguments are ignored and callback is never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion
completionCallback) throws Exception
+ {
+ // Empty stub: the arguments are ignored and completionCallback is never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync, IOCompletion callback) throws Exception
+ {
+ // Empty stub: the arguments are ignored and callback is never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion callback) throws Exception
+ {
+ // Empty stub: the arguments are ignored and callback is never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
+ {
+ // Empty stub: the arguments are ignored and callback is never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[],
boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ byte[] record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ // Empty stub: the arguments are ignored and completionCallback is never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte,
org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void appendUpdateRecord(long id,
+ byte recordType,
+ EncodingSupport record,
+ boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ // Empty stub: the arguments are ignored and completionCallback is never invoked.
+ }
+
}
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -538,15 +538,15 @@
{
}
- public void afterPrepare(Transaction tx) throws Exception
+ public void afterPrepare(Transaction tx)
{
}
- public void afterCommit(Transaction tx) throws Exception
+ public void afterCommit(Transaction tx)
{
}
- public void afterRollback(Transaction tx) throws Exception
+ public void afterRollback(Transaction tx)
{
latch.countDown();
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -15,6 +15,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -41,17 +43,23 @@
private static final Logger log = Logger.getLogger(QueueImplTest.class);
private ScheduledExecutorService scheduledExecutor;
+
+ private ExecutorService executor;
public void setUp() throws Exception
{
super.setUp();
scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+
+ executor = Executors.newSingleThreadExecutor();
}
public void tearDown() throws Exception
{
scheduledExecutor.shutdownNow();
+
+ executor.shutdown();
super.tearDown();
}
@@ -70,7 +78,7 @@
public void testScheduledNoConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, scheduledExecutor, null, null,
null);
+ Queue queue = new QueueImpl(1, new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null,
null, null);
//Send one scheduled
@@ -136,7 +144,7 @@
private void testScheduled(boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1,new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, scheduledExecutor, null, null,
null);
+ Queue queue = new QueueImpl(1,new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null,
null, null);
FakeConsumer consumer = null;
@@ -243,7 +251,7 @@
return HandleStatus.HANDLED;
}
};
- Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1,
null, false, true, scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1,
null, false, true, executor, scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -15,6 +15,8 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -84,20 +86,36 @@
protected static class CountDownCallback implements AIOCallback
{
private final CountDownLatch latch;
+
+ private final List<Integer> outputList;
+
+ private final int order;
+
+ private final AtomicInteger errors;
- public CountDownCallback(final CountDownLatch latch)
+ public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors,
final List<Integer> outputList, final int order)
{
this.latch = latch;
+
+ this.outputList = outputList;
+
+ this.order = order;
+
+ this.errors = errors;
}
volatile boolean doneCalled = false;
- volatile boolean errorCalled = false;
+ volatile int errorCalled = 0;
final AtomicInteger timesDoneCalled = new AtomicInteger(0);
public void done()
{
+ if (outputList != null)
+ {
+ outputList.add(order);
+ }
doneCalled = true;
timesDoneCalled.incrementAndGet();
if (latch != null)
@@ -108,7 +126,15 @@
public void onError(final int errorCode, final String errorMessage)
{
- errorCalled = true;
+ errorCalled++;
+ if (outputList != null)
+ {
+ outputList.add(order);
+ }
+ if (errors != null)
+ {
+ errors.incrementAndGet();
+ }
if (latch != null)
{
// even thought an error happened, we need to inform the latch,
@@ -116,6 +142,16 @@
latch.countDown();
}
}
+
+      /**
+       * Asserts that {@code result} holds exactly {@code numberOfElements}
+       * entries and that they are the consecutive values 0, 1, 2, ... in
+       * iteration order.
+       * <p>
+       * Accepts any {@link List}, so callers are not tied to {@code ArrayList}.
+       *
+       * @param numberOfElements the expected number of entries
+       * @param result the values to verify, in the order they were recorded
+       */
+      public static void checkResults(final int numberOfElements, final List<Integer> result)
+      {
+         assertEquals(numberOfElements, result.size());
+         int expected = 0;
+         for (Integer actual : result)
+         {
+            // Each entry must equal its own position in the list.
+            assertEquals(expected++, actual.intValue());
+         }
+      }
}
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -136,6 +136,11 @@
int numberOfLines = 1000;
int size = 1024;
+
+ ArrayList<Integer> listResult1 = new ArrayList<Integer>();
+ ArrayList<Integer> listResult2 = new ArrayList<Integer>();
+
+ AtomicInteger errors = new AtomicInteger(0);
ByteBuffer buffer = null;
try
@@ -154,41 +159,43 @@
for (int i = 0; i < numberOfLines; i++)
{
- list.add(new CountDownCallback(latchDone));
- list2.add(new CountDownCallback(latchDone2));
+ list.add(new CountDownCallback(latchDone, errors, listResult1, i));
+ list2.add(new CountDownCallback(latchDone2, errors, listResult2, i));
}
- long valueInitial = System.currentTimeMillis();
-
int counter = 0;
+
Iterator<CountDownCallback> iter2 = list2.iterator();
- for (CountDownCallback tmp : list)
+ for (CountDownCallback cb1 : list)
{
- CountDownCallback tmp2 = iter2.next();
+ CountDownCallback cb2 = iter2.next();
- controller.write(counter * size, size, buffer, tmp);
- controller.write(counter * size, size, buffer, tmp2);
+ controller.write(counter * size, size, buffer, cb1);
+ controller2.write(counter * size, size, buffer, cb2);
++counter;
}
latchDone.await();
latchDone2.await();
+
+ CountDownCallback.checkResults(numberOfLines, listResult1);
+ CountDownCallback.checkResults(numberOfLines, listResult2);
for (CountDownCallback callback : list)
{
assertEquals(1, callback.timesDoneCalled.get());
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
}
for (CountDownCallback callback : list2)
{
assertEquals(1, callback.timesDoneCalled.get());
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
}
+
+ assertEquals(0, errors.get());
controller.close();
}
@@ -358,7 +365,7 @@
controller.setBufferCallback(bufferCallback);
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
+ ArrayList<Integer> result = new ArrayList<Integer>();
for (int i = 0; i < NUMBER_LINES; i++)
{
ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -367,6 +374,7 @@
{
buffer.put((byte)(j % Byte.MAX_VALUE));
}
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -379,7 +387,7 @@
controller.close();
closed = true;
- assertEquals(NUMBER_LINES, buffers.size());
+ CountDownCallback.checkResults(NUMBER_LINES, result);
// Make sure all the buffers are unique
ByteBuffer lineOne = null;
@@ -439,7 +447,6 @@
controller.setBufferCallback(bufferCallback);
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
buffer = AsynchronousFileImpl.newBuffer(SIZE);
buffer.rewind();
@@ -448,8 +455,11 @@
buffer.put((byte)(j % Byte.MAX_VALUE));
}
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
for (int i = 0; i < NUMBER_LINES; i++)
{
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -462,6 +472,8 @@
controller.close();
closed = true;
+ CountDownCallback.checkResults(NUMBER_LINES, result);
+
assertEquals(NUMBER_LINES, buffers.size());
// Make sure all the buffers are unique
@@ -517,7 +529,9 @@
{
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
+ AtomicInteger errors = new AtomicInteger(0);
for (int i = 0; i < NUMBER_LINES; i++)
{
@@ -531,12 +545,15 @@
buffer.put(getSamplebyte(j));
}
+ CountDownCallback aio = new CountDownCallback(latch, errors, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
latch.await();
- assertFalse(aio.errorCalled);
- assertEquals(NUMBER_LINES, aio.timesDoneCalled.get());
+
+ assertEquals(0, errors.get());
+
+ CountDownCallback.checkResults(NUMBER_LINES, result);
}
// If you call close you're supposed to wait events to finish before
@@ -557,12 +574,13 @@
AsynchronousFileImpl.clearBuffer(readBuffer);
CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
+ AtomicInteger errors = new AtomicInteger(0);
+ CountDownCallback aio = new CountDownCallback(latch, errors, null, 0);
controller.read(i * SIZE, SIZE, readBuffer, aio);
latch.await();
- assertFalse(aio.errorCalled);
+ assertEquals(0, errors.get());
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
@@ -634,7 +652,7 @@
}
buffer.put((byte)'\n');
- CountDownCallback aio = new CountDownCallback(readLatch);
+ CountDownCallback aio = new CountDownCallback(readLatch, null, null, 0);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -663,10 +681,10 @@
newBuffer.put((byte)'\n');
CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
+ CountDownCallback aio = new CountDownCallback(latch, null, null, 0);
controller.read(i * SIZE, SIZE, buffer, aio);
latch.await();
- assertFalse(aio.errorCalled);
+ assertEquals(0, aio.errorCalled);
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
@@ -720,9 +738,11 @@
ArrayList<CountDownCallback> list = new
ArrayList<CountDownCallback>();
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
for (int i = 0; i < numberOfLines; i++)
{
- list.add(new CountDownCallback(latchDone));
+ list.add(new CountDownCallback(latchDone, null, result, i));
}
long valueInitial = System.currentTimeMillis();
@@ -743,6 +763,9 @@
latchDone.await();
long timeTotal = System.currentTimeMillis() - valueInitial;
+
+ CountDownCallback.checkResults(numberOfLines, result);
+
debug("After completions time = " + timeTotal +
" for " +
numberOfLines +
@@ -759,7 +782,7 @@
{
assertEquals(1, tmp.timesDoneCalled.get());
assertTrue(tmp.doneCalled);
- assertFalse(tmp.errorCalled);
+ assertEquals(0, tmp.errorCalled);
}
controller.close();
@@ -799,11 +822,11 @@
for (int i = 0; i < NUMBER_LINES; i++)
{
CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null,
0);
controller.write(i * 512, 512, buffer, aioBlock);
latchDone.await();
assertTrue(aioBlock.doneCalled);
- assertFalse(aioBlock.errorCalled);
+ assertEquals(0, aioBlock.errorCalled);
}
long timeTotal = System.currentTimeMillis() - startTime;
@@ -850,12 +873,12 @@
CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0);
controller.write(11, 512, buffer, aioBlock);
latchDone.await();
- assertTrue(aioBlock.errorCalled);
+ assertTrue(aioBlock.errorCalled != 0);
assertFalse(aioBlock.doneCalled);
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -40,7 +40,7 @@
* */
public class MultiThreadAsynchronousFileTest extends AIOTestBase
{
-
+
public static TestSuite suite()
{
return createAIOTestSuite(MultiThreadAsynchronousFileTest.class);
@@ -56,37 +56,33 @@
static final int NUMBER_OF_LINES = 1000;
- // Executor exec
-
ExecutorService executor;
-
+
ExecutorService pollerExecutor;
-
private static void debug(final String msg)
{
log.debug(msg);
}
-
-
protected void setUp() throws Exception
{
super.setUp();
- pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
false));
+ pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
+ false));
executor = Executors.newSingleThreadExecutor();
}
-
+
protected void tearDown() throws Exception
{
executor.shutdown();
pollerExecutor.shutdown();
super.tearDown();
}
-
+
public void testMultipleASynchronousWrites() throws Throwable
{
- executeTest(false);
+ executeTest(false);
}
public void testMultipleSynchronousWrites() throws Throwable
@@ -132,15 +128,15 @@
long endTime = System.currentTimeMillis();
debug((sync ? "Sync result:" : "Async result:") + "
Records/Second = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES *
- 1000 /
- (endTime - startTime) +
- " total time = " +
- (endTime - startTime) +
- " total number of records = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES);
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES *
+ 1000 /
+ (endTime - startTime) +
+ " total time = " +
+ (endTime - startTime) +
+ " total number of records = " +
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES);
}
finally
{
@@ -180,9 +176,8 @@
{
super.run();
-
ByteBuffer buffer = null;
-
+
synchronized (MultiThreadAsynchronousFileTest.class)
{
buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -222,7 +217,7 @@
{
latchFinishThread = new CountDownLatch(1);
}
- CountDownCallback callback = new CountDownCallback(latchFinishThread);
+ CountDownCallback callback = new CountDownCallback(latchFinishThread,
null, null, 0);
if (!sync)
{
list.add(callback);
@@ -232,7 +227,7 @@
{
latchFinishThread.await();
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
}
if (!sync)
@@ -243,13 +238,13 @@
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -18,7 +18,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.impl.TimedBuffer;
import org.hornetq.core.journal.impl.TimedBufferObserver;
import org.hornetq.tests.util.UnitTestCase;
@@ -42,7 +42,7 @@
// Public --------------------------------------------------------
- IOCompletion dummyCallback = new IOCompletion()
+ IOAsyncTask dummyCallback = new IOAsyncTask()
{
public void done()
@@ -64,7 +64,7 @@
final AtomicInteger flushTimes = new AtomicInteger(0);
class TestObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final boolean sync, final
List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean sync, final
List<IOAsyncTask> callbacks)
{
buffers.add(buffer);
flushTimes.incrementAndGet();
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -20,7 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.core.asyncio.BufferCallback;
-import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.TimedBuffer;
@@ -241,11 +241,11 @@
final ByteBuffer bytes;
- final IOCompletion callback;
+ final IOAsyncTask callback;
volatile boolean sendError;
- CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final
IOCompletion callback)
+ CallbackRunnable(final FakeSequentialFile file, final ByteBuffer bytes, final
IOAsyncTask callback)
{
this.file = file;
this.bytes = bytes;
@@ -399,7 +399,7 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOCompletion callback) throws
Exception
+ public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws
Exception
{
if (!open)
{
@@ -439,7 +439,7 @@
return data.position();
}
- public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync,
final IOCompletion callback)
+ public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync,
final IOAsyncTask callback)
{
if (!open)
{
@@ -605,7 +605,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.remoting.spi.HornetQBuffer,
boolean, org.hornetq.core.journal.IOCallback)
*/
- public void write(HornetQBuffer bytes, boolean sync, IOCompletion callback) throws
Exception
+ public void write(HornetQBuffer bytes, boolean sync, IOAsyncTask callback) throws
Exception
{
writeDirect(ByteBuffer.wrap(bytes.array()), sync, callback);
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -29,6 +29,7 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -1155,7 +1156,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
*/
- public void afterReplicated(Runnable run)
+ public void afterCompleteOperations(Runnable run)
{
}
@@ -1163,7 +1164,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#completeReplication()
*/
- public void completeReplication()
+ public void completeOperations()
{
}
@@ -1221,7 +1222,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnReplication(long timeout) throws Exception
+ public void waitOnOperations(long timeout) throws Exception
{
}
@@ -1232,6 +1233,20 @@
{
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOAsyncTask)
+ */
+ public void afterCompleteOperations(IOAsyncTask run)
+ {
+ // Empty stub: the supplied task is ignored and never invoked.
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
+ */
+ public void waitOnOperations() throws Exception
+ {
+ // Empty stub: returns immediately without waiting on anything.
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -1131,7 +1131,7 @@
return null;
}
- public int decrementRefCount(MessageReference reference) throws Exception
+ public int decrementRefCount(MessageReference reference)
{
// TODO Auto-generated method stub
return 0;
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -17,6 +17,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -57,11 +58,20 @@
// Constructors --------------------------------------------------
+ ExecutorService executor;
+
@Override
protected void tearDown() throws Exception
{
super.tearDown();
+ executor.shutdown();
}
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ executor = Executors.newSingleThreadExecutor();
+ }
// Public --------------------------------------------------------
@@ -101,7 +111,7 @@
assertEquals(0, mapDups.size());
- DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal,
true);
+ DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal,
true, executor);
for (int i = 0; i < 100; i++)
{
@@ -126,7 +136,7 @@
assertEquals(10, values.size());
- cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
+ cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true, executor);
cacheID.load(values);
for (int i = 0; i < 100; i++)
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -493,4 +493,13 @@
return false;
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference,
java.util.concurrent.Executor)
+ */
+ public boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
\ No newline at end of file
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -17,6 +17,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -43,7 +44,23 @@
{
// The tests ----------------------------------------------------------------
- private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+ private ScheduledExecutorService scheduledExecutor;
+
+ private ExecutorService executor;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ scheduledExecutor.shutdown();
+ executor.shutdown();
+ super.tearDown();
+ }
private static final SimpleString queue1 = new SimpleString("queue1");
@@ -53,18 +70,18 @@
{
final SimpleString name = new SimpleString("oobblle");
- Queue queue = new QueueImpl(1, address1, name, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, name, null, false, true, executor,
scheduledExecutor, null, null, null);
assertEquals(name, queue.getName());
}
public void testDurable()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, false,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, false, executor,
scheduledExecutor, null, null, null);
assertFalse(queue.isDurable());
- queue = new QueueImpl(1, address1, queue1, null, true, false, scheduledExecutor,
null, null, null);
+ queue = new QueueImpl(1, address1, queue1, null, true, false, executor,
scheduledExecutor, null, null, null);
assertTrue(queue.isDurable());
}
@@ -77,7 +94,7 @@
Consumer cons3 = new FakeConsumer();
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
assertEquals(0, queue.getConsumerCount());
@@ -118,7 +135,7 @@
public void testGetFilter()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
assertNull(queue.getFilter());
@@ -135,7 +152,7 @@
}
};
- queue = new QueueImpl(1, address1, queue1, filter, false, true, scheduledExecutor,
null, null, null);
+ queue = new QueueImpl(1, address1, queue1, filter, false, true, executor,
scheduledExecutor, null, null, null);
assertEquals(filter, queue.getFilter());
@@ -143,7 +160,7 @@
public void testSimpleadd()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -162,7 +179,7 @@
public void testSimpleDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -190,7 +207,7 @@
public void testSimpleNonDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -228,7 +245,7 @@
public void testBusyConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -272,7 +289,7 @@
public void testBusyConsumerThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -339,7 +356,7 @@
public void testAddFirstadd() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -399,7 +416,7 @@
null,
false,
true,
- scheduledExecutor,
+ executor, scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -556,7 +573,7 @@
public void testConsumerReturningNull() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
class NullConsumer implements Consumer
{
@@ -589,7 +606,7 @@
public void testRoundRobinWithQueueing() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -632,7 +649,7 @@
public void testRoundRobinDirect() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -673,7 +690,7 @@
public void testWithPriorities() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -740,7 +757,7 @@
public void testConsumerWithFilterAddAndRemove()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -749,7 +766,7 @@
public void testList()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 20;
@@ -773,7 +790,7 @@
public void testListWithFilter()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 20;
@@ -815,7 +832,7 @@
null,
false,
true,
- scheduledExecutor,
+ executor, scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -887,7 +904,7 @@
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color =
'green'"));
@@ -928,7 +945,7 @@
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color =
'green'"));
@@ -1002,7 +1019,7 @@
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -1072,7 +1089,7 @@
null,
false,
true,
- scheduledExecutor,
+ executor, scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -1164,7 +1181,7 @@
public void testMessageOrder() throws Exception
{
FakeConsumer consumer = new FakeConsumer();
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1184,7 +1201,7 @@
public void testMessagesAdded() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1196,7 +1213,7 @@
public void testGetReference() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1209,7 +1226,7 @@
public void testGetNonExistentReference() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1226,7 +1243,7 @@
*/
public void testPauseAndResumeWithAsync() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
// pauses the queue
queue.pause();
@@ -1281,7 +1298,7 @@
public void testPauseAndResumeWithDirect() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
// Now add a consumer
FakeConsumer consumer = new FakeConsumer();
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-11-21
15:48:01 UTC (rev 8360)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-11-21
16:48:36 UTC (rev 8361)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.server.impl.fakes;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -32,14 +33,16 @@
*/
public class FakeQueueFactory implements QueueFactory
{
- private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
-
+ private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
private PostOffice postOffice;
public Queue createQueue(long persistenceID, final SimpleString address, SimpleString
name, Filter filter,
boolean durable, boolean temporary)
{
- return new QueueImpl(persistenceID, address, name, filter, durable, temporary,
scheduledExecutor, postOffice, null, null);
+ return new QueueImpl(persistenceID, address, name, filter, durable, temporary,
executor, scheduledExecutor, postOffice, null, null);
}
public void setPostOffice(PostOffice postOffice)
@@ -47,5 +50,12 @@
this.postOffice = postOffice;
}
+
+ public void stop() throws Exception
+ {
+ scheduledExecutor.shutdown();
+
+ executor.shutdown();
+ }
}