[jboss-cvs] JBoss Messaging SVN: r6052 - trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 9 18:59:22 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-09 18:59:21 -0400 (Mon, 09 Mar 2009)
New Revision: 6052
Added:
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadWriteNativeTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/SingleThreadWriteNativeTest.java
Log:
Renaming Tests using Andy's name
Copied: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java (from rev 6051, trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/SingleThreadWriteNativeTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-03-09 22:59:21 UTC (rev 6052)
@@ -0,0 +1,873 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.asyncio;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.BufferCallback;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ *
+ * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+ * If you are running this test in eclipse you should do:
+ * I - Run->Open Run Dialog
+ * II - Find the class on the list (you will find it if you already tried running this testcase before)
+ * III - Add -Djava.library.path=<your project place>/native/src/.libs
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>.
+ * */
+public class AsynchronousFileTest extends AIOTestBase
+{
+
+ private static final Logger log = Logger.getLogger(AsynchronousFileTest.class);
+
+ private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
+
+ byte commonBuffer[] = null;
+
+ private static void debug(final String msg)
+ {
+ log.debug(msg);
+ }
+
+ /**
+ * Opening and closing a file immediately can lead to races on the native layer,
+ * creating crash conditions.
+ * */
+ public void testOpenClose() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ for (int i = 0; i < 1000; i++)
+ {
+ controller.open(FILE_NAME, 10000);
+ controller.close();
+
+ }
+ }
+
+ public void testFileNonExistent() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ for (int i = 0; i < 1000; i++)
+ {
+ try
+ {
+ controller.open("/non-existent/IDontExist.error", 10000);
+ fail ("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
+ }
+ catch (Throwable ignored)
+ {
+ }
+ try
+ {
+ controller.close();
+ fail("Supposed to throw exception as the file wasn't opened");
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ }
+ }
+
+ /**
+ * This test is validating if the AIO layer can open two different
+ * simultaneous files without loose any callbacks. This test made the native
+ * layer to crash at some point during development
+ */
+ public void testTwoFiles() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
+ controller.open(FILE_NAME + ".1", 10000);
+ controller2.open(FILE_NAME + ".2", 10000);
+
+ int numberOfLines = 1000;
+ int size = 1024;
+
+ try
+ {
+ CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+ CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
+
+ ByteBuffer block = controller.newBuffer(size);
+ encodeBufer(block);
+
+ preAlloc(controller, numberOfLines * size);
+ preAlloc(controller2, numberOfLines * size);
+
+ ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>();
+ ArrayList<CountDownCallback> list2 = new ArrayList<CountDownCallback>();
+
+ for (int i = 0; i < numberOfLines; i++)
+ {
+ list.add(new CountDownCallback(latchDone));
+ list2.add(new CountDownCallback(latchDone2));
+ }
+
+ long valueInitial = System.currentTimeMillis();
+
+ long lastTime = System.currentTimeMillis();
+ int counter = 0;
+ Iterator<CountDownCallback> iter2 = list2.iterator();
+
+ for (CountDownCallback tmp : list)
+ {
+ CountDownCallback tmp2 = iter2.next();
+
+ controller.write(counter * size, size, block, tmp);
+ controller.write(counter * size, size, block, tmp2);
+ if (++counter % 5000 == 0)
+ {
+ debug(5000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
+ lastTime = System.currentTimeMillis();
+ }
+
+ }
+
+ long timeTotal = System.currentTimeMillis() - valueInitial;
+
+ debug("Asynchronous time = " + timeTotal +
+ " for " +
+ numberOfLines +
+ " registers " +
+ " size each line = " +
+ size +
+ " Records/Sec=" +
+ numberOfLines *
+ 1000 /
+ timeTotal +
+ " (Assynchronous)");
+
+ latchDone.await();
+ latchDone2.await();
+
+ timeTotal = System.currentTimeMillis() - valueInitial;
+ debug("After completions time = " + timeTotal +
+ " for " +
+ numberOfLines +
+ " registers " +
+ " size each line = " +
+ size +
+ " Records/Sec=" +
+ numberOfLines *
+ 1000 /
+ timeTotal +
+ " (Assynchronous)");
+
+ for (CountDownCallback callback : list)
+ {
+ assertEquals(1, callback.timesDoneCalled.get());
+ assertTrue(callback.doneCalled);
+ assertFalse(callback.errorCalled);
+ }
+
+ for (CountDownCallback callback : list2)
+ {
+ assertEquals(1, callback.timesDoneCalled.get());
+ assertTrue(callback.doneCalled);
+ assertFalse(callback.errorCalled);
+ }
+
+ controller.close();
+ }
+ finally
+ {
+ try
+ {
+ controller.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ try
+ {
+ controller2.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+
+ public void testAddBeyongSimultaneousLimit() throws Exception
+ {
+ asyncData(3000, 1024, 10);
+ }
+
+ public void testAddAsyncData() throws Exception
+ {
+ asyncData(10000, 1024, 30000);
+ }
+
+ public void testInvalidReads() throws Exception
+ {
+ class LocalCallback implements AIOCallback
+ {
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ volatile boolean error;
+
+ public void done()
+ {
+ latch.countDown();
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ error = true;
+ latch.countDown();
+ }
+ }
+
+ AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+ final int SIZE = 512;
+
+ controller.open(FILE_NAME, 10);
+ controller.close();
+
+ controller = new AsynchronousFileImpl();
+
+ controller.open(FILE_NAME, 10);
+
+ controller.fill(0, 1, 512, (byte)'j');
+
+ ByteBuffer buffer = controller.newBuffer(SIZE);
+
+ buffer.clear();
+
+ for (int i = 0; i < SIZE; i++)
+ {
+ buffer.put((byte)(i % 100));
+ }
+
+ LocalCallback callbackLocal = new LocalCallback();
+
+ controller.write(0, 512, buffer, callbackLocal);
+
+ callbackLocal.latch.await();
+
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(50);
+
+ callbackLocal = new LocalCallback();
+
+ controller.read(0, 50, newBuffer, callbackLocal);
+
+ callbackLocal.latch.await();
+
+ // assertTrue(callbackLocal.error);
+
+ callbackLocal = new LocalCallback();
+
+ byte bytes[] = new byte[512];
+
+ try
+ {
+ newBuffer = ByteBuffer.wrap(bytes);
+
+ controller.read(0, 512, newBuffer, callbackLocal);
+
+ fail("An exception was supposed to be thrown");
+ }
+ catch (MessagingException ignored)
+ {
+ }
+
+ // newBuffer = ByteBuffer.allocateDirect(512);
+ newBuffer = controller.newBuffer(512);
+ callbackLocal = new LocalCallback();
+ controller.read(0, 512, newBuffer, callbackLocal);
+ callbackLocal.latch.await();
+ assertFalse(callbackLocal.error);
+
+ newBuffer.rewind();
+
+ byte[] bytesRead = new byte[SIZE];
+
+ newBuffer.get(bytesRead);
+
+ for (int i = 0; i < SIZE; i++)
+ {
+ assertEquals((byte)(i % 100), bytesRead[i]);
+ }
+ }
+ finally
+ {
+ try
+ {
+ controller.close();
+ }
+ catch (MessagingException ignored)
+ {
+ }
+
+ }
+
+ }
+
+ public void testBufferCallbackUniqueBuffers() throws Exception
+ {
+ boolean closed = false;
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+ final int NUMBER_LINES = 1000;
+ final int SIZE = 512;
+
+ controller.open(FILE_NAME, 1000);
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+
+ final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+
+ BufferCallback bufferCallback = new BufferCallback()
+ {
+ public void bufferDone(ByteBuffer buffer)
+ {
+ buffers.add(buffer);
+ }
+ };
+
+ controller.setBufferCallback(bufferCallback);
+
+ CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
+ CountDownCallback aio = new CountDownCallback(latch);
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ ByteBuffer buffer = controller.newBuffer(SIZE);
+ buffer.rewind();
+ for (int j = 0; j < SIZE; j++)
+ {
+ buffer.put((byte)(j % Byte.MAX_VALUE));
+ }
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ }
+
+ // The buffer callback is only called after the complete callback was
+ // called.
+ // Because of that a race could happen on the assertions to
+ // buffers.size what would invalidate the test
+ // We close the file and that would guarantee the buffer callback was
+ // called for all the elements
+ controller.close();
+ closed = true;
+
+ assertEquals(NUMBER_LINES, buffers.size());
+
+ // Make sure all the buffers are unique
+ ByteBuffer lineOne = null;
+ for (ByteBuffer bufferTmp : buffers)
+ {
+ if (lineOne == null)
+ {
+ lineOne = bufferTmp;
+ }
+ else
+ {
+ assertTrue(lineOne != bufferTmp);
+ }
+ }
+
+ buffers.clear();
+
+ }
+ finally
+ {
+ if (!closed)
+ {
+ controller.close();
+ }
+ }
+ }
+
+ public void testBufferCallbackAwaysSameBuffer() throws Exception
+ {
+ boolean closed = false;
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+ final int NUMBER_LINES = 1000;
+ final int SIZE = 512;
+
+ controller.open(FILE_NAME, 1000);
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+
+ final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+
+ BufferCallback bufferCallback = new BufferCallback()
+ {
+ public void bufferDone(ByteBuffer buffer)
+ {
+ buffers.add(buffer);
+ }
+ };
+
+ controller.setBufferCallback(bufferCallback);
+
+ CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
+ CountDownCallback aio = new CountDownCallback(latch);
+
+ ByteBuffer buffer = controller.newBuffer(SIZE);
+ buffer.rewind();
+ for (int j = 0; j < SIZE; j++)
+ {
+ buffer.put((byte)(j % Byte.MAX_VALUE));
+ }
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ }
+
+ // The buffer callback is only called after the complete callback was
+ // called.
+ // Because of that a race could happen on the assertions to
+ // buffers.size what would invalidate the test
+ // We close the file and that would guarantee the buffer callback was
+ // called for all the elements
+ controller.close();
+ closed = true;
+
+ assertEquals(NUMBER_LINES, buffers.size());
+
+ // Make sure all the buffers are unique
+ ByteBuffer lineOne = null;
+ for (ByteBuffer bufferTmp : buffers)
+ {
+ if (lineOne == null)
+ {
+ lineOne = bufferTmp;
+ }
+ else
+ {
+ assertTrue(lineOne == bufferTmp);
+ }
+ }
+
+ buffers.clear();
+
+ }
+ finally
+ {
+ if (!closed)
+ {
+ controller.close();
+ }
+ }
+ }
+
+ public void testRead() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+ final int NUMBER_LINES = 5000;
+ final int SIZE = 1024;
+
+ controller.open(FILE_NAME, 1000);
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+
+ {
+ CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
+ CountDownCallback aio = new CountDownCallback(latch);
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+ addString("Str value " + i + "\n", buffer);
+ for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
+ {
+ buffer.put((byte)' ');
+ }
+ buffer.put((byte)'\n');
+
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ }
+
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertEquals(NUMBER_LINES, aio.timesDoneCalled.get());
+ }
+
+ // If you call close you're supposed to wait events to finish before
+ // closing it
+ controller.close();
+ controller.open(FILE_NAME, 10);
+
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ newBuffer.clear();
+ addString("Str value " + i + "\n", newBuffer);
+ for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
+ {
+ newBuffer.put((byte)' ');
+ }
+ newBuffer.put((byte)'\n');
+
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownCallback aio = new CountDownCallback(latch);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ controller.read(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+
+ byte bytesRead[] = new byte[SIZE];
+ byte bytesCompare[] = new byte[SIZE];
+
+ newBuffer.rewind();
+ newBuffer.get(bytesCompare);
+ buffer.rewind();
+ buffer.get(bytesRead);
+
+ for (int count = 0; count < SIZE; count++)
+ {
+ assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+ }
+
+ assertTrue(buffer.equals(newBuffer));
+ }
+ }
+ finally
+ {
+ try
+ {
+ controller.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ }
+
+ }
+
+ /**
+ * This test will call file.close() when there are still callbacks being processed.
+ * This could cause a crash or callbacks missing and this test is validating both situations.
+ * The file is also read after being written to validate its correctness */
+ public void testConcurrentClose() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+ final int NUMBER_LINES = 1000;
+ CountDownLatch readLatch = new CountDownLatch(NUMBER_LINES);
+ final int SIZE = 1024;
+
+ controller.open(FILE_NAME, 10000);
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ buffer.clear();
+ addString("Str value " + i + "\n", buffer);
+ for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
+ {
+ buffer.put((byte)' ');
+ }
+ buffer.put((byte)'\n');
+
+ CountDownCallback aio = new CountDownCallback(readLatch);
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ }
+
+ // If you call close you're supposed to wait events to finish before
+ // closing it
+ controller.close();
+
+ assertEquals(0, readLatch.getCount());
+ readLatch.await();
+ controller.open(FILE_NAME, 10);
+
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ newBuffer.clear();
+ addString("Str value " + i + "\n", newBuffer);
+ for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
+ {
+ newBuffer.put((byte)' ');
+ }
+ newBuffer.put((byte)'\n');
+
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownCallback aio = new CountDownCallback(latch);
+ controller.read(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+
+ byte bytesRead[] = new byte[SIZE];
+ byte bytesCompare[] = new byte[SIZE];
+
+ newBuffer.rewind();
+ newBuffer.get(bytesCompare);
+ buffer.rewind();
+ buffer.get(bytesRead);
+
+ for (int count = 0; count < SIZE; count++)
+ {
+ assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+ }
+
+ assertTrue(buffer.equals(newBuffer));
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ controller.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ }
+ }
+
+ private void asyncData(final int numberOfLines, final int size, final int aioLimit) throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, aioLimit);
+
+ try
+ {
+ CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+
+ ByteBuffer block = controller.newBuffer(size);
+ encodeBufer(block);
+
+ preAlloc(controller, numberOfLines * size);
+
+ ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>();
+
+ for (int i = 0; i < numberOfLines; i++)
+ {
+ list.add(new CountDownCallback(latchDone));
+ }
+
+ long valueInitial = System.currentTimeMillis();
+
+ long lastTime = System.currentTimeMillis();
+ int counter = 0;
+ for (CountDownCallback tmp : list)
+ {
+ controller.write(counter * size, size, block, tmp);
+ if (++counter % 20000 == 0)
+ {
+ debug(20000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
+ lastTime = System.currentTimeMillis();
+ }
+
+ }
+
+ latchDone.await();
+
+ long timeTotal = System.currentTimeMillis() - valueInitial;
+ debug("After completions time = " + timeTotal +
+ " for " +
+ numberOfLines +
+ " registers " +
+ " size each line = " +
+ size +
+ ", Records/Sec=" +
+ numberOfLines *
+ 1000 /
+ timeTotal +
+ " (Assynchronous)");
+
+ for (CountDownCallback tmp : list)
+ {
+ assertEquals(1, tmp.timesDoneCalled.get());
+ assertTrue(tmp.doneCalled);
+ assertFalse(tmp.errorCalled);
+ }
+
+ controller.close();
+ }
+ finally
+ {
+ try
+ {
+ controller.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testDirectSynchronous() throws Exception
+ {
+ try
+ {
+ final int NUMBER_LINES = 3000;
+ final int SIZE = 1024;
+
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, 2000);
+
+ ByteBuffer block = ByteBuffer.allocateDirect(SIZE);
+ encodeBufer(block);
+
+ preAlloc(controller, NUMBER_LINES * SIZE);
+
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ CountDownLatch latchDone = new CountDownLatch(1);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ controller.write(i * 512, 512, block, aioBlock);
+ latchDone.await();
+ assertTrue(aioBlock.doneCalled);
+ assertFalse(aioBlock.errorCalled);
+ }
+
+ long timeTotal = System.currentTimeMillis() - startTime;
+ debug("time = " + timeTotal +
+ " for " +
+ NUMBER_LINES +
+ " registers " +
+ " size each line = " +
+ SIZE +
+ " Records/Sec=" +
+ NUMBER_LINES *
+ 1000 /
+ timeTotal +
+ " Synchronous");
+
+ controller.close();
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+
+ }
+
+ public void testInvalidWrite() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, 2000);
+
+ try
+ {
+ final int SIZE = 512;
+
+ ByteBuffer block = controller.newBuffer(SIZE);
+ encodeBufer(block);
+
+ preAlloc(controller, 10 * 512);
+
+ CountDownLatch latchDone = new CountDownLatch(1);
+
+ CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ controller.write(11, 512, block, aioBlock);
+
+ latchDone.await();
+
+ assertTrue(aioBlock.errorCalled);
+ assertFalse(aioBlock.doneCalled);
+
+ }
+ catch (Exception e)
+ {
+ throw e;
+ }
+ finally
+ {
+ controller.close();
+ }
+
+ }
+
+ public void testInvalidAlloc() throws Exception
+ {
+ AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+ ByteBuffer buffer = controller.newBuffer(300);
+ fail("Exception expected");
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ }
+
+ public void testSize() throws Exception
+ {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+
+ final int NUMBER_LINES = 10;
+ final int SIZE = 1024;
+
+ controller.open(FILE_NAME, 1);
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
+
+ assertEquals(NUMBER_LINES * SIZE, controller.size());
+
+ controller.close();
+
+ }
+
+ private void addString(final String str, final ByteBuffer buffer)
+ {
+ CharBuffer charBuffer = CharBuffer.wrap(str);
+ UTF_8_ENCODER.encode(charBuffer, buffer, true);
+
+ }
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Name: svn:mergeinfo
+
Copied: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java (from rev 6051, trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadWriteNativeTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-03-09 22:59:21 UTC (rev 6052)
@@ -0,0 +1,296 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.unit.core.asyncio;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
+ * If you are running this test in eclipse you should do:
+ * I - Run->Open Run Dialog
+ * II - Find the class on the list (you will find it if you already tried running this testcase before)
+ * III - Add -Djava.library.path=<your project place>/native/src/.libs
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>.
+ * */
+public class MultiThreadAsynchronousFileTest extends AIOTestBase
+{
+
+ static Logger log = Logger.getLogger(MultiThreadAsynchronousFileTest.class);
+
+ AtomicInteger position = new AtomicInteger(0);
+
+ static final int SIZE = 1024;
+
+ static final int NUMBER_OF_THREADS = 10;
+
+ static final int NUMBER_OF_LINES = 1000;
+
+ // Executor exec
+
+ Executor executor = Executors.newSingleThreadExecutor();
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ position.set(0);
+ }
+
+ public void testMultipleASynchronousWrites() throws Throwable
+ {
+ executeTest(false);
+ }
+
+ public void testMultipleSynchronousWrites() throws Throwable
+ {
+ executeTest(true);
+ }
+
+ private void executeTest(final boolean sync) throws Throwable
+ {
+ log.debug(sync ? "Sync test:" : "Async test");
+ AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
+ jlibAIO.open(FILE_NAME, 21000);
+ try
+ {
+ log.debug("Preallocating file");
+
+ jlibAIO.fill(0l, NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES, (byte)0);
+ log.debug("Done Preallocating file");
+
+ CountDownLatch latchStart = new CountDownLatch(NUMBER_OF_THREADS + 1);
+
+ ArrayList<ThreadProducer> list = new ArrayList<ThreadProducer>(NUMBER_OF_THREADS);
+ for (int i = 0; i < NUMBER_OF_THREADS; i++)
+ {
+ ThreadProducer producer = new ThreadProducer("Thread " + i, latchStart, jlibAIO, sync);
+ list.add(producer);
+ producer.start();
+ }
+
+ latchStart.countDown();
+ latchStart.await();
+
+ long startTime = System.currentTimeMillis();
+
+ for (ThreadProducer producer : list)
+ {
+ producer.join();
+ if (producer.failed != null)
+ {
+ throw producer.failed;
+ }
+ }
+ long endTime = System.currentTimeMillis();
+
+ log.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES *
+ 1000 /
+ (endTime - startTime) +
+ " total time = " +
+ (endTime - startTime) +
+ " total number of records = " +
+ NUMBER_OF_THREADS *
+ NUMBER_OF_LINES);
+ }
+ finally
+ {
+ jlibAIO.close();
+ }
+
+ }
+
+ private int getNewPosition()
+ {
+ return position.addAndGet(1);
+ }
+
+ class ThreadProducer extends Thread
+ {
+ Throwable failed = null;
+
+ CountDownLatch latchStart;
+
+ boolean sync;
+
+ AsynchronousFileImpl libaio;
+
+ public ThreadProducer(final String name,
+ final CountDownLatch latchStart,
+ final AsynchronousFileImpl libaio,
+ final boolean sync)
+ {
+ super(name);
+ this.latchStart = latchStart;
+ this.libaio = libaio;
+ this.sync = sync;
+ }
+
+ @Override
+ public void run()
+ {
+ super.run();
+
+ try
+ {
+
+ ByteBuffer buffer = libaio.newBuffer(SIZE);
+
+ // I'm aways reusing the same buffer, as I don't want any noise from
+ // malloc on the measurement
+ // Encoding buffer
+ addString("Thread name=" + Thread.currentThread().getName() + ";" + "\n", buffer);
+ for (int local = buffer.position(); local < buffer.capacity() - 1; local++)
+ {
+ buffer.put((byte)' ');
+ }
+ buffer.put((byte)'\n');
+
+ latchStart.countDown();
+ latchStart.await();
+
+ long startTime = System.currentTimeMillis();
+
+ CountDownLatch latchFinishThread = null;
+
+ if (!sync)
+ {
+ latchFinishThread = new CountDownLatch(NUMBER_OF_LINES);
+ }
+
+ LinkedList<CountDownCallback> list = new LinkedList<CountDownCallback>();
+
+ for (int i = 0; i < NUMBER_OF_LINES; i++)
+ {
+
+ if (sync)
+ {
+ latchFinishThread = new CountDownLatch(1);
+ }
+ CountDownCallback callback = new CountDownCallback(latchFinishThread);
+ if (!sync)
+ {
+ list.add(callback);
+ }
+ addData(libaio, buffer, callback);
+ if (sync)
+ {
+ latchFinishThread.await();
+ assertTrue(callback.doneCalled);
+ assertFalse(callback.errorCalled);
+ }
+ }
+ if (!sync)
+ {
+ latchFinishThread.await();
+ }
+ for (CountDownCallback callback : list)
+ {
+ assertTrue(callback.doneCalled);
+ assertFalse(callback.errorCalled);
+ }
+
+ long endtime = System.currentTimeMillis();
+
+ log.debug(Thread.currentThread().getName() + " Rec/Sec= " +
+ NUMBER_OF_LINES *
+ 1000 /
+ (endtime - startTime) +
+ " total time = " +
+ (endtime - startTime) +
+ " number of lines=" +
+ NUMBER_OF_LINES);
+
+ for (CountDownCallback callback : list)
+ {
+ assertTrue(callback.doneCalled);
+ assertFalse(callback.errorCalled);
+ }
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ failed = e;
+ }
+
+ }
+ }
+
+ private static void addString(final String str, final ByteBuffer buffer)
+ {
+ byte bytes[] = str.getBytes();
+ buffer.put(bytes);
+ }
+
+ private void addData(final AsynchronousFileImpl aio, final ByteBuffer buffer, final AIOCallback callback) throws Exception
+ {
+ executor.execute(new WriteRunnable(aio, buffer, callback));
+ }
+
+ private class WriteRunnable implements Runnable
+ {
+
+ AsynchronousFileImpl aio;
+
+ ByteBuffer buffer;
+
+ AIOCallback callback;
+
+ public WriteRunnable(final AsynchronousFileImpl aio, final ByteBuffer buffer, final AIOCallback callback)
+ {
+ this.aio = aio;
+ this.buffer = buffer;
+ this.callback = callback;
+ }
+
+ public void run()
+ {
+ try
+ {
+ aio.write(getNewPosition() * SIZE, SIZE, buffer, callback);
+
+ }
+ catch (Exception e)
+ {
+ callback.onError(-1, e.toString());
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Name: svn:mergeinfo
+
Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadWriteNativeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadWriteNativeTest.java 2009-03-09 18:21:33 UTC (rev 6051)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadWriteNativeTest.java 2009-03-09 22:59:21 UTC (rev 6052)
@@ -1,296 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.asyncio;
-
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.logging.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- *
- * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
- * If you are running this test in eclipse you should do:
- * I - Run->Open Run Dialog
- * II - Find the class on the list (you will find it if you already tried running this testcase before)
- * III - Add -Djava.library.path=<your project place>/native/src/.libs
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>.
- * */
-public class MultiThreadWriteNativeTest extends AIOTestBase
-{
-
- static Logger log = Logger.getLogger(MultiThreadWriteNativeTest.class);
-
- AtomicInteger position = new AtomicInteger(0);
-
- static final int SIZE = 1024;
-
- static final int NUMBER_OF_THREADS = 10;
-
- static final int NUMBER_OF_LINES = 1000;
-
- // Executor exec
-
- Executor executor = Executors.newSingleThreadExecutor();
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- position.set(0);
- }
-
- public void testMultipleASynchronousWrites() throws Throwable
- {
- executeTest(false);
- }
-
- public void testMultipleSynchronousWrites() throws Throwable
- {
- executeTest(true);
- }
-
- private void executeTest(final boolean sync) throws Throwable
- {
- log.debug(sync ? "Sync test:" : "Async test");
- AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
- jlibAIO.open(FILE_NAME, 21000);
- try
- {
- log.debug("Preallocating file");
-
- jlibAIO.fill(0l, NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES, (byte)0);
- log.debug("Done Preallocating file");
-
- CountDownLatch latchStart = new CountDownLatch(NUMBER_OF_THREADS + 1);
-
- ArrayList<ThreadProducer> list = new ArrayList<ThreadProducer>(NUMBER_OF_THREADS);
- for (int i = 0; i < NUMBER_OF_THREADS; i++)
- {
- ThreadProducer producer = new ThreadProducer("Thread " + i, latchStart, jlibAIO, sync);
- list.add(producer);
- producer.start();
- }
-
- latchStart.countDown();
- latchStart.await();
-
- long startTime = System.currentTimeMillis();
-
- for (ThreadProducer producer : list)
- {
- producer.join();
- if (producer.failed != null)
- {
- throw producer.failed;
- }
- }
- long endTime = System.currentTimeMillis();
-
- log.debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES *
- 1000 /
- (endTime - startTime) +
- " total time = " +
- (endTime - startTime) +
- " total number of records = " +
- NUMBER_OF_THREADS *
- NUMBER_OF_LINES);
- }
- finally
- {
- jlibAIO.close();
- }
-
- }
-
- private int getNewPosition()
- {
- return position.addAndGet(1);
- }
-
- class ThreadProducer extends Thread
- {
- Throwable failed = null;
-
- CountDownLatch latchStart;
-
- boolean sync;
-
- AsynchronousFileImpl libaio;
-
- public ThreadProducer(final String name,
- final CountDownLatch latchStart,
- final AsynchronousFileImpl libaio,
- final boolean sync)
- {
- super(name);
- this.latchStart = latchStart;
- this.libaio = libaio;
- this.sync = sync;
- }
-
- @Override
- public void run()
- {
- super.run();
-
- try
- {
-
- ByteBuffer buffer = libaio.newBuffer(SIZE);
-
- // I'm aways reusing the same buffer, as I don't want any noise from
- // malloc on the measurement
- // Encoding buffer
- addString("Thread name=" + Thread.currentThread().getName() + ";" + "\n", buffer);
- for (int local = buffer.position(); local < buffer.capacity() - 1; local++)
- {
- buffer.put((byte)' ');
- }
- buffer.put((byte)'\n');
-
- latchStart.countDown();
- latchStart.await();
-
- long startTime = System.currentTimeMillis();
-
- CountDownLatch latchFinishThread = null;
-
- if (!sync)
- {
- latchFinishThread = new CountDownLatch(NUMBER_OF_LINES);
- }
-
- LinkedList<CountDownCallback> list = new LinkedList<CountDownCallback>();
-
- for (int i = 0; i < NUMBER_OF_LINES; i++)
- {
-
- if (sync)
- {
- latchFinishThread = new CountDownLatch(1);
- }
- CountDownCallback callback = new CountDownCallback(latchFinishThread);
- if (!sync)
- {
- list.add(callback);
- }
- addData(libaio, buffer, callback);
- if (sync)
- {
- latchFinishThread.await();
- assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
- }
- }
- if (!sync)
- {
- latchFinishThread.await();
- }
- for (CountDownCallback callback : list)
- {
- assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
- }
-
- long endtime = System.currentTimeMillis();
-
- log.debug(Thread.currentThread().getName() + " Rec/Sec= " +
- NUMBER_OF_LINES *
- 1000 /
- (endtime - startTime) +
- " total time = " +
- (endtime - startTime) +
- " number of lines=" +
- NUMBER_OF_LINES);
-
- for (CountDownCallback callback : list)
- {
- assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
- }
-
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- failed = e;
- }
-
- }
- }
-
- private static void addString(final String str, final ByteBuffer buffer)
- {
- byte bytes[] = str.getBytes();
- buffer.put(bytes);
- }
-
- private void addData(final AsynchronousFileImpl aio, final ByteBuffer buffer, final AIOCallback callback) throws Exception
- {
- executor.execute(new WriteRunnable(aio, buffer, callback));
- }
-
- private class WriteRunnable implements Runnable
- {
-
- AsynchronousFileImpl aio;
-
- ByteBuffer buffer;
-
- AIOCallback callback;
-
- public WriteRunnable(final AsynchronousFileImpl aio, final ByteBuffer buffer, final AIOCallback callback)
- {
- this.aio = aio;
- this.buffer = buffer;
- this.callback = callback;
- }
-
- public void run()
- {
- try
- {
- aio.write(getNewPosition() * SIZE, SIZE, buffer, callback);
-
- }
- catch (Exception e)
- {
- callback.onError(-1, e.toString());
- e.printStackTrace();
- }
- }
-
- }
-
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/SingleThreadWriteNativeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/SingleThreadWriteNativeTest.java 2009-03-09 18:21:33 UTC (rev 6051)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/SingleThreadWriteNativeTest.java 2009-03-09 22:59:21 UTC (rev 6052)
@@ -1,873 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.asyncio;
-
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.BufferCallback;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetEncoder;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.concurrent.CountDownLatch;
-
-/**
- *
- * you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
- * If you are running this test in eclipse you should do:
- * I - Run->Open Run Dialog
- * II - Find the class on the list (you will find it if you already tried running this testcase before)
- * III - Add -Djava.library.path=<your project place>/native/src/.libs
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>.
- * */
-public class SingleThreadWriteNativeTest extends AIOTestBase
-{
-
- private static final Logger log = Logger.getLogger(SingleThreadWriteNativeTest.class);
-
- private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
-
- byte commonBuffer[] = null;
-
- private static void debug(final String msg)
- {
- log.debug(msg);
- }
-
- /**
- * Opening and closing a file immediately can lead to races on the native layer,
- * creating crash conditions.
- * */
- public void testOpenClose() throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- for (int i = 0; i < 1000; i++)
- {
- controller.open(FILE_NAME, 10000);
- controller.close();
-
- }
- }
-
- public void testFileNonExistent() throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- for (int i = 0; i < 1000; i++)
- {
- try
- {
- controller.open("/non-existent/IDontExist.error", 10000);
- fail ("Exception expected! The test could create a file called /non-existent/IDontExist.error when it was supposed to fail.");
- }
- catch (Throwable ignored)
- {
- }
- try
- {
- controller.close();
- fail("Supposed to throw exception as the file wasn't opened");
- }
- catch (Throwable ignored)
- {
- }
-
- }
- }
-
- /**
- * This test is validating if the AIO layer can open two different
- * simultaneous files without loose any callbacks. This test made the native
- * layer to crash at some point during development
- */
- public void testTwoFiles() throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
- controller.open(FILE_NAME + ".1", 10000);
- controller2.open(FILE_NAME + ".2", 10000);
-
- int numberOfLines = 1000;
- int size = 1024;
-
- try
- {
- CountDownLatch latchDone = new CountDownLatch(numberOfLines);
- CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
-
- ByteBuffer block = controller.newBuffer(size);
- encodeBufer(block);
-
- preAlloc(controller, numberOfLines * size);
- preAlloc(controller2, numberOfLines * size);
-
- ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>();
- ArrayList<CountDownCallback> list2 = new ArrayList<CountDownCallback>();
-
- for (int i = 0; i < numberOfLines; i++)
- {
- list.add(new CountDownCallback(latchDone));
- list2.add(new CountDownCallback(latchDone2));
- }
-
- long valueInitial = System.currentTimeMillis();
-
- long lastTime = System.currentTimeMillis();
- int counter = 0;
- Iterator<CountDownCallback> iter2 = list2.iterator();
-
- for (CountDownCallback tmp : list)
- {
- CountDownCallback tmp2 = iter2.next();
-
- controller.write(counter * size, size, block, tmp);
- controller.write(counter * size, size, block, tmp2);
- if (++counter % 5000 == 0)
- {
- debug(5000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
- lastTime = System.currentTimeMillis();
- }
-
- }
-
- long timeTotal = System.currentTimeMillis() - valueInitial;
-
- debug("Asynchronous time = " + timeTotal +
- " for " +
- numberOfLines +
- " registers " +
- " size each line = " +
- size +
- " Records/Sec=" +
- numberOfLines *
- 1000 /
- timeTotal +
- " (Assynchronous)");
-
- latchDone.await();
- latchDone2.await();
-
- timeTotal = System.currentTimeMillis() - valueInitial;
- debug("After completions time = " + timeTotal +
- " for " +
- numberOfLines +
- " registers " +
- " size each line = " +
- size +
- " Records/Sec=" +
- numberOfLines *
- 1000 /
- timeTotal +
- " (Assynchronous)");
-
- for (CountDownCallback callback : list)
- {
- assertEquals(1, callback.timesDoneCalled.get());
- assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
- }
-
- for (CountDownCallback callback : list2)
- {
- assertEquals(1, callback.timesDoneCalled.get());
- assertTrue(callback.doneCalled);
- assertFalse(callback.errorCalled);
- }
-
- controller.close();
- }
- finally
- {
- try
- {
- controller.close();
- }
- catch (Exception ignored)
- {
- }
- try
- {
- controller2.close();
- }
- catch (Exception ignored)
- {
- }
- }
- }
-
- public void testAddBeyongSimultaneousLimit() throws Exception
- {
- asyncData(3000, 1024, 10);
- }
-
- public void testAddAsyncData() throws Exception
- {
- asyncData(10000, 1024, 30000);
- }
-
- public void testInvalidReads() throws Exception
- {
- class LocalCallback implements AIOCallback
- {
- private final CountDownLatch latch = new CountDownLatch(1);
-
- volatile boolean error;
-
- public void done()
- {
- latch.countDown();
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- error = true;
- latch.countDown();
- }
- }
-
- AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
-
- final int SIZE = 512;
-
- controller.open(FILE_NAME, 10);
- controller.close();
-
- controller = new AsynchronousFileImpl();
-
- controller.open(FILE_NAME, 10);
-
- controller.fill(0, 1, 512, (byte)'j');
-
- ByteBuffer buffer = controller.newBuffer(SIZE);
-
- buffer.clear();
-
- for (int i = 0; i < SIZE; i++)
- {
- buffer.put((byte)(i % 100));
- }
-
- LocalCallback callbackLocal = new LocalCallback();
-
- controller.write(0, 512, buffer, callbackLocal);
-
- callbackLocal.latch.await();
-
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(50);
-
- callbackLocal = new LocalCallback();
-
- controller.read(0, 50, newBuffer, callbackLocal);
-
- callbackLocal.latch.await();
-
- // assertTrue(callbackLocal.error);
-
- callbackLocal = new LocalCallback();
-
- byte bytes[] = new byte[512];
-
- try
- {
- newBuffer = ByteBuffer.wrap(bytes);
-
- controller.read(0, 512, newBuffer, callbackLocal);
-
- fail("An exception was supposed to be thrown");
- }
- catch (MessagingException ignored)
- {
- }
-
- // newBuffer = ByteBuffer.allocateDirect(512);
- newBuffer = controller.newBuffer(512);
- callbackLocal = new LocalCallback();
- controller.read(0, 512, newBuffer, callbackLocal);
- callbackLocal.latch.await();
- assertFalse(callbackLocal.error);
-
- newBuffer.rewind();
-
- byte[] bytesRead = new byte[SIZE];
-
- newBuffer.get(bytesRead);
-
- for (int i = 0; i < SIZE; i++)
- {
- assertEquals((byte)(i % 100), bytesRead[i]);
- }
- }
- finally
- {
- try
- {
- controller.close();
- }
- catch (MessagingException ignored)
- {
- }
-
- }
-
- }
-
- public void testBufferCallbackUniqueBuffers() throws Exception
- {
- boolean closed = false;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
- final int NUMBER_LINES = 1000;
- final int SIZE = 512;
-
- controller.open(FILE_NAME, 1000);
-
- controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
-
- final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-
- BufferCallback bufferCallback = new BufferCallback()
- {
- public void bufferDone(ByteBuffer buffer)
- {
- buffers.add(buffer);
- }
- };
-
- controller.setBufferCallback(bufferCallback);
-
- CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
- for (int i = 0; i < NUMBER_LINES; i++)
- {
- ByteBuffer buffer = controller.newBuffer(SIZE);
- buffer.rewind();
- for (int j = 0; j < SIZE; j++)
- {
- buffer.put((byte)(j % Byte.MAX_VALUE));
- }
- controller.write(i * SIZE, SIZE, buffer, aio);
- }
-
- // The buffer callback is only called after the complete callback was
- // called.
- // Because of that a race could happen on the assertions to
- // buffers.size what would invalidate the test
- // We close the file and that would guarantee the buffer callback was
- // called for all the elements
- controller.close();
- closed = true;
-
- assertEquals(NUMBER_LINES, buffers.size());
-
- // Make sure all the buffers are unique
- ByteBuffer lineOne = null;
- for (ByteBuffer bufferTmp : buffers)
- {
- if (lineOne == null)
- {
- lineOne = bufferTmp;
- }
- else
- {
- assertTrue(lineOne != bufferTmp);
- }
- }
-
- buffers.clear();
-
- }
- finally
- {
- if (!closed)
- {
- controller.close();
- }
- }
- }
-
- public void testBufferCallbackAwaysSameBuffer() throws Exception
- {
- boolean closed = false;
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
- final int NUMBER_LINES = 1000;
- final int SIZE = 512;
-
- controller.open(FILE_NAME, 1000);
-
- controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
-
- final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-
- BufferCallback bufferCallback = new BufferCallback()
- {
- public void bufferDone(ByteBuffer buffer)
- {
- buffers.add(buffer);
- }
- };
-
- controller.setBufferCallback(bufferCallback);
-
- CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
-
- ByteBuffer buffer = controller.newBuffer(SIZE);
- buffer.rewind();
- for (int j = 0; j < SIZE; j++)
- {
- buffer.put((byte)(j % Byte.MAX_VALUE));
- }
-
- for (int i = 0; i < NUMBER_LINES; i++)
- {
- controller.write(i * SIZE, SIZE, buffer, aio);
- }
-
- // The buffer callback is only called after the complete callback was
- // called.
- // Because of that a race could happen on the assertions to
- // buffers.size what would invalidate the test
- // We close the file and that would guarantee the buffer callback was
- // called for all the elements
- controller.close();
- closed = true;
-
- assertEquals(NUMBER_LINES, buffers.size());
-
- // Make sure all the buffers are unique
- ByteBuffer lineOne = null;
- for (ByteBuffer bufferTmp : buffers)
- {
- if (lineOne == null)
- {
- lineOne = bufferTmp;
- }
- else
- {
- assertTrue(lineOne == bufferTmp);
- }
- }
-
- buffers.clear();
-
- }
- finally
- {
- if (!closed)
- {
- controller.close();
- }
- }
- }
-
- public void testRead() throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
-
- final int NUMBER_LINES = 5000;
- final int SIZE = 1024;
-
- controller.open(FILE_NAME, 1000);
-
- controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
-
- {
- CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
- CountDownCallback aio = new CountDownCallback(latch);
-
- for (int i = 0; i < NUMBER_LINES; i++)
- {
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
- addString("Str value " + i + "\n", buffer);
- for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
- {
- buffer.put((byte)' ');
- }
- buffer.put((byte)'\n');
-
- controller.write(i * SIZE, SIZE, buffer, aio);
- }
-
- latch.await();
- assertFalse(aio.errorCalled);
- assertEquals(NUMBER_LINES, aio.timesDoneCalled.get());
- }
-
- // If you call close you're supposed to wait events to finish before
- // closing it
- controller.close();
- controller.open(FILE_NAME, 10);
-
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
-
- for (int i = 0; i < NUMBER_LINES; i++)
- {
- newBuffer.clear();
- addString("Str value " + i + "\n", newBuffer);
- for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
- {
- newBuffer.put((byte)' ');
- }
- newBuffer.put((byte)'\n');
-
- CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
-
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
-
- controller.read(i * SIZE, SIZE, buffer, aio);
- latch.await();
- assertFalse(aio.errorCalled);
- assertTrue(aio.doneCalled);
-
- byte bytesRead[] = new byte[SIZE];
- byte bytesCompare[] = new byte[SIZE];
-
- newBuffer.rewind();
- newBuffer.get(bytesCompare);
- buffer.rewind();
- buffer.get(bytesRead);
-
- for (int count = 0; count < SIZE; count++)
- {
- assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
- }
-
- assertTrue(buffer.equals(newBuffer));
- }
- }
- finally
- {
- try
- {
- controller.close();
- }
- catch (Throwable ignored)
- {
- }
-
- }
-
- }
-
- /**
- * This test will call file.close() when there are still callbacks being processed.
- * This could cause a crash or callbacks missing and this test is validating both situations.
- * The file is also read after being written to validate its correctness */
- public void testConcurrentClose() throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
-
- final int NUMBER_LINES = 1000;
- CountDownLatch readLatch = new CountDownLatch(NUMBER_LINES);
- final int SIZE = 1024;
-
- controller.open(FILE_NAME, 10000);
-
- controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
-
- for (int i = 0; i < NUMBER_LINES; i++)
- {
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
-
- buffer.clear();
- addString("Str value " + i + "\n", buffer);
- for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
- {
- buffer.put((byte)' ');
- }
- buffer.put((byte)'\n');
-
- CountDownCallback aio = new CountDownCallback(readLatch);
- controller.write(i * SIZE, SIZE, buffer, aio);
- }
-
- // If you call close you're supposed to wait events to finish before
- // closing it
- controller.close();
-
- assertEquals(0, readLatch.getCount());
- readLatch.await();
- controller.open(FILE_NAME, 10);
-
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
-
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
-
- for (int i = 0; i < NUMBER_LINES; i++)
- {
- newBuffer.clear();
- addString("Str value " + i + "\n", newBuffer);
- for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
- {
- newBuffer.put((byte)' ');
- }
- newBuffer.put((byte)'\n');
-
- CountDownLatch latch = new CountDownLatch(1);
- CountDownCallback aio = new CountDownCallback(latch);
- controller.read(i * SIZE, SIZE, buffer, aio);
- latch.await();
- assertFalse(aio.errorCalled);
- assertTrue(aio.doneCalled);
-
- byte bytesRead[] = new byte[SIZE];
- byte bytesCompare[] = new byte[SIZE];
-
- newBuffer.rewind();
- newBuffer.get(bytesCompare);
- buffer.rewind();
- buffer.get(bytesRead);
-
- for (int count = 0; count < SIZE; count++)
- {
- assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
- }
-
- assertTrue(buffer.equals(newBuffer));
- }
-
- }
- finally
- {
- try
- {
- controller.close();
- }
- catch (Throwable ignored)
- {
- }
-
- }
- }
-
- private void asyncData(final int numberOfLines, final int size, final int aioLimit) throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, aioLimit);
-
- try
- {
- CountDownLatch latchDone = new CountDownLatch(numberOfLines);
-
- ByteBuffer block = controller.newBuffer(size);
- encodeBufer(block);
-
- preAlloc(controller, numberOfLines * size);
-
- ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>();
-
- for (int i = 0; i < numberOfLines; i++)
- {
- list.add(new CountDownCallback(latchDone));
- }
-
- long valueInitial = System.currentTimeMillis();
-
- long lastTime = System.currentTimeMillis();
- int counter = 0;
- for (CountDownCallback tmp : list)
- {
- controller.write(counter * size, size, block, tmp);
- if (++counter % 20000 == 0)
- {
- debug(20000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
- lastTime = System.currentTimeMillis();
- }
-
- }
-
- latchDone.await();
-
- long timeTotal = System.currentTimeMillis() - valueInitial;
- debug("After completions time = " + timeTotal +
- " for " +
- numberOfLines +
- " registers " +
- " size each line = " +
- size +
- ", Records/Sec=" +
- numberOfLines *
- 1000 /
- timeTotal +
- " (Assynchronous)");
-
- for (CountDownCallback tmp : list)
- {
- assertEquals(1, tmp.timesDoneCalled.get());
- assertTrue(tmp.doneCalled);
- assertFalse(tmp.errorCalled);
- }
-
- controller.close();
- }
- finally
- {
- try
- {
- controller.close();
- }
- catch (Exception ignored)
- {
- }
- }
-
- }
-
- public void testDirectSynchronous() throws Exception
- {
- try
- {
- final int NUMBER_LINES = 3000;
- final int SIZE = 1024;
-
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, 2000);
-
- ByteBuffer block = ByteBuffer.allocateDirect(SIZE);
- encodeBufer(block);
-
- preAlloc(controller, NUMBER_LINES * SIZE);
-
- long startTime = System.currentTimeMillis();
-
- for (int i = 0; i < NUMBER_LINES; i++)
- {
- CountDownLatch latchDone = new CountDownLatch(1);
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
- controller.write(i * 512, 512, block, aioBlock);
- latchDone.await();
- assertTrue(aioBlock.doneCalled);
- assertFalse(aioBlock.errorCalled);
- }
-
- long timeTotal = System.currentTimeMillis() - startTime;
- debug("time = " + timeTotal +
- " for " +
- NUMBER_LINES +
- " registers " +
- " size each line = " +
- SIZE +
- " Records/Sec=" +
- NUMBER_LINES *
- 1000 /
- timeTotal +
- " Synchronous");
-
- controller.close();
- }
- catch (Exception e)
- {
- throw e;
- }
-
- }
-
- public void testInvalidWrite() throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, 2000);
-
- try
- {
- final int SIZE = 512;
-
- ByteBuffer block = controller.newBuffer(SIZE);
- encodeBufer(block);
-
- preAlloc(controller, 10 * 512);
-
- CountDownLatch latchDone = new CountDownLatch(1);
-
- CountDownCallback aioBlock = new CountDownCallback(latchDone);
- controller.write(11, 512, block, aioBlock);
-
- latchDone.await();
-
- assertTrue(aioBlock.errorCalled);
- assertFalse(aioBlock.doneCalled);
-
- }
- catch (Exception e)
- {
- throw e;
- }
- finally
- {
- controller.close();
- }
-
- }
-
- public void testInvalidAlloc() throws Exception
- {
- AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
- ByteBuffer buffer = controller.newBuffer(300);
- fail("Exception expected");
- }
- catch (Exception ignored)
- {
- }
-
- }
-
- public void testSize() throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
-
- final int NUMBER_LINES = 10;
- final int SIZE = 1024;
-
- controller.open(FILE_NAME, 1);
-
- controller.fill(0, 1, NUMBER_LINES * SIZE, (byte)'j');
-
- assertEquals(NUMBER_LINES * SIZE, controller.size());
-
- controller.close();
-
- }
-
- private void addString(final String str, final ByteBuffer buffer)
- {
- CharBuffer charBuffer = CharBuffer.wrap(str);
- UTF_8_ENCODER.encode(charBuffer, buffer, true);
-
- }
-
-}
More information about the jboss-cvs-commits
mailing list