Author: clebert.suconic(a)jboss.com
Date: 2009-11-20 13:16:16 -0500 (Fri, 20 Nov 2009)
New Revision: 8350
Modified:
branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp
branches/ClebertTemporary/native/src/JNICallbackAdapter.h
branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
Log:
AIO order
Modified: branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp 2009-11-20 17:41:40 UTC
(rev 8349)
+++ branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp 2009-11-20 18:16:16 UTC
(rev 8350)
@@ -18,7 +18,7 @@
jobject nullObj = NULL;
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jint _sequence,
jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) :
CallbackAdapter()
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence,
jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) :
CallbackAdapter()
{
controller = _controller;
Modified: branches/ClebertTemporary/native/src/JNICallbackAdapter.h
===================================================================
--- branches/ClebertTemporary/native/src/JNICallbackAdapter.h 2009-11-20 17:41:40 UTC (rev
8349)
+++ branches/ClebertTemporary/native/src/JNICallbackAdapter.h 2009-11-20 18:16:16 UTC (rev
8350)
@@ -33,7 +33,7 @@
jobject bufferReference;
- jint sequence;
+ jlong sequence;
// Is this a read operation
short isRead;
@@ -50,7 +50,7 @@
public:
// _ob must be a global Reference (use createGloblReferente before calling the
constructor)
- JNICallbackAdapter(AIOController * _controller, jint sequence, jobject _callback,
jobject _fileController, jobject _bufferReference, short _isRead);
+ JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback,
jobject _fileController, jobject _bufferReference, short _isRead);
virtual ~JNICallbackAdapter();
void done(THREAD_CONTEXT threadContext);
Modified: branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-20 17:41:40
UTC (rev 8349)
+++ branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp 2009-11-20 18:16:16
UTC (rev 8350)
@@ -48,10 +48,10 @@
std::string fileName = convertJavaString(env, jstrFileName);
controller = new AIOController(fileName, (int) maxIO);
- controller->done =
env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;ILjava/nio/ByteBuffer;)V");
+ controller->done =
env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V");
if (!controller->done) return 0;
- controller->error = env->GetMethodID(clazz, "callbackError",
"(Lorg/hornetq/core/asyncio/AIOCallback;ILjava/nio/ByteBuffer;ILjava/lang/String;)V");
+ controller->error = env->GetMethodID(clazz, "callbackError",
"(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V");
if (!controller->error) return 0;
jclass loggerClass = env->GetObjectClass(logger);
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-11-20
18:16:16 UTC (rev 8350)
@@ -18,6 +18,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -49,11 +50,11 @@
private static int EXPECTED_NATIVE_VERSION = 26;
/** Used to determine the next writing sequence */
- private AtomicInteger nextWritingSequence = new AtomicInteger(0);
+ private AtomicLong nextWritingSequence = new AtomicLong(0);
/** Used to determine the next writing sequence.
* This is accessed from a single thread (the Poller Thread) */
- private int readSequence = 0;
+ private long readSequence = 0;
public static void addMax(final int io)
{
@@ -283,7 +284,7 @@
{
writeSemaphore.acquireUninterruptibly();
- int sequence = nextWritingSequence.getAndIncrement();
+ long sequence = nextWritingSequence.getAndIncrement();
try
{
@@ -304,7 +305,7 @@
{
writeSemaphore.acquireUninterruptibly();
- int sequence = nextWritingSequence.getAndIncrement();
+ long sequence = nextWritingSequence.getAndIncrement();
try
{
@@ -427,7 +428,7 @@
@SuppressWarnings("unused")
// Called by the JNI layer.. just ignore the
// warning
- private void callbackDone(final AIOCallback callback, final int sequence, final
ByteBuffer buffer)
+ private void callbackDone(final AIOCallback callback, final long sequence, final
ByteBuffer buffer)
{
writeSemaphore.release();
pendingWrites.down();
@@ -442,7 +443,7 @@
// Called by the JNI layer.. just ignore the
// warning
- private void callbackError(final AIOCallback callback, final int sequence, final
ByteBuffer buffer, final int errorCode, final String errorMessage)
+ private void callbackError(final AIOCallback callback, final long sequence, final
ByteBuffer buffer, final int errorCode, final String errorMessage)
{
log.warn("CallbackError: " + errorMessage);
writeSemaphore.release();
@@ -529,7 +530,7 @@
private native long size0(long handle) throws HornetQException;
- private native void write(long handle, int sequence, long position, long size,
ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+ private native void write(long handle, long sequence, long position, long size,
ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
private native void read(long handle, long position, long size, ByteBuffer buffer,
AIOCallback aioPackage) throws HornetQException;
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-20
17:41:40 UTC (rev 8349)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java 2009-11-20
18:16:16 UTC (rev 8350)
@@ -15,6 +15,8 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -84,20 +86,36 @@
protected static class CountDownCallback implements AIOCallback
{
private final CountDownLatch latch;
+
+ private final List<Integer> outputList;
+
+ private final int order;
+
+ private final AtomicInteger errors;
- public CountDownCallback(final CountDownLatch latch)
+ public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors,
final List<Integer> outputList, final int order)
{
this.latch = latch;
+
+ this.outputList = outputList;
+
+ this.order = order;
+
+ this.errors = errors;
}
volatile boolean doneCalled = false;
- volatile boolean errorCalled = false;
+ volatile int errorCalled = 0;
final AtomicInteger timesDoneCalled = new AtomicInteger(0);
public void done()
{
+ if (outputList != null)
+ {
+ outputList.add(order);
+ }
doneCalled = true;
timesDoneCalled.incrementAndGet();
if (latch != null)
@@ -108,7 +126,15 @@
public void onError(final int errorCode, final String errorMessage)
{
- errorCalled = true;
+ errorCalled++;
+ if (outputList != null)
+ {
+ outputList.add(order);
+ }
+ if (errors != null)
+ {
+ errors.incrementAndGet();
+ }
if (latch != null)
{
// even thought an error happened, we need to inform the latch,
@@ -116,6 +142,16 @@
latch.countDown();
}
}
+
+ public static void checkResults(final int numberOfElements, final
ArrayList<Integer> result)
+ {
+ assertEquals(numberOfElements, result.size());
+ int i = 0;
+ for (Integer resultI : result)
+ {
+ assertEquals(i++, resultI.intValue());
+ }
+ }
}
}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-20
17:41:40 UTC (rev 8349)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-11-20
18:16:16 UTC (rev 8350)
@@ -136,6 +136,11 @@
int numberOfLines = 1000;
int size = 1024;
+
+ ArrayList<Integer> listResult1 = new ArrayList<Integer>();
+ ArrayList<Integer> listResult2 = new ArrayList<Integer>();
+
+ AtomicInteger errors = new AtomicInteger(0);
ByteBuffer buffer = null;
try
@@ -154,41 +159,43 @@
for (int i = 0; i < numberOfLines; i++)
{
- list.add(new CountDownCallback(latchDone));
- list2.add(new CountDownCallback(latchDone2));
+ list.add(new CountDownCallback(latchDone, errors, listResult1, i));
+ list2.add(new CountDownCallback(latchDone2, errors, listResult2, i));
}
- long valueInitial = System.currentTimeMillis();
-
int counter = 0;
+
Iterator<CountDownCallback> iter2 = list2.iterator();
- for (CountDownCallback tmp : list)
+ for (CountDownCallback cb1 : list)
{
- CountDownCallback tmp2 = iter2.next();
+ CountDownCallback cb2 = iter2.next();
- controller.write(counter * size, size, buffer, tmp);
- controller.write(counter * size, size, buffer, tmp2);
+ controller.write(counter * size, size, buffer, cb1);
+ controller2.write(counter * size, size, buffer, cb2);
++counter;
}
latchDone.await();
latchDone2.await();
+
+ CountDownCallback.checkResults(numberOfLines, listResult1);
+ CountDownCallback.checkResults(numberOfLines, listResult2);
for (CountDownCallback callback : list)
{
assertEquals(1, callback.timesDoneCalled.get());
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
}
for (CountDownCallback callback : list2)
{
assertEquals(1, callback.timesDoneCalled.get());
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
}
+
+ assertEquals(0, errors.get());
controller.close();
}
@@ -358,7 +365,7 @@
controller.setBufferCallback(bufferCallback);
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
+ ArrayList<Integer> result = new ArrayList<Integer>();
for (int i = 0; i < NUMBER_LINES; i++)
{
ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -367,6 +374,7 @@
{
buffer.put((byte)(j % Byte.MAX_VALUE));
}
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -379,7 +387,7 @@
controller.close();
closed = true;
- assertEquals(NUMBER_LINES, buffers.size());
+ CountDownCallback.checkResults(NUMBER_LINES, result);
// Make sure all the buffers are unique
ByteBuffer lineOne = null;
@@ -439,7 +447,6 @@
controller.setBufferCallback(bufferCallback);
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
buffer = AsynchronousFileImpl.newBuffer(SIZE);
buffer.rewind();
@@ -448,8 +455,11 @@
buffer.put((byte)(j % Byte.MAX_VALUE));
}
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
for (int i = 0; i < NUMBER_LINES; i++)
{
+ CountDownCallback aio = new CountDownCallback(latch, null, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -462,6 +472,8 @@
controller.close();
closed = true;
+ CountDownCallback.checkResults(NUMBER_LINES, result);
+
assertEquals(NUMBER_LINES, buffers.size());
// Make sure all the buffers are unique
@@ -517,7 +529,9 @@
{
CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
+ AtomicInteger errors = new AtomicInteger(0);
for (int i = 0; i < NUMBER_LINES; i++)
{
@@ -531,12 +545,15 @@
buffer.put(getSamplebyte(j));
}
+ CountDownCallback aio = new CountDownCallback(latch, errors, result, i);
controller.write(i * SIZE, SIZE, buffer, aio);
}
latch.await();
- assertFalse(aio.errorCalled);
- assertEquals(NUMBER_LINES, aio.timesDoneCalled.get());
+
+ assertEquals(0, errors.get());
+
+ CountDownCallback.checkResults(NUMBER_LINES, result);
}
// If you call close you're supposed to wait events to finish before
@@ -557,12 +574,13 @@
AsynchronousFileImpl.clearBuffer(readBuffer);
CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
+ AtomicInteger errors = new AtomicInteger(0);
+ CountDownCallback aio = new CountDownCallback(latch, errors, null, 0);
controller.read(i * SIZE, SIZE, readBuffer, aio);
latch.await();
- assertFalse(aio.errorCalled);
+ assertEquals(0, errors.get());
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
@@ -634,7 +652,7 @@
}
buffer.put((byte)'\n');
- CountDownCallback aio = new CountDownCallback(readLatch);
+ CountDownCallback aio = new CountDownCallback(readLatch, null, null, 0);
controller.write(i * SIZE, SIZE, buffer, aio);
}
@@ -663,10 +681,10 @@
newBuffer.put((byte)'\n');
CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
+ CountDownCallback aio = new CountDownCallback(latch, null, null, 0);
controller.read(i * SIZE, SIZE, buffer, aio);
latch.await();
- assertFalse(aio.errorCalled);
+ assertEquals(0, aio.errorCalled);
assertTrue(aio.doneCalled);
byte bytesRead[] = new byte[SIZE];
@@ -720,9 +738,11 @@
ArrayList<CountDownCallback> list = new
ArrayList<CountDownCallback>();
+ ArrayList<Integer> result = new ArrayList<Integer>();
+
for (int i = 0; i < numberOfLines; i++)
{
- list.add(new CountDownCallback(latchDone));
+ list.add(new CountDownCallback(latchDone, null, result, i));
}
long valueInitial = System.currentTimeMillis();
@@ -743,6 +763,9 @@
latchDone.await();
long timeTotal = System.currentTimeMillis() - valueInitial;
+
+ CountDownCallback.checkResults(numberOfLines, result);
+
debug("After completions time = " + timeTotal +
" for " +
numberOfLines +
@@ -759,7 +782,7 @@
{
assertEquals(1, tmp.timesDoneCalled.get());
assertTrue(tmp.doneCalled);
- assertFalse(tmp.errorCalled);
+ assertEquals(0, tmp.errorCalled);
}
controller.close();
@@ -799,11 +822,11 @@
for (int i = 0; i < NUMBER_LINES; i++)
{
CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null,
0);
controller.write(i * 512, 512, buffer, aioBlock);
latchDone.await();
assertTrue(aioBlock.doneCalled);
- assertFalse(aioBlock.errorCalled);
+ assertEquals(0, aioBlock.errorCalled);
}
long timeTotal = System.currentTimeMillis() - startTime;
@@ -850,12 +873,12 @@
CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0);
controller.write(11, 512, buffer, aioBlock);
latchDone.await();
- assertTrue(aioBlock.errorCalled);
+ assertTrue(aioBlock.errorCalled != 0);
assertFalse(aioBlock.doneCalled);
}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-20
17:41:40 UTC (rev 8349)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-11-20
18:16:16 UTC (rev 8350)
@@ -40,7 +40,7 @@
* */
public class MultiThreadAsynchronousFileTest extends AIOTestBase
{
-
+
public static TestSuite suite()
{
return createAIOTestSuite(MultiThreadAsynchronousFileTest.class);
@@ -57,34 +57,32 @@
static final int NUMBER_OF_LINES = 1000;
ExecutorService executor;
-
+
ExecutorService pollerExecutor;
-
private static void debug(final String msg)
{
log.debug(msg);
}
-
-
protected void setUp() throws Exception
{
super.setUp();
- pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
false));
+ pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
+ false));
executor = Executors.newSingleThreadExecutor();
}
-
+
protected void tearDown() throws Exception
{
executor.shutdown();
pollerExecutor.shutdown();
super.tearDown();
}
-
+
public void testMultipleASynchronousWrites() throws Throwable
{
- executeTest(false);
+ executeTest(false);
}
public void testMultipleSynchronousWrites() throws Throwable
@@ -130,15 +128,15 @@
long endTime = System.currentTimeMillis();
debug((sync ? "Sync result:" : "Async result:") + "
Records/Second = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES *
- 1000 /
- (endTime - startTime) +
- " total time = " +
- (endTime - startTime) +
- " total number of records = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES);
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES *
+ 1000 /
+ (endTime - startTime) +
+ " total time = " +
+ (endTime - startTime) +
+ " total number of records = " +
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES);
}
finally
{
@@ -178,9 +176,8 @@
{
super.run();
-
ByteBuffer buffer = null;
-
+
synchronized (MultiThreadAsynchronousFileTest.class)
{
buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -220,7 +217,7 @@
{
latchFinishThread = new CountDownLatch(1);
}
- CountDownCallback callback = new CountDownCallback(latchFinishThread);
+ CountDownCallback callback = new CountDownCallback(latchFinishThread,
null, null, 0);
if (!sync)
{
list.add(callback);
@@ -230,7 +227,7 @@
{
latchFinishThread.await();
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
}
if (!sync)
@@ -241,13 +238,13 @@
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
+ assertFalse(callback.errorCalled != 0);
}
}