JBoss hornetq SVN: r8351 - branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 16:21:28 -0500 (Fri, 20 Nov 2009)
New Revision: 8351
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
Log:
AIO order
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20 18:16:16 UTC (rev 8350)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20 21:21:28 UTC (rev 8351)
@@ -14,12 +14,14 @@
package org.hornetq.core.asyncio.impl;
import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.asyncio.AIOCallback;
@@ -48,14 +50,20 @@
private static boolean loaded = false;
private static int EXPECTED_NATIVE_VERSION = 26;
-
+
/** Used to determine the next writing sequence */
private AtomicLong nextWritingSequence = new AtomicLong(0);
/** Used to determine the next reading sequence.
* This is accessed from a single thread (the Poller Thread) */
- private long readSequence = 0;
+ private long nextReadSequence = 0;
+ /**
+ * AIO can't guarantee ordering over callbacks.
+ * We use this PriorityQueue to hold values until they are in order
+ */
+ private PriorityQueue<CallbackHolder> pendingCallbacks = new PriorityQueue<CallbackHolder>();
+
public static void addMax(final int io)
{
totalMaxIO.addAndGet(io);
@@ -132,6 +140,10 @@
private String fileName;
+ /** Used while inside the callbackDone and callbackError
+ **/
+ private final Lock callbackLock = new ReentrantLock();
+
private final VariableLatch pollerLatch = new VariableLatch();
private volatile Runnable poller;
@@ -139,7 +151,7 @@
private int maxIO;
private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
-
+
private final VariableLatch pendingWrites = new VariableLatch();
private Semaphore writeSemaphore;
@@ -156,7 +168,7 @@
// serious performance problems. Because of that we make all the writes on
// AIO using a single thread.
private final Executor writeExecutor;
-
+
private final Executor pollerExecutor;
// AsynchronousFile implementation ---------------------------------------------------
@@ -197,10 +209,10 @@
if (e.getCode() == HornetQException.NATIVE_ERROR_CANT_INITIALIZE_AIO)
{
ex = new HornetQException(e.getCode(),
- "Can't initialize AIO. Currently AIO in use = " + totalMaxIO.get() +
- ", trying to allocate more " +
- maxIO,
- e);
+ "Can't initialize AIO. Currently AIO in use = " + totalMaxIO.get() +
+ ", trying to allocate more " +
+ maxIO,
+ e);
}
else
{
@@ -211,7 +223,7 @@
opened = true;
addMax(this.maxIO);
nextWritingSequence.set(0);
- readSequence = 0;
+ nextReadSequence = 0;
}
finally
{
@@ -232,7 +244,7 @@
{
log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
}
-
+
while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
@@ -273,7 +285,7 @@
{
startPoller();
}
-
+
pendingWrites.up();
if (writeExecutor != null)
@@ -283,7 +295,7 @@
public void run()
{
writeSemaphore.acquireUninterruptibly();
-
+
long sequence = nextWritingSequence.getAndIncrement();
try
@@ -296,7 +308,11 @@
}
catch (RuntimeException e)
{
- callbackError(aioCallback, sequence, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback,
+ sequence,
+ directByteBuffer,
+ HornetQException.INTERNAL_ERROR,
+ e.getMessage());
}
}
});
@@ -411,9 +427,9 @@
resetBuffer(buffer, buffer.limit());
buffer.position(0);
}
-
+
// Protected -------------------------------------------------------------------------
-
+
protected void finalize()
{
if (opened)
@@ -424,32 +440,109 @@
// Private ---------------------------------------------------------------------------
- /** The JNI layer will call this method, so we could use it to unlock readWriteLocks held in the java layer */
+ /** */
@SuppressWarnings("unused")
- // Called by the JNI layer.. just ignore the
- // warning
private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
{
writeSemaphore.release();
+
pendingWrites.down();
- callback.done();
-
- // The buffer is not sent on callback for read operations
- if (bufferCallback != null && buffer != null)
+
+ callbackLock.lock();
+
+ try
{
- bufferCallback.bufferDone(buffer);
+
+ if (sequence == -1)
+ {
+ callback.done();
+ }
+ else
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.done();
+ flushCallbacks();
+ }
+ else
+ {
+ // System.out.println("Buffering callback");
+ pendingCallbacks.add(new CallbackHolder(sequence, callback));
+ }
+ }
+
+ // The buffer is not sent on callback for read operations
+ if (bufferCallback != null && buffer != null)
+ {
+ bufferCallback.bufferDone(buffer);
+ }
}
+ finally
+ {
+ callbackLock.unlock();
+ }
}
+ private void flushCallbacks()
+ {
+ while (!pendingCallbacks.isEmpty() && pendingCallbacks.peek().sequence == nextReadSequence)
+ {
+ CallbackHolder holder = pendingCallbacks.poll();
+ if (holder.isError())
+ {
+ ErrorCallback error = (ErrorCallback) holder;
+ holder.callback.onError(error.errorCode, error.message);
+ }
+ else
+ {
+ holder.callback.done();
+ }
+ nextReadSequence++;
+ }
+ }
+
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final long sequence, final ByteBuffer buffer, final int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback,
+ final long sequence,
+ final ByteBuffer buffer,
+ final int errorCode,
+ final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
+
writeSemaphore.release();
+
pendingWrites.down();
- callback.onError(errorCode, errorMessage);
+ callbackLock.lock();
+
+ try
+ {
+ if (sequence == -1)
+ {
+ callback.onError(errorCode, errorMessage);
+ }
+ else
+ {
+ if (sequence == nextReadSequence)
+ {
+ nextReadSequence++;
+ callback.onError(errorCode, errorMessage);
+ flushCallbacks();
+ }
+ else
+ {
+ pendingCallbacks.add(new ErrorCallback(sequence, callback, errorCode, errorMessage));
+ }
+ }
+ }
+ finally
+ {
+ callbackLock.unlock();
+ }
+
// The buffer is not sent on callback for read operations
if (bufferCallback != null && buffer != null)
{
@@ -518,10 +611,10 @@
private static native void resetBuffer(ByteBuffer directByteBuffer, int size);
public static native void destroyBuffer(ByteBuffer buffer);
-
+
/** Instead of passing the nanoSeconds through the stack call every time, we set it statically inside the native method */
public static native void setNanoSleepInterval(int nanoseconds);
-
+
public static native void nanoSleep();
private static native ByteBuffer newNativeBuffer(long size);
@@ -530,7 +623,12 @@
private native long size0(long handle) throws HornetQException;
- private native void write(long handle, long sequence, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+ private native void write(long handle,
+ long sequence,
+ long position,
+ long size,
+ ByteBuffer buffer,
+ AIOCallback aioPackage) throws HornetQException;
private native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
@@ -548,6 +646,53 @@
// Inner classes ---------------------------------------------------------------------
+ private static class CallbackHolder implements Comparable<CallbackHolder>
+ {
+ final long sequence;
+
+ final AIOCallback callback;
+
+ public boolean isError()
+ {
+ return false;
+ }
+
+ public CallbackHolder(final long sequence, final AIOCallback callback)
+ {
+ this.sequence = sequence;
+ this.callback = callback;
+ }
+
+ public int compareTo(CallbackHolder o)
+ {
+ // It shouldn't be equals in any case
+ if (sequence <= o.sequence)
+ {
+ return -1;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+ }
+
+ private static class ErrorCallback extends CallbackHolder
+ {
+ final int errorCode;
+
+ final String message;
+
+ public ErrorCallback(final long sequence, final AIOCallback callback, int errorCode, String message)
+ {
+ super(sequence, callback);
+
+ this.errorCode = errorCode;
+
+ this.message = message;
+ }
+ }
+
private class PollerRunnable implements Runnable
{
PollerRunnable()
15 years, 1 month
JBoss hornetq SVN: r8350 - in branches/ClebertTemporary: src/main/org/hornetq/core/asyncio/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 13:16:16 -0500 (Fri, 20 Nov 2009)
New Revision: 8350
Modified:
branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp
branches/ClebertTemporary/native/src/JNICallbackAdapter.h
branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
Log:
AIO order
Modified: branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp 2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp 2009-11-20 18:16:16 UTC (rev 8350)
@@ -18,7 +18,7 @@
jobject nullObj = NULL;
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jint _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
{
controller = _controller;
Modified: branches/ClebertTemporary/native/src/JNICallbackAdapter.h
===================================================================
--- branches/ClebertTemporary/native/src/JNICallbackAdapter.h 2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/native/src/JNICallbackAdapter.h 2009-11-20 18:16:16 UTC (rev 8350)
@@ -33,7 +33,7 @@
jobject bufferReference;
- jint sequence;
+ jlong sequence;
// Is this a read operation
short isRead;
@@ -50,7 +50,7 @@
public:
// _ob must be a global Reference (use createGloblReferente before calling the constructor)
- JNICallbackAdapter(AIOController * _controller, jint sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
+ JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
virtual ~JNICallbackAdapter();
void done(THREAD_CONTEXT threadContext);
Modified: branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-20 18:16:16 UTC (rev 8350)
@@ -48,10 +48,10 @@
std::string fileName = convertJavaString(env, jstrFileName);
controller = new AIOController(fileName, (int) maxIO);
- controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;ILjava/nio/ByteBuffer;)V");
+ controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V");
if (!controller->done) return 0;
- controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/hornetq/core/asyncio/AIOCallback;ILjava/nio/ByteBuffer;ILjava/lang/String;)V");
+ controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V");
if (!controller->error) return 0;
jclass loggerClass = env->GetObjectClass(logger);
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20 18:16:16 UTC (rev 8350)
@@ -18,6 +18,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,11 +50,11 @@
private static int EXPECTED_NATIVE_VERSION = 26;
/** Used to determine the next writing sequence */
- private AtomicInteger nextWritingSequence = new AtomicInteger(0);
+ private AtomicLong nextWritingSequence = new AtomicLong(0);
/** Used to determine the next reading sequence.
* This is accessed from a single thread (the Poller Thread) */
- private int readSequence = 0;
+ private long readSequence = 0;
public static void addMax(final int io)
{
@@ -283,7 +284,7 @@
{
writeSemaphore.acquireUninterruptibly();
- int sequence = nextWritingSequence.getAndIncrement();
+ long sequence = nextWritingSequence.getAndIncrement();
try
{
@@ -304,7 +305,7 @@
{
writeSemaphore.acquireUninterruptibly();
- int sequence = nextWritingSequence.getAndIncrement();
+ long sequence = nextWritingSequence.getAndIncrement();
try
{
@@ -427,7 +428,7 @@
@SuppressWarnings("unused")
// Called by the JNI layer.. just ignore the
// warning
- private void callbackDone(final AIOCallback callback, final int sequence, final ByteBuffer buffer)
+ private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
{
writeSemaphore.release();
pendingWrites.down();
@@ -442,7 +443,7 @@
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final int sequence, final ByteBuffer buffer, final int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback, final long sequence, final ByteBuffer buffer, final int errorCode, final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
writeSemaphore.release();
@@ -529,7 +530,7 @@
private native long size0(long handle) throws HornetQException;
- private native void write(long handle, int sequence, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+ private native void write(long handle, long sequence, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
private native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-20 18:16:16 UTC (rev 8350)
@@ -15,6 +15,8 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -84,20 +86,36 @@
protected static class CountDownCallback implements AIOCallback
{
private final CountDownLatch latch;
+
+ private final List<Integer> outputList;
+
+ private final int order;
+
+ private final AtomicInteger errors;
- public CountDownCallback(final CountDownLatch latch)
+ public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order)
{
this.latch = latch;
+
+ this.outputList = outputList;
+
+ this.order = order;
+
+ this.errors = errors;
}
volatile boolean doneCalled = false;
- volatile boolean errorCalled = false;
+ volatile int errorCalled = 0;
final AtomicInteger timesDoneCalled = new AtomicInteger(0);
public void done()
{
+ if (outputList != null)
+ {
+ outputList.add(order);
+ }
doneCalled = true;
timesDoneCalled.incrementAndGet();
if (latch != null)
@@ -108,7 +126,15 @@
public void onError(final int errorCode, final String errorMessage)
{
- errorCalled = true;
+ errorCalled++;
+ if (outputList != null)
+ {
+ outputList.add(order);
+ }
+ if (errors != null)
+ {
+ errors.incrementAndGet();
+ }
if (latch != null)
{
// even though an error happened, we need to inform the latch,
@@ -116,6 +142,16 @@
latch.countDown();
}
}
+
+ public static void checkResults(final int numberOfElements, final ArrayList<Integer> result)
+ {
+ assertEquals(numberOfElements, result.size());
+ int i = 0;
+ for (Integer resultI : result)
+ {
+ assertEquals(i++, resultI.intValue());
+ }
+ }
}
}
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-20 18:16:16 UTC (rev 8350)
@@ -136,6 +136,11 @@
int numberOfLines = 1000;
int size = 1024;
+
+ ArrayList<Integer> listResult1 = new ArrayList<Integer>();
+ ArrayList<Integer> listResult2 = new ArrayList<Integer>();
+
+ AtomicInteger errors = new AtomicInteger(0);
ByteBuffer buffer = null;
try
@@ -154,41 +159,43 @@
for (int i = 0; i < numberOfLines; i++)
{
- list.add(new CountDownCallback(latchDone));
- list2.add(new CountDownCallback(latchDone2));
+ list.add(new CountDownCallback(latchDone, errors, listResult1, i));
+ list2.add(new CountDownCallback(latchDone2, errors, listResult2, i));
}
- long valueInitial = System.currentTimeMillis();
-
int counter = 0;
+
Iterator<CountDownCallback> iter2 = list2.iterator();
- for (CountDownCallback tmp : list)
+ for (CountDownCallback cb1 : list)
{
- CountDownCallback tmp2 = iter2.next();
+ CountDownCallback cb2 = iter2.next();
- controller.write(counter * size, size, buffer, tmp);
- controller.write(counter * size, size, buffer, tmp2);
+ controller.write(counter * size, size, buffer, cb1);
+ controller2.write(counter * size, size, buffer, cb2);
++counter;
}
latchDone.await();
latchDone2.await();
+
+ CountDownCallback.checkResults(numberOfLines, listResult1);
+ CountDownCallback.checkResults(numberOfLines, listResult2);
for (CountDownCallback callback : list)
{
assertEquals(1, callback.timesDoneCalled.get());
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
}
for (CountDownCallback callback : list2)
{
assertEquals(1, callback.timesDoneCalled.get());
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
}
+
+ assertEquals(0, errors.get());
controller.close();
}
@@ -358,7 +365,7 @@
controller.setBufferCallback(bufferCallback);
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
+ ArrayList<Integer> result = new ArrayList<Integer>();
for (int i = 0; i < NUMBER_LINES; i++)
{
ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -367,6 +374,7 @@
{
buffer.put((byte)(j % Byte.MAX_VALUE));
}
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -379,7 +387,7 @@
controller.close();
closed = true;
- assertEquals(NUMBER_LINES, buffers.size());
+ CountDownCallback.checkResults(NUMBER_LINES, result);
// Make sure all the buffers are unique
ByteBuffer lineOne = null;
@@ -439,7 +447,6 @@
controller.setBufferCallback(bufferCallback);
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
buffer = AsynchronousFileImpl.newBuffer(SIZE);
buffer.rewind();
@@ -448,8 +455,11 @@
buffer.put((byte)(j % Byte.MAX_VALUE));
}
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
for (int i = 0; i < NUMBER_LINES; i++)
{
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -462,6 +472,8 @@
controller.close();
closed = true;
+ CountDownCallback.checkResults(NUMBER_LINES, result);
+
assertEquals(NUMBER_LINES, buffers.size());
// Make sure all the buffers are unique
@@ -517,7 +529,9 @@
{
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
+ AtomicInteger errors = new AtomicInteger(0);
for (int i = 0; i < NUMBER_LINES; i++)
{
@@ -531,12 +545,15 @@
buffer.put(getSamplebyte(j));
}
+ CountDownCallback aio = new CountDownCallback(latch, errors, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
latch.await();
- assertFalse(aio.errorCalled);
- assertEquals(NUMBER_LINES, aio.timesDoneCalled.get());
+
+ assertEquals(0, errors.get());
+
+ CountDownCallback.checkResults(NUMBER_LINES, result);
}
// If you call close you're supposed to wait events to finish before
@@ -557,12 +574,13 @@
AsynchronousFileImpl.clearBuffer(readBuffer);
CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
+ AtomicInteger errors = new AtomicInteger(0);
+ CountDownCallback aio = new CountDownCallback(latch, errors, null, 0);
controller.read(i * SIZE, SIZE, readBuffer, aio);
latch.await();
- assertFalse(aio.errorCalled);
+ assertEquals(0, errors.get());
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
@@ -634,7 +652,7 @@
}
buffer.put((byte)'\n');
- CountDownCallback aio = new CountDownCallback(readLatch);
+ CountDownCallback aio = new CountDownCallback(readLatch, null, null, 0);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -663,10 +681,10 @@
newBuffer.put((byte)'\n');
CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
+ CountDownCallback aio = new CountDownCallback(latch, null, null, 0);
controller.read(i * SIZE, SIZE, buffer, aio);
latch.await();
- assertFalse(aio.errorCalled);
+ assertEquals(0, aio.errorCalled);
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
@@ -720,9 +738,11 @@
ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>();
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
for (int i = 0; i < numberOfLines; i++)
{
- list.add(new CountDownCallback(latchDone));
+ list.add(new CountDownCallback(latchDone, null, result, i));
}
long valueInitial = System.currentTimeMillis();
@@ -743,6 +763,9 @@
latchDone.await();
long timeTotal = System.currentTimeMillis() - valueInitial;
+
+ CountDownCallback.checkResults(numberOfLines, result);
+
debug("After completions time = " + timeTotal +
" for " +
numberOfLines +
@@ -759,7 +782,7 @@
{
assertEquals(1, tmp.timesDoneCalled.get());
assertTrue(tmp.doneCalled);
- assertFalse(tmp.errorCalled);
+ assertEquals(0, tmp.errorCalled);
}
controller.close();
@@ -799,11 +822,11 @@
for (int i = 0; i < NUMBER_LINES; i++)
{
CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0);
controller.write(i * 512, 512, buffer, aioBlock);
latchDone.await();
assertTrue(aioBlock.doneCalled);
- assertFalse(aioBlock.errorCalled);
+ assertEquals(0, aioBlock.errorCalled);
}
long timeTotal = System.currentTimeMillis() - startTime;
@@ -850,12 +873,12 @@
CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0);
controller.write(11, 512, buffer, aioBlock);
latchDone.await();
- assertTrue(aioBlock.errorCalled);
+ assertTrue(aioBlock.errorCalled != 0);
assertFalse(aioBlock.doneCalled);
}
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-20 18:16:16 UTC (rev 8350)
@@ -40,7 +40,7 @@
* */
public class MultiThreadAsynchronousFileTest extends AIOTestBase
{
-
+
public static TestSuite suite()
{
return createAIOTestSuite(MultiThreadAsynchronousFileTest.class);
@@ -57,34 +57,32 @@
static final int NUMBER_OF_LINES = 1000;
ExecutorService executor;
-
+
ExecutorService pollerExecutor;
-
private static void debug(final String msg)
{
log.debug(msg);
}
-
-
protected void setUp() throws Exception
{
super.setUp();
- pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this), false));
+ pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
+ false));
executor = Executors.newSingleThreadExecutor();
}
-
+
protected void tearDown() throws Exception
{
executor.shutdown();
pollerExecutor.shutdown();
super.tearDown();
}
-
+
public void testMultipleASynchronousWrites() throws Throwable
{
- executeTest(false);
+ executeTest(false);
}
public void testMultipleSynchronousWrites() throws Throwable
@@ -130,15 +128,15 @@
long endTime = System.currentTimeMillis();
debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES *
- 1000 /
- (endTime - startTime) +
- " total time = " +
- (endTime - startTime) +
- " total number of records = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES);
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES *
+ 1000 /
+ (endTime - startTime) +
+ " total time = " +
+ (endTime - startTime) +
+ " total number of records = " +
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES);
}
finally
{
@@ -178,9 +176,8 @@
{
super.run();
-
ByteBuffer buffer = null;
-
+
synchronized (MultiThreadAsynchronousFileTest.class)
{
buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -220,7 +217,7 @@
{
latchFinishThread = new CountDownLatch(1);
}
- CountDownCallback callback = new CountDownCallback(latchFinishThread);
+ CountDownCallback callback = new CountDownCallback(latchFinishThread, null, null, 0);
if (!sync)
{
list.add(callback);
@@ -230,7 +227,7 @@
{
latchFinishThread.await();
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
}
if (!sync)
@@ -241,13 +238,13 @@
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
}
15 years, 1 month
JBoss hornetq SVN: r8349 - in branches/20-optimisation: src/main/org/hornetq/core/client and 17 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-20 12:41:40 -0500 (Fri, 20 Nov 2009)
New Revision: 8349
Modified:
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
WIBBLE WOBBLE WABBLE WOOOO
Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -376,6 +376,9 @@
public HornetQBuffer copy()
{
- return new HornetQByteBufferBackedChannelBuffer(ByteBuffer.wrap(buffer.array().clone()));
+ ByteBuffer newBuffer = ByteBuffer.allocate(buffer.remaining());
+ newBuffer.put(buffer);
+ newBuffer.flip();
+ return new HornetQByteBufferBackedChannelBuffer(newBuffer);
}
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -36,8 +36,6 @@
void acknowledge() throws HornetQException;
- void decode(HornetQBuffer buffer);
-
void resetBuffer();
//FIXME - the following are only used for large messages - they should be put somewhere else:
@@ -56,8 +54,6 @@
* @throws HornetQException
*/
boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
-
- void decodeHeadersAndProperties(HornetQBuffer buffer);
-
+
void setBodyInputStream(InputStream bodyInputStream);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -581,7 +581,7 @@
* @parameter discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
* */
public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException
- {
+ {
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -132,30 +132,8 @@
@Override
public void afterSend()
{
- //temp hack
-
-// ChannelBuffer cb = (ChannelBuffer)buffer.getUnderlyingBuffer();
-//
-// ChannelBuffer cbCopy = cb.copy(0, cb.capacity());
-//
-// this.buffer = new ChannelBufferWrapper(cbCopy);
-
- // resetBuffer();
-
-
}
-
- public void resetBuffer()
- {
- //There is a bug in Netty which requires us to initially write a byte
- if (buffer.capacity() == 0)
- {
- buffer.writeByte((byte)0);
- }
-
- buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT);
- }
-
+
@Override
public String toString()
{
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -696,7 +696,7 @@
{
log.trace("Setting up flowControlSize to " + message.getPacketSize() + " on message = " + clMessage);
}
-
+
clMessage.setFlowControlSize(message.getPacketSize());
consumer.handleMessage(message.getClientMessage());
@@ -1372,7 +1372,7 @@
// consumer
if (windowSize != 0)
- {
+ {
channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -700,7 +700,7 @@
}
ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID(),
- HornetQChannelBuffers.EMPTY_BUFFER);
+ HornetQChannelBuffers.dynamicBuffer(1500));
// Notification messages are always durable so the user can choose whether to add a durable queue to
// consume
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -192,5 +192,11 @@
void afterSend();
boolean isBufferWritten();
+
+ boolean isEncodedToBuffer();
+
+ void decodeFromWire(HornetQBuffer buffer);
+
+ void decodeHeadersAndProperties(HornetQBuffer buffer);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -29,7 +29,9 @@
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.PropertyConversionException;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -91,7 +93,12 @@
protected byte priority;
protected HornetQBuffer buffer;
-
+
+ private int encodeSize;
+
+ //True when the buffer contains an accurate encoding of the message
+ protected boolean encodedToBuffer;
+
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -141,7 +148,16 @@
return false;
}
- private int encodeSize;
+ public void resetBuffer()
+ {
+ //There is a bug in Netty which requires us to initially write a byte
+ if (buffer.capacity() == 0)
+ {
+ buffer.writeByte((byte)0);
+ }
+
+ buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT);
+ }
public int getEncodeSize()
{
@@ -154,12 +170,9 @@
}
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
- {
- //log.info("starting encode message at " + buffer.writerIndex());
- buffer.writeLong(messageID);
- // log.info("encoded id " + messageID + " at index " + buffer.writerIndex());
- buffer.writeSimpleString(destination);
- //log.info("encoded destination " + destination + " at index " + buffer.writerIndex());
+ {
+ buffer.writeLong(messageID);
+ buffer.writeSimpleString(destination);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
@@ -169,20 +182,24 @@
encodeSize = buffer.writerIndex();
}
- public void decode(final HornetQBuffer buffer)
+ public void decodeFromWire(final HornetQBuffer buffer)
{
decodeHeadersAndProperties(buffer);
this.buffer = buffer;
+
+ this.encodedToBuffer = true;
}
-
+
+ public boolean isEncodedToBuffer()
+ {
+ return this.encodedToBuffer;
+ }
+
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
- {
- // log.info("starting decode at " + buffer.readerIndex());
- messageID = buffer.readLong();
- // log.info("decoded message id " + messageID + " at index " + buffer.readerIndex());
- destination = buffer.readSimpleString();
- // log.info("decoded destination " + destination + " at index " + buffer.readerIndex());
+ {
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -74,7 +74,7 @@
{
message = storage.createLargeMessage();
HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(largeMessageLazyData);
- message.decode(buffer);
+ message.decodeHeadersAndProperties(buffer);
largeMessageLazyData = null;
}
return message;
@@ -107,7 +107,7 @@
message = new ServerMessageImpl();
- message.decode(buffer);
+ message.decodeHeadersAndProperties(buffer);
}
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -165,7 +165,7 @@
// }
@Override
- public void decode(final HornetQBuffer buffer)
+ public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
file = null;
decodeHeadersAndProperties(buffer);
Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -451,6 +451,8 @@
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
+ log.info("storing message");
+
// Note that we don't sync, the add reference that comes immediately after will sync if appropriate
if (message.isLargeMessage())
@@ -1674,7 +1676,7 @@
*/
public void decode(final HornetQBuffer buffer)
{
- message.decode(buffer);
+ message.decodeHeadersAndProperties(buffer);
}
/* (non-Javadoc)
Modified: branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -780,8 +780,7 @@
{
// First send a reset message
- ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.EMPTY_BUFFER);
- // message.setDurable(true);
+ ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.dynamicBuffer(50));
message.setDestination(queueName);
message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
@@ -1000,7 +999,7 @@
private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
{
- ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.EMPTY_BUFFER);
+ ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.dynamicBuffer(100));
message.setDestination(queueName);
// message.setDurable(true);
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -329,7 +329,7 @@
// ----------------------------------------------------
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
- {
+ {
final Packet packet = decoder.decode(buffer);
if (executor == null || packet.getType() == PacketImpl.PING)
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -83,13 +83,36 @@
{
return deliveryCount;
}
-
+
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
//We re-use the same packet buffer - but we need to change the extra data
+
HornetQBuffer buffer = serverMessage.getBuffer();
-
+
+ if (serverMessage.isEncodedToBuffer())
+ {
+ //It's already encoded - we just need to change the extra data at the end
+ //so we need to jump to the after body position
+
+ buffer.setIndex(0, serverMessage.getEndMessagePosition());
+ }
+ else
+ {
+ int afterBody = buffer.writerIndex();
+
+ //Message hasn't been encoded yet - probably it's something like a notification message generated on the server
+
+ // We now write the message headers and properties
+ serverMessage.encodeHeadersAndProperties(buffer);
+
+ serverMessage.setEndMessagePosition(buffer.writerIndex());
+
+ //Now we need to fill in the afterBody
+ buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, afterBody);
+ }
+
buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
@@ -106,35 +129,49 @@
buffer.writeLong(channelID);
//And fill in the message id, since this was set on the server side so won't already be in the buffer
- buffer.setIndex(0, buffer.writerIndex() + DataConstants.SIZE_INT);
+
+ buffer.writerIndex(buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE));
buffer.writeLong(serverMessage.getMessageID());
+ //Set the reader and writer position to be read fully by remoting
buffer.setIndex(0, size);
return buffer;
}
- public void decodeRest(final HornetQBuffer buffer)
+ @Override
+ public void decode(final HornetQBuffer buffer)
{
+ channelID = buffer.readLong();
+
clientMessage = new ClientMessageImpl();
// We read the position of the end of the body - this is where the message headers and properties are stored
int afterBody = buffer.readInt();
+ //At this point standard headers have been decoded and we are positioned at the beginning of the body
+ int bodyStart = buffer.readerIndex();
+
// We now read message headers/properties
- buffer.setIndex(afterBody, buffer.writerIndex());
+ buffer.readerIndex(afterBody);
- clientMessage.decode(buffer);
+ clientMessage.decodeFromWire(buffer);
+ // And read the extra data
+
consumerID = buffer.readLong();
-
+
deliveryCount = buffer.readInt();
clientMessage.setDeliveryCount(deliveryCount);
- buffer.resetReaderIndex();
+ size = buffer.readerIndex();
+
+ // Set reader index back to beginning of body
+ buffer.readerIndex(bodyStart);
+
clientMessage.setBuffer(buffer);
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -114,10 +114,12 @@
*
*
*/
+
HornetQBuffer buffer = sentMessage.getBuffer();
// The body will already be written (if any) at this point, so we take note of the position of the end of the
// body
+
int afterBody = buffer.writerIndex();
// We now write the message headers and properties
@@ -173,20 +175,19 @@
buffer.setIndex(afterBody, buffer.writerIndex());
- receivedMessage.decode(buffer);
+ receivedMessage.decodeFromWire(buffer);
+
+ //We store the position of the end of the encoded message, where the extra data starts - this
+ //will be needed if we re-deliver this packet, since we need to reset to there to rewrite the extra data
+ //for the different packet
+ receivedMessage.setEndMessagePosition(buffer.readerIndex());
// And we read extra data in the packet
requiresResponse = buffer.readBoolean();
-
- // We set reader index back to the beginning of the buffer so it can be easily read if then delivered
- // to a client, and we set writer index to just after where the headers/properties were encoded so that it can
- // be fileld in with extra data required when delivering the packet to the client (e.g. delivery count, consumer
- // id)
-
- buffer.setIndex(0, buffer.writerIndex() - DataConstants.SIZE_BOOLEAN);
}
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -60,4 +60,11 @@
boolean page(long transactionID, boolean duplicateDetection) throws Exception;
boolean storeIsPaging();
+
+ void setNeedsEncoding();
+
+
+ void setEndMessagePosition(int pos);
+
+ int getEndMessagePosition();
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -605,7 +605,7 @@
}
public synchronized void cancel(final MessageReference reference) throws Exception
- {
+ {
if (checkDLQ(reference))
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference))
@@ -1109,7 +1109,7 @@
* Attempt to deliver all the messages in the queue
*/
private synchronized void deliver()
- {
+ {
if (paused || handlers.isEmpty())
{
return;
@@ -1213,7 +1213,7 @@
}
private synchronized boolean directDeliver(final MessageReference reference)
- {
+ {
if (paused || handlers.isEmpty())
{
return false;
@@ -1308,13 +1308,13 @@
boolean add = false;
if (direct && !paused)
- {
+ {
// Deliver directly
boolean delivered = directDeliver(ref);
if (!delivered)
- {
+ {
add = true;
direct = false;
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -184,9 +184,9 @@
}
public HandleStatus handle(final MessageReference ref) throws Exception
- {
+ {
if (availableCredits != null && availableCredits.get() <= 0)
- {
+ {
return HandleStatus.BUSY;
}
@@ -347,7 +347,7 @@
promptDelivery(false);
ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(),
- HornetQChannelBuffers.EMPTY_BUFFER);
+ HornetQChannelBuffers.dynamicBuffer(100));
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setDestination(messageQueue.getName());
@@ -409,7 +409,7 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
// No flow control
@@ -582,7 +582,7 @@
channel.send(packet);
if (availableCredits != null)
- {
+ {
availableCredits.addAndGet(-packet.getPacketSize());
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -16,6 +16,7 @@
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.core.buffers.HornetQChannelBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.impl.MessageImpl;
@@ -25,6 +26,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -62,8 +64,10 @@
* Construct a MessageImpl from storage
*/
public ServerMessageImpl(final long messageID)
- {
+ {
super(messageID);
+
+ log.info("creating server message from storage, with id " + messageID);
}
/*
@@ -74,8 +78,11 @@
super(messageID);
this.buffer = buffer;
+
+ //Must align the body after the packet headers
+ resetBuffer();
}
-
+
/*
* Copy constructor
*/
@@ -253,6 +260,8 @@
putLongProperty(HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
}
+
+ setNeedsEncoding();
}
public void setPagingStore(final PagingStore pagingStore)
@@ -305,15 +314,8 @@
}
}
- // EncodingSupport implementation
+
- // Used when storing to/from journal
-
- public void encode(HornetQBuffer buffer)
- {
-
- }
-
@Override
public String toString()
{
@@ -332,5 +334,83 @@
{
return null;
}
+
+
+
+ // Encoding stuff
+
+
+ public void setNeedsEncoding()
+ {
+ //This will force the message to be re-encoded if it gets sent to a client
+ //Typically this is called after properties or headers are changed on the server side
+ this.encodedToBuffer = false;
+ }
+
+ private int endMessagePosition;
+
+ public void setEndMessagePosition(int pos)
+ {
+ this.endMessagePosition = pos;
+ }
+
+ public int getEndMessagePosition()
+ {
+ return this.endMessagePosition;
+ }
+
+ // EncodingSupport implementation
+
+ // Used when storing to/from journal
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ //Encode the message to a buffer for storage in the journal
+
+ if (this.encodedToBuffer)
+ {
+ //The body starts after the standard packet headers
+ int bodyStart = PacketImpl.PACKET_HEADERS_SIZE;
+
+ int end = this.endMessagePosition;
+
+ buffer.writeBytes(this.buffer, bodyStart, end);
+ }
+ else
+ {
+ //encodeToBuffer();
+
+ throw new IllegalStateException("Not encoded to buffer and storing to journal");
+ }
+ }
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ //TODO optimise
+
+ log.info("decoding server message");
+
+ this.buffer = HornetQChannelBuffers.dynamicBuffer(1500);
+
+ //work around Netty bug
+ this.buffer.writeByte((byte)0);
+
+ this.buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+
+ this.buffer.writeBytes(buffer, 0, buffer.readableBytes());
+
+
+ //Position to beginning of encoded message headers/properties
+
+ int msgHeadersPos = this.buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE);
+
+ this.buffer.readerIndex(msgHeadersPos);
+
+ this.decodeHeadersAndProperties(this.buffer);
+
+ log.info("priority is now " + this.getPriority());
+
+
+ }
}
Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -13,6 +13,7 @@
package org.hornetq.integration.transports.netty;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
@@ -30,6 +31,9 @@
*/
public class ChannelBufferWrapper implements HornetQBuffer
{
+ private static final Logger log = Logger.getLogger(ChannelBufferWrapper.class);
+
+
private final ChannelBuffer buffer;
/**
@@ -280,8 +284,8 @@
* @see org.hornetq.core.remoting.spi.HornetQBuffer#readString()
*/
public String readString()
- {
- int len = readInt();
+ {
+ int len = readInt();
char[] chars = new char[len];
for (int i = 0; i < len; i++)
{
@@ -378,12 +382,12 @@
* @see org.hornetq.core.remoting.spi.HornetQBuffer#writeString(java.lang.String)
*/
public void writeString(final String val)
- {
+ {
writeInt(val.length());
for (int i = 0; i < val.length(); i++)
{
writeShort((short)val.charAt(i));
- }
+ }
}
/* (non-Javadoc)
Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -47,7 +47,7 @@
public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
{
assert pipeline != null;
- pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+ pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception
Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -27,8 +27,6 @@
/**
* A Netty decoder used to decode messages.
*
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
* @author <a href="tlee(a)redhat.com">Trustin Lee</a>
*
* @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $
@@ -49,7 +47,7 @@
{
if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
// XXX Length is unknown. Bet at 512. Tune this value.
- append(in, 512);
+ append(in, 1500);
return;
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -74,7 +74,7 @@
ServerMessage serverMessage = new ServerMessageImpl();
- serverMessage.decode(buffer);
+ serverMessage.decodeHeadersAndProperties(buffer);
int serverEncodeSize = serverMessage.getEncodeSize();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -23,6 +23,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
@@ -78,57 +79,10 @@
ClientProducer producer = session.createProducer(QUEUE);
- final int numMessages = 10000;
+ final int numMessages = 10;
for (int i = 0; i < numMessages; i++)
- {
- /*
- * Like this:
- *
- * ClientMessage message = producer.createMessage(...);
- *
- * message.putStringProperty("foo", "bar");
- *
- * message.encodeToBuffer(); [this sets the destination from the producer, and encodes]
- *
- * message.getBuffer().writeString("testINVMCoreClient");
- *
- * message.send();
- *
- * OR, another option:
- *
- * Get rid of client producer altogether,
- *
- * Have send direct on the session, and destination must be set explicitly
- *
- * e.g.
- *
- * ClientMessage message = session.createMessage(...)
- *
- * message.putStringProperty("foo", "bar");
- *
- * message.setDestination("foo");
- *
- * message.writeBody();
- *
- * message.getBuffer().writeString("testINVMCoreClient");
- *
- * message.send();
- *
- *
- * ORRR
- *
- * we don't write the headers and properties until *AFTER* the body
- *
- * giving this format:
- * body length
- * body
- * headers + properties
- *
- * this means we don't need an encodeToBuffer() method!!
- *
- */
-
+ {
ClientMessage message = session.createClientMessage(HornetQTextMessage.TYPE,
false,
0,
@@ -143,8 +97,6 @@
message.setDestination(QUEUE);
- message.encodeToBuffer();
-
message.getBuffer().writeString("testINVMCoreClient");
producer.send(message);
@@ -159,10 +111,14 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message2 = consumer.receive();
+
+ HornetQBuffer buffer = message2.getBuffer();
+
+ assertEquals("testINVMCoreClient", buffer.readString());
- assertEquals("testINVMCoreClient", message2.getBuffer().readString());
-
message2.acknowledge();
+
+ log.info("got message " + i);
}
session.close();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -61,7 +61,7 @@
ClientProducer producer = session.createProducer(address);
producer.send(session.createClientMessage(!durable));
-
+
restart();
session.start();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -161,7 +161,7 @@
String str = new String(bytes);
- final int warmup = 50000;
+ final int warmup = 500000;
log.info("Warming up");
@@ -181,7 +181,7 @@
log.info("** WARMUP DONE");
- final int numMessages = 1000000;
+ final int numMessages = 2000000;
tm = sess.createTextMessage();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -63,7 +63,7 @@
HornetQBuffer buffer = HornetQChannelBuffers.buffer(message.getEncodeSize());
message.encode(buffer);
Message message2 = new ClientMessageImpl(false);
- message2.decode(buffer);
+ message2.decodeHeadersAndProperties(buffer);
assertMessagesEquivalent(message, message2);
}
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-20 17:41:40 UTC (rev 8349)
@@ -353,7 +353,7 @@
public String getTextMessage(ClientMessage m)
{
- m.getBuffer().resetReaderIndex();
+ //m.getBuffer().resetReaderIndex();
return m.getBuffer().readString();
}
15 years, 1 month
JBoss hornetq SVN: r8348 - branches/ClebertTemporary/native/src.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 11:52:13 -0500 (Fri, 20 Nov 2009)
New Revision: 8348
Modified:
branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp
branches/ClebertTemporary/native/src/JNICallbackAdapter.h
branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp
branches/ClebertTemporary/native/src/Version.h
Log:
AIO order
Modified: branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp 2009-11-20 16:51:38 UTC (rev 8347)
+++ branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp 2009-11-20 16:52:13 UTC (rev 8348)
@@ -18,13 +18,20 @@
jobject nullObj = NULL;
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jint _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
{
controller = _controller;
+
+ sequence = _sequence;
+
callback = _callback;
+
fileController = _fileController;
+
bufferReference = _bufferReference;
+
isRead = _isRead;
+
}
JNICallbackAdapter::~JNICallbackAdapter()
@@ -33,15 +40,19 @@
void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
{
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, isRead ? nullObj : bufferReference);
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, sequence, isRead ? nullObj : bufferReference);
+
release(threadContext);
}
void JNICallbackAdapter::onError(THREAD_CONTEXT threadContext, long errorCode, std::string error)
{
controller->log(threadContext, 0, "Libaio event generated errors, callback object was informed about it");
+
jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data());
- JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
+
+ JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, sequence, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
+
release(threadContext);
}
Modified: branches/ClebertTemporary/native/src/JNICallbackAdapter.h
===================================================================
--- branches/ClebertTemporary/native/src/JNICallbackAdapter.h 2009-11-20 16:51:38 UTC (rev 8347)
+++ branches/ClebertTemporary/native/src/JNICallbackAdapter.h 2009-11-20 16:52:13 UTC (rev 8348)
@@ -24,10 +24,17 @@
class JNICallbackAdapter : public CallbackAdapter
{
private:
+
AIOController * controller;
+
jobject callback;
+
jobject fileController;
+
jobject bufferReference;
+
+ jint sequence;
+
// Is this a read operation
short isRead;
@@ -43,7 +50,7 @@
public:
// _ob must be a global reference (create it with NewGlobalRef before calling the constructor)
- JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
+ JNICallbackAdapter(AIOController * _controller, jint sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
virtual ~JNICallbackAdapter();
void done(THREAD_CONTEXT threadContext);
Modified: branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-20 16:51:38 UTC (rev 8347)
+++ branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-20 16:52:13 UTC (rev 8348)
@@ -48,10 +48,10 @@
std::string fileName = convertJavaString(env, jstrFileName);
controller = new AIOController(fileName, (int) maxIO);
- controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
+ controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;ILjava/nio/ByteBuffer;)V");
if (!controller->done) return 0;
- controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/hornetq/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;ILjava/lang/String;)V");
+ controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/hornetq/core/asyncio/AIOCallback;ILjava/nio/ByteBuffer;ILjava/lang/String;)V");
if (!controller->error) return 0;
jclass loggerClass = env->GetObjectClass(logger);
@@ -97,7 +97,7 @@
return;
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), true);
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, -1, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), true);
controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
}
@@ -166,7 +166,7 @@
}
JNIEXPORT void JNICALL Java_org_hornetq_core_asyncio_impl_AsynchronousFileImpl_write
- (JNIEnv *env, jobject objThis, jlong controllerAddress, jlong position, jlong size, jobject jbuffer, jobject callback)
+ (JNIEnv *env, jobject objThis, jlong controllerAddress, jint sequence, jlong position, jlong size, jobject jbuffer, jobject callback)
{
try
{
@@ -180,7 +180,7 @@
}
- CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), false);
+ CallbackAdapter * adapter = new JNICallbackAdapter(controller, sequence, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), false);
controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
}
Modified: branches/ClebertTemporary/native/src/Version.h
===================================================================
--- branches/ClebertTemporary/native/src/Version.h 2009-11-20 16:51:38 UTC (rev 8347)
+++ branches/ClebertTemporary/native/src/Version.h 2009-11-20 16:52:13 UTC (rev 8348)
@@ -1,5 +1,5 @@
#ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 25
+#define _VERSION_NATIVE_AIO 26
#endif
15 years, 1 month
JBoss hornetq SVN: r8347 - in branches/ClebertTemporary/tests/src/org/hornetq/tests: unit/core/asyncio and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 11:51:38 -0500 (Fri, 20 Nov 2009)
New Revision: 8347
Added:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/SimpleClientTest.java
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
Log:
AIO order
Added: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/SimpleClientTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/SimpleClientTest.java (rev 0)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/SimpleClientTest.java 2009-11-20 16:51:38 UTC (rev 8347)
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * The most basic test possible. Useful to validate basic functionality.
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * TODO: Don't commit this test on the main branch
+ */
+public class SimpleClientTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSimpleTestNoTransacted() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory factory = createInVMFactory();
+
+ ClientSession session = null;
+
+ try
+ {
+ session = factory.createSession(false, true, true);
+
+ session.createQueue("A", "A", true);
+ ClientProducer producer = session.createProducer("A");
+ producer.send(session.createClientMessage(true));
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer("A");
+
+ ClientMessage msg = cons.receive(10000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ }
+ finally
+ {
+ session.close();
+ factory.close();
+ server.stop();
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-20 16:50:48 UTC (rev 8346)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-20 16:51:38 UTC (rev 8347)
@@ -58,8 +58,6 @@
ExecutorService executor;
- ExecutorService callbackExecutor;
-
ExecutorService pollerExecutor;
@@ -74,7 +72,6 @@
{
super.setUp();
pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this), false));
- callbackExecutor = Executors.newSingleThreadExecutor();
executor = Executors.newSingleThreadExecutor();
}
@@ -82,7 +79,6 @@
{
executor.shutdown();
pollerExecutor.shutdown();
- callbackExecutor.shutdown();
super.tearDown();
}
@@ -92,7 +88,7 @@
* */
public void testOpenClose() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
for (int i = 0; i < 1000; i++)
{
controller.open(FILE_NAME, 10000);
@@ -103,7 +99,7 @@
public void testFileNonExistent() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
for (int i = 0; i < 1000; i++)
{
try
@@ -133,8 +129,8 @@
*/
public void testTwoFiles() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
- final AsynchronousFileImpl controller2 = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
+ final AsynchronousFileImpl controller2 = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME + ".1", 10000);
controller2.open(FILE_NAME + ".2", 10000);
@@ -246,7 +242,7 @@
}
}
- AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
ByteBuffer buffer = null;
try
{
@@ -256,7 +252,7 @@
controller.open(FILE_NAME, 10);
controller.close();
- controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, 10);
@@ -339,7 +335,7 @@
public void testBufferCallbackUniqueBuffers() throws Exception
{
boolean closed = false;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
try
{
final int NUMBER_LINES = 1000;
@@ -419,7 +415,7 @@
public void testBufferCallbackAwaysSameBuffer() throws Exception
{
boolean closed = false;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
ByteBuffer buffer = null;
try
{
@@ -497,7 +493,7 @@
public void testRead() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.setBufferCallback(new BufferCallback()
{
@@ -604,7 +600,7 @@
* The file is also read after being written to validate its correctness */
public void testConcurrentClose() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
try
{
@@ -708,7 +704,7 @@
private void asyncData(final int numberOfLines, final int size, final int aioLimit) throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, aioLimit);
ByteBuffer buffer = null;
@@ -790,7 +786,7 @@
final int NUMBER_LINES = 3000;
final int SIZE = 1024;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, 2000);
buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -838,7 +834,7 @@
public void testInvalidWrite() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
controller.open(FILE_NAME, 2000);
ByteBuffer buffer = null;
@@ -939,7 +935,7 @@
public void testSize() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl(executor, pollerExecutor);
final int NUMBER_LINES = 10;
final int SIZE = 1024;
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-20 16:50:48 UTC (rev 8346)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-20 16:51:38 UTC (rev 8347)
@@ -58,8 +58,6 @@
ExecutorService executor;
- ExecutorService callbackExecutor;
-
ExecutorService pollerExecutor;
@@ -74,7 +72,6 @@
{
super.setUp();
pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this), false));
- callbackExecutor = Executors.newSingleThreadExecutor();
executor = Executors.newSingleThreadExecutor();
}
@@ -98,7 +95,7 @@
private void executeTest(final boolean sync) throws Throwable
{
debug(sync ? "Sync test:" : "Async test");
- AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl(executor, pollerExecutor, callbackExecutor);
+ AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl(executor, pollerExecutor);
jlibAIO.open(FILE_NAME, 21000);
try
{
15 years, 1 month
JBoss hornetq SVN: r8346 - in branches/ClebertTemporary/src/main/org/hornetq/core: journal/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 11:50:48 -0500 (Fri, 20 Nov 2009)
New Revision: 8346
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
Log:
AIO order
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20 16:50:48 UTC (rev 8346)
@@ -46,8 +46,15 @@
private static boolean loaded = false;
- private static int EXPECTED_NATIVE_VERSION = 25;
+ private static int EXPECTED_NATIVE_VERSION = 26;
+
+ /** Used to determine the next writing sequence */
+ private AtomicInteger nextWritingSequence = new AtomicInteger(0);
+ /** Used to determine the next read sequence.
+ * This is accessed from a single thread (the Poller Thread) */
+ private int readSequence = 0;
+
public static void addMax(final int io)
{
totalMaxIO.addAndGet(io);
@@ -149,10 +156,6 @@
// AIO using a single thread.
private final Executor writeExecutor;
- // We can't use the same thread on the callbacks
- // as the callbacks may perform other IO operations back what could cause dead locks
- private final Executor callbackExecutor;
-
private final Executor pollerExecutor;
// AsynchronousFile implementation ---------------------------------------------------
@@ -161,11 +164,10 @@
* @param writeExecutor It needs to be a single Thread executor. If null it will use the user thread to execute write operations
* @param pollerExecutor The thread pool that will initialize poller handlers
*/
- public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor, final Executor callbackExecutor)
+ public AsynchronousFileImpl(final Executor writeExecutor, final Executor pollerExecutor)
{
this.writeExecutor = writeExecutor;
this.pollerExecutor = pollerExecutor;
- this.callbackExecutor = callbackExecutor;
}
public void open(final String fileName, final int maxIO) throws HornetQException
@@ -207,6 +209,8 @@
}
opened = true;
addMax(this.maxIO);
+ nextWritingSequence.set(0);
+ readSequence = 0;
}
finally
{
@@ -278,18 +282,20 @@
public void run()
{
writeSemaphore.acquireUninterruptibly();
+
+ int sequence = nextWritingSequence.getAndIncrement();
try
{
- write(handler, position, size, directByteBuffer, aioCallback);
+ write(handler, sequence, position, size, directByteBuffer, aioCallback);
}
catch (HornetQException e)
{
- callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, e.getCode(), e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
}
}
});
@@ -298,17 +304,19 @@
{
writeSemaphore.acquireUninterruptibly();
+ int sequence = nextWritingSequence.getAndIncrement();
+
try
{
- write(handler, position, size, directByteBuffer, aioCallback);
+ write(handler, sequence, position, size, directByteBuffer, aioCallback);
}
catch (HornetQException e)
{
- callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, e.getCode(), e.getMessage());
}
catch (RuntimeException e)
{
- callbackError(aioCallback, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
+ callbackError(aioCallback, sequence, directByteBuffer, HornetQException.INTERNAL_ERROR, e.getMessage());
}
}
@@ -419,17 +427,11 @@
@SuppressWarnings("unused")
// Called by the JNI layer.. just ignore the
// warning
- private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
+ private void callbackDone(final AIOCallback callback, final int sequence, final ByteBuffer buffer)
{
writeSemaphore.release();
pendingWrites.down();
- callbackExecutor.execute(new Runnable()
- {
- public void run()
- {
- callback.done();
- }
- });
+ callback.done();
// The buffer is not sent on callback for read operations
if (bufferCallback != null && buffer != null)
@@ -440,7 +442,7 @@
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final ByteBuffer buffer, final int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback, final int sequence, final ByteBuffer buffer, final int errorCode, final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
writeSemaphore.release();
@@ -527,7 +529,7 @@
private native long size0(long handle) throws HornetQException;
- private native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+ private native void write(long handle, int sequence, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
private native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-20 16:50:48 UTC (rev 8346)
@@ -60,11 +60,10 @@
final String fileName,
final int maxIO,
final BufferCallback bufferCallback,
- final Executor callbackExecutor,
final Executor writerExecutor,
final Executor pollerExecutor)
{
- super(callbackExecutor, directory, new File(directory + "/" + fileName), factory);
+ super(directory, new File(directory + "/" + fileName), factory);
this.maxIO = maxIO;
this.writerExecutor = writerExecutor;
this.bufferCallback = bufferCallback;
@@ -94,7 +93,7 @@
public SequentialFile copy()
{
- return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, callbackExecutor, writerExecutor, pollerExecutor);
+ return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, writerExecutor, pollerExecutor);
}
public synchronized void close() throws Exception
@@ -197,7 +196,7 @@
public synchronized void open(final int currentMaxIO) throws Exception
{
opened = true;
- aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor, callbackExecutor);
+ aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
position.set(0);
aioFile.setBufferCallback(bufferCallback);
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-20 16:50:48 UTC (rev 8346)
@@ -84,7 +84,6 @@
fileName,
maxIO,
buffersControl.callback,
- callbacksExecutor,
writeExecutor,
pollerExecutor);
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-20 16:50:48 UTC (rev 8346)
@@ -44,11 +44,6 @@
private final String directory;
- /** We can't execute callbacks while inside the locks, as more IO operations could be performed, what could cause serious dead locks. */
- protected final Executor callbackExecutor;
-
-
-
protected final SequentialFileFactory factory;
protected long fileSize = 0;
@@ -69,13 +64,12 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(final Executor executor, final String directory, final File file, final SequentialFileFactory factory)
+ public AbstractSequentialFile(final String directory, final File file, final SequentialFileFactory factory)
{
super();
this.file = file;
this.directory = directory;
this.factory = factory;
- this.callbackExecutor = executor;
}
// Public --------------------------------------------------------
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-20 16:50:48 UTC (rev 8346)
@@ -41,14 +41,14 @@
private RandomAccessFile rfile;
- public NIOSequentialFile(final SequentialFileFactory factory, final Executor executor, final String directory, final String fileName)
+ public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName)
{
- super(executor, directory, new File(directory + "/" + fileName), factory);
+ super(directory, new File(directory + "/" + fileName), factory);
}
public NIOSequentialFile(final SequentialFileFactory factory, final File file)
{
- super(null, file.getParent(), new File(file.getPath()), factory);
+ super(file.getParent(), new File(file.getPath()), factory);
}
public int getAlignment()
@@ -240,20 +240,7 @@
if (callback != null)
{
- if (callbackExecutor == null)
- {
- callback.done();
- }
- else
- {
- callbackExecutor.execute(new Runnable()
- {
- public void run()
- {
- callback.done();
- }
- });
- }
+ callback.done();
}
}
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-20 16:50:48 UTC (rev 8346)
@@ -66,7 +66,7 @@
// maxIO is ignored on NIO
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
- return new NIOSequentialFile(this, this.callbacksExecutor, journalDir, fileName);
+ return new NIOSequentialFile(this, journalDir, fileName);
}
public boolean isSupportsCallbacks()
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-20 16:44:22 UTC (rev 8345)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-20 16:50:48 UTC (rev 8346)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence.impl.journal;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -116,12 +117,15 @@
{
if (stored >= minimalStore && replicated >= minimalReplicated)
{
- for (TaskHolder holder : tasks)
+ Iterator<TaskHolder> iter = tasks.iterator();
+ while (iter.hasNext())
{
+ TaskHolder holder = iter.next();
if (!holder.executed && stored >= holder.storeLined && replicated >= holder.replicationLined)
{
holder.executed = true;
holder.task.done();
+ iter.remove();
}
}
}
15 years, 1 month
JBoss hornetq SVN: r8345 - branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 11:44:22 -0500 (Fri, 20 Nov 2009)
New Revision: 8345
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
tweaks
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-20 15:05:02 UTC (rev 8344)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-20 16:44:22 UTC (rev 8345)
@@ -450,7 +450,7 @@
if (runItNow)
{
- repliToken.done();
+ repliToken.replicationDone();
}
}
@@ -499,7 +499,7 @@
}
if (executeNow)
{
- context.done();
+ context.replicationDone();
}
}
15 years, 1 month
JBoss hornetq SVN: r8344 - branches/20-optimisation/src/main/org/hornetq/integration/transports/netty.
by do-not-reply@jboss.org
Author: trustin
Date: 2009-11-20 10:05:02 -0500 (Fri, 20 Nov 2009)
New Revision: 8344
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
Log:
Added the markers which points to the location where additional performance tuning could be done
Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20 15:03:21 UTC (rev 8343)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20 15:05:02 UTC (rev 8344)
@@ -48,7 +48,8 @@
if (previousData.readable())
{
if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
- append(in, 512); // Length is unknown. Bet at 512.
+ // XXX Length is unknown. Bet at 512. Tune this value.
+ append(in, 512);
return;
}
@@ -104,6 +105,9 @@
previousData.writeBytes(in, length + 4 - previousData.readableBytes());
frame = previousData;
} else {
+ // XXX Tune this value: Increasing the initial capacity of the
+ // dynamic buffer might reduce the chance of additional memory
+ // copy.
frame = ChannelBuffers.dynamicBuffer(length + 4);
frame.writeBytes(previousData, previousData.readerIndex(), previousData.readableBytes());
frame.writeBytes(in, length + 4 - frame.writerIndex());
@@ -148,6 +152,8 @@
}
// Convert to dynamic buffer (this requires copy)
+ // XXX Tune this value: Increasing the initial capacity of the dynamic
+ // buffer might reduce the chance of additional memory copy.
ChannelBuffer frame = ChannelBuffers.dynamicBuffer(length + SIZE_INT);
frame.writeBytes(in, length + SIZE_INT);
frame.skipBytes(SIZE_INT);
15 years, 1 month
JBoss hornetq SVN: r8343 - branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/bridge.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 10:03:21 -0500 (Fri, 20 Nov 2009)
New Revision: 8343
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
Log:
Stopping a server on finalize
Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-20 15:01:10 UTC (rev 8342)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeStartTest.java 2009-11-20 15:03:21 UTC (rev 8343)
@@ -83,133 +83,140 @@
}
HornetQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
- final String testAddress = "testAddress";
- final String queueName0 = "queue0";
- final String forwardAddress = "forwardAddress";
- final String queueName1 = "queue1";
+ try
+ {
- Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
- TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
- TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
- connectors.put(server1tc.getName(), server1tc);
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
- server0.getConfiguration().setConnectorConfigurations(connectors);
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
- Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
+ server0.getConfiguration().setConnectorConfigurations(connectors);
- final String bridgeName = "bridge1";
+ Pair<String, String> connectorPair = new Pair<String, String>(server1tc.getName(), null);
- BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
- queueName0,
- forwardAddress,
- null,
- null,
- 1000,
- 1d,
- 0,
- true,
- true,
- 1024,
- connectorPair);
+ final String bridgeName = "bridge1";
- List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
- bridgeConfigs.add(bridgeConfiguration);
- server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration(bridgeName,
+ queueName0,
+ forwardAddress,
+ null,
+ null,
+ 1000,
+ 1d,
+ 0,
+ true,
+ true,
+ 1024,
+ connectorPair);
- QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
- List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
- queueConfigs0.add(queueConfig0);
- server0.getConfiguration().setQueueConfigurations(queueConfigs0);
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
- QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
- List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
- queueConfigs1.add(queueConfig1);
- server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+ QueueConfiguration queueConfig0 = new QueueConfiguration(testAddress, queueName0, null, true);
+ List<QueueConfiguration> queueConfigs0 = new ArrayList<QueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ server0.getConfiguration().setQueueConfigurations(queueConfigs0);
- server1.start();
- server0.start();
+ QueueConfiguration queueConfig1 = new QueueConfiguration(forwardAddress, queueName1, null, true);
+ List<QueueConfiguration> queueConfigs1 = new ArrayList<QueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
- ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
+ server1.start();
+ server0.start();
- ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
+ ClientSessionFactory sf0 = new ClientSessionFactoryImpl(server0tc);
- ClientSession session0 = sf0.createSession(false, true, true);
+ ClientSessionFactory sf1 = new ClientSessionFactoryImpl(server1tc);
- ClientSession session1 = sf1.createSession(false, true, true);
+ ClientSession session0 = sf0.createSession(false, true, true);
- ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
+ ClientSession session1 = sf1.createSession(false, true, true);
- ClientConsumer consumer1 = session1.createConsumer(queueName1);
+ ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
- session1.start();
+ ClientConsumer consumer1 = session1.createConsumer(queueName1);
- final int numMessages = 10;
+ session1.start();
- final SimpleString propKey = new SimpleString("testkey");
+ final int numMessages = 10;
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
+ final SimpleString propKey = new SimpleString("testkey");
- message.putIntProperty(propKey, i);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
- producer0.send(message);
- }
+ message.putIntProperty(propKey, i);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(200);
+ producer0.send(message);
+ }
- assertNotNull(message);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(200);
- assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ assertNotNull(message);
- message.acknowledge();
- }
+ assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
- assertNull(consumer1.receiveImmediate());
+ message.acknowledge();
+ }
- Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+ assertNull(consumer1.receiveImmediate());
- bridge.stop();
+ Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session0.createClientMessage(false);
+ bridge.stop();
- message.putIntProperty(propKey, i);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session0.createClientMessage(false);
- producer0.send(message);
- }
+ message.putIntProperty(propKey, i);
- assertNull(consumer1.receiveImmediate());
+ producer0.send(message);
+ }
- bridge.start();
+ assertNull(consumer1.receiveImmediate());
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer1.receive(1000);
+ bridge.start();
- assertNotNull(message);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer1.receive(1000);
- assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
+ assertNotNull(message);
- message.acknowledge();
- }
+ assertEquals((Integer)i, (Integer)message.getObjectProperty(propKey));
- assertNull(consumer1.receiveImmediate());
+ message.acknowledge();
+ }
- session0.close();
+ assertNull(consumer1.receiveImmediate());
- session1.close();
+ session0.close();
- sf0.close();
+ session1.close();
- sf1.close();
+ sf0.close();
- server0.stop();
+ sf1.close();
+ }
+ finally
+ {
- server1.stop();
+ server0.stop();
+
+ server1.stop();
+ }
}
public void testTargetServerUpAndDown() throws Exception
15 years, 1 month
JBoss hornetq SVN: r8342 - branches/20-optimisation/src/main/org/hornetq/integration/transports/netty.
by do-not-reply@jboss.org
Author: trustin
Date: 2009-11-20 10:01:10 -0500 (Fri, 20 Nov 2009)
New Revision: 8342
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
Log:
Optimized imports
Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20 15:00:41 UTC (rev 8341)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20 15:01:10 UTC (rev 8342)
@@ -15,22 +15,14 @@
import static org.hornetq.utils.DataConstants.SIZE_INT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-import org.hornetq.core.remoting.impl.AbstractBufferHandler;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.buffer.DynamicChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
/**
* A Netty decoder used to decode messages.
15 years, 1 month