[jboss-cvs] JBoss Messaging SVN: r3775 - in projects/jaio/trunk/jaio/java: src/org/jboss/jaio/api/logging and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 22 20:14:17 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-02-22 20:14:17 -0500 (Fri, 22 Feb 2008)
New Revision: 3775
Added:
projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOCallback.java
projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AssynchronousFile.java
projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/JLibAIO.java
projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java
projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/SingleThreadTests.java
Removed:
projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java
projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java
projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIORecoveryCallback.java
projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java
projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java
Modified:
projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/logging/SystemOutLog.java
Log:
Finishing API
Copied: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOCallback.java (from rev 3753, projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java)
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOCallback.java (rev 0)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOCallback.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,19 @@
+package org.jboss.jaio.api;
+
+import java.nio.ByteBuffer;
+
+
+
+/**
+ *
+ * @author clebert.suconic at jboss.com
+ *
+ */
+public interface AIOCallback
+{
+ /** Leave this method as soon as possible, or you would be blocking the whole notification thread */
+ void done();
+
+ /** Observation: The whole file will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations*/
+ void onError(int errorCode, String errorMessage);
+}
Deleted: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java 2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,37 +0,0 @@
-package org.jboss.jaio.api;
-
-/**
- *
- * @author clebert.suconic at jboss.com
- *
- */
-public interface AIOController
-{
-
- void close();
-
- /**
- *
- * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error
- * @param fileName
- * @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
- */
- void open(String fileName, int maxIO);
-
- /**
- * Warning: This function will perform a synchronous IO, probably translating to a fstat call
- * */
- long size();
-
- void append(AIOPackage aioPackage);
-
- void write(long position, AIOPackage aioPackage);
-
- void read(long position, AIOPackage aioPackage);
-
- void recoverContent(AIORecoveryCallback callbackRecover);
-
- void preAllocate(int blocks, long size);
-
-
-}
Deleted: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java 2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,26 +0,0 @@
-package org.jboss.jaio.api;
-
-import java.nio.ByteBuffer;
-
-
-
-/**
- *
- * @author clebert.suconic at jboss.com
- *
- */
-public interface AIOPackage
-{
- int encodeSize();
-
- void encode(ByteBuffer buffer);
-
- void decode(int length, ByteBuffer buffer);
-
- void done();
-
- /** Observation: The whole journal will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations*/
- void onError(int errorCode, String errorMessage);
-
-
-}
Deleted: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIORecoveryCallback.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIORecoveryCallback.java 2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIORecoveryCallback.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,13 +0,0 @@
-package org.jboss.jaio.api;
-
-import java.nio.ByteBuffer;
-
-/**
- *
- * @author clebert.suconic at jboss.com
- *
- */
-public interface AIORecoveryCallback
-{
- void callbackRead(int bytes, ByteBuffer buffer);
-}
Copied: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AssynchronousFile.java (from rev 3753, projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java)
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AssynchronousFile.java (rev 0)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AssynchronousFile.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,39 @@
+package org.jboss.jaio.api;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ * @author clebert.suconic at jboss.com
+ *
+ */
+public interface AssynchronousFile
+{
+
+ void close();
+
+ /**
+ *
+ * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error
+ * @param fileName
+ * @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
+ */
+ void open(String fileName, int maxIO);
+
+ /**
+ * Warning: This function will perform a synchronous IO, probably translating to a fstat call
+ * */
+ long size();
+
+ void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage);
+
+ void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage);
+
+ void preAllocate(int blocks, long size);
+
+ ByteBuffer newBuffer(long size);
+
+ void destroyBuffer(ByteBuffer buffer);
+
+
+}
Modified: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/logging/SystemOutLog.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/logging/SystemOutLog.java 2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/logging/SystemOutLog.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -54,7 +54,7 @@
e = new Exception ("trace");
}
- e.printStackTrace();
+ e.printStackTrace(System.out);
}
Copied: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/JLibAIO.java (from rev 3753, projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java)
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/JLibAIO.java (rev 0)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/JLibAIO.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,142 @@
+package org.jboss.jaio.libaioimpl;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.jaio.api.AssynchronousFile;
+import org.jboss.jaio.api.AIOCallback;
+import org.jboss.jaio.api.logging.LogFactory;
+import org.jboss.jaio.api.logging.Logger;
+
+/**
+ *
+ * @author clebert.suconic at jboss.com
+ *
+ */
+public class JLibAIO implements AssynchronousFile
+{
+ private static Logger log = LogFactory.getFactory().createLogger(JLibAIO.class.getCanonicalName());
+ private boolean opened = false;
+ private String fileName;
+ private Thread poller;
+ /**
+ * Warning: Beware of the C++ pointer! It will bite you! :-)
+ */
+ private long handler;
+
+ Object lock = new Object();
+
+ static
+ {
+ try
+ {
+ log.info("JLibAIO being loaded");
+ System.loadLibrary("JLibAIO");
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getLocalizedMessage(), e);
+ throw new RuntimeException (e.toString(), e);
+ }
+ }
+
+
+ public void open(String fileName, int maxIO)
+ {
+ opened = true;
+ this.fileName=fileName;
+ handler = init (fileName, AIOCallback.class, maxIO, LogFactory.getFactory().createLogger(this.getClass().getCanonicalName()+".native"));
+ startPoller();
+ }
+
+ class PollerThread extends Thread
+ {
+ PollerThread ()
+ {
+ super("NativePoller for " + fileName);
+ }
+ public void run()
+ {
+ pollEvents();
+ }
+ }
+
+ private void startPoller()
+ {
+ checkOpened();
+ poller = new PollerThread();
+ poller.start();
+ }
+
+ public void close()
+ {
+ checkOpened();
+ opened = false;
+ closeInternal(handler);
+ handler = 0;
+ }
+
+
+ public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+ {
+ checkOpened();
+ write (handler, position, size, directByteBuffer, aioPackage);
+
+ }
+
+ public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+ {
+ checkOpened();
+ read (handler, position, size, directByteBuffer, aioPackage);
+
+ }
+
+ public long size()
+ {
+ checkOpened();
+ // TODO: wire this method to ftell
+ return 0;
+ }
+
+ public void preAllocate(int blocks, long size)
+ {
+ checkOpened();
+ preAllocate(handler, blocks, size);
+ }
+
+ private void pollEvents()
+ {
+ checkOpened();
+ internalPollEvents(handler);
+ }
+
+ private void checkOpened()
+ {
+ if (!opened)
+ {
+ throw new RuntimeException("File is not opened");
+ }
+ }
+
+ /**
+ * I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
+ */
+ @SuppressWarnings("unchecked")
+ private static native long init(String fileName, Class aioPackageClazz, int maxIO, Logger logger);
+
+ private static native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+
+ private static native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+
+ private static native void preAllocate(long handle, int blocks, long size);
+
+ private static native void closeInternal(long handler);
+
+ /** Poll asynchrounous events from internal queues */
+ private static native void internalPollEvents(long handler);
+
+ public native void destroyBuffer(ByteBuffer buffer);
+
+ public native ByteBuffer newBuffer(long size);
+
+
+}
Deleted: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java 2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,141 +0,0 @@
-package org.jboss.jaio.libaioimpl;
-
-import java.util.LinkedList;
-
-import org.jboss.jaio.api.AIOController;
-import org.jboss.jaio.api.AIOPackage;
-import org.jboss.jaio.api.AIORecoveryCallback;
-import org.jboss.jaio.api.logging.LogFactory;
-import org.jboss.jaio.api.logging.Logger;
-
-/**
- *
- * @author clebert.suconic at jboss.com
- *
- */
-public class LibAIOController implements AIOController
-{
- private static Logger log = LogFactory.getFactory().createLogger(LibAIOController.class.getCanonicalName());
- private boolean opened = false;
- private String fileName;
- private Thread poller;
- /**
- * Warning: Beware of the C++ pointer! It will bite you! :-)
- */
- private long handler;
-
- Object lock = new Object();
-
- static
- {
- try
- {
- log.info("JLibAIO being loaded");
- System.loadLibrary("Jaio");
- }
- catch (Throwable e)
- {
- log.error(e.getLocalizedMessage(), e);
- throw new RuntimeException (e.toString(), e);
- }
- }
-
-
- public void open(String fileName, int maxIO)
- {
- opened = true;
- this.fileName=fileName;
- handler = init (fileName, AIOPackage.class, maxIO, LogFactory.getFactory().createLogger(this.getClass().getCanonicalName()+".native"));
- startPoller();
- }
-
- class PollerThread extends Thread
- {
- PollerThread ()
- {
- super("NativePoller for " + fileName);
- }
- public void run()
- {
- pollEvents();
- }
- }
-
- private void startPoller()
- {
- if (handler == 0)
- throw new RuntimeException("not opened");
- poller = new PollerThread();
- poller.start();
- }
-
- public void close()
- {
- if (poller!=null)
- {
- opened = false;
- }
- closeInternal(handler);
- handler = 0;
- }
-
-
- public void append(AIOPackage aioPackage)
- {
-
- append(handler, aioPackage);
-
- }
-
- public void read(long position, AIOPackage aioPackage)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void recoverContent(AIORecoveryCallback callbackRecover)
- {
- // TODO Auto-generated method stub
-
- }
-
- public long size()
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- public void write(long position, AIOPackage aioPackage)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void preAllocate(int blocks, long size)
- {
- preAllocate(handler, blocks, size);
- }
-
- private void pollEvents()
- {
- internalPollEvents(handler);
- }
-
- @SuppressWarnings("unchecked")
- /**
- * I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
- */
- private static native long init(String fileName, Class aioPackageClazz, int maxIO, Logger logger);
-
- private static native void append(long handle, AIOPackage aioPackage);
-
- private static native void preAllocate(long handle, int blocks, long size);
-
- private static native void closeInternal(long handler);
-
- /** Poll asynchrounous events from internal queues */
- private static native void internalPollEvents(long handler);
-
-
-
-}
Added: projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java
===================================================================
--- projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java (rev 0)
+++ projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,193 @@
+package org.jboss.jaio.libaioimpl.test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.jaio.api.AIOCallback;
+import org.jboss.jaio.libaioimpl.JLibAIO;
+
+
+import junit.framework.TestCase;
+
+public class MultiThreadTests extends TestCase
+{
+ private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
+
+
+ static int position = 0;
+
+ String FILE_NAME="/tmp/libaio.log";
+
+ static final int SIZE = 1024;
+ static final int NUMBER_OF_THREADS = 5;
+ static final int NUMBER_OF_LINES = 50000;
+
+
+ private static void addData(JLibAIO aio, ByteBuffer buffer, AIOCallback callback)
+ {
+ aio.write(getNewPosition()*SIZE, SIZE, buffer, callback);
+ }
+
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ File file = new File(FILE_NAME);
+ file.delete();
+
+ position = 0;
+ }
+
+ public void testMultipleASynchronousWrites() throws Throwable
+ {
+ JLibAIO jlibAIO = new JLibAIO();
+ jlibAIO.open(FILE_NAME, 10000);
+ System.out.println("Preallocating file");
+ jlibAIO.preAllocate(NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES);
+
+ CountDownLatch latchStart = new CountDownLatch (NUMBER_OF_THREADS + 1);
+ ArrayList<ThreadProducer> list = new ArrayList<ThreadProducer>(NUMBER_OF_THREADS);
+ for(int i=0;i<NUMBER_OF_THREADS;i++)
+ {
+ ThreadProducer producer = new ThreadProducer("Thread " + i, latchStart, jlibAIO);
+ list.add(producer);
+ producer.start();
+ }
+
+ latchStart.countDown();
+ latchStart.await();
+
+ long startTime = System.currentTimeMillis();
+
+ for (ThreadProducer producer: list)
+ {
+ producer.join();
+ if (producer.failed != null)
+ {
+ throw producer.failed;
+ }
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ System.out.println("Records/Second = " + (NUMBER_OF_THREADS * NUMBER_OF_LINES * 1000 / (endTime - startTime)));
+ }
+
+
+
+
+ private static synchronized int getNewPosition()
+ {
+ return position++;
+ }
+
+ static class ThreadProducer extends Thread
+ {
+
+ Throwable failed = null;
+ CountDownLatch latchStart;
+ JLibAIO libaio;
+
+ public ThreadProducer(String name, CountDownLatch latchStart, JLibAIO libaio)
+ {
+ super(name);
+ this.latchStart = latchStart;
+ this.libaio = libaio;
+ }
+
+ public void run()
+ {
+ super.run();
+
+
+ try
+ {
+
+ latchStart.countDown();
+ latchStart.await();
+
+
+ CountDownLatch latchFinishThread = new CountDownLatch(NUMBER_OF_LINES);
+
+ LinkedList<LocalCallback> list = new LinkedList<LocalCallback>();
+
+ for (int i=0;i<NUMBER_OF_LINES;i++)
+ {
+ //System.out.println("Thread " + Thread.currentThread().getName() + " Adding a line");
+ ByteBuffer buffer = libaio.newBuffer(SIZE);
+ addString ("Thread name=" + Thread.currentThread().getName() + ";" + i + "\n", buffer);
+ for (int local = buffer.position(); local < buffer.capacity() - 1; local++)
+ {
+ buffer.put((byte)' ');
+ }
+ buffer.put((byte)'\n');
+
+ LocalCallback callback = new LocalCallback(latchFinishThread, buffer, libaio);
+ list.add(callback);
+ addData(libaio, buffer,callback);
+ //libaio.write(getNewPosition() * SIZE, SIZE, buffer, callback);
+ }
+ latchFinishThread.await();
+
+ for (LocalCallback callback: list)
+ {
+ assertTrue (callback.doneCalled);
+ assertFalse (callback.errorCalled);
+ }
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ failed = e;
+ }
+
+ }
+ }
+
+ private static void addString(String str, ByteBuffer buffer)
+ {
+ CharBuffer charBuffer = CharBuffer.wrap(str);
+ UTF_8_ENCODER.encode(charBuffer, buffer, true);
+
+ }
+
+ static class LocalCallback implements AIOCallback
+ {
+
+ boolean doneCalled = false;
+ boolean errorCalled = false;
+ CountDownLatch latchDone;
+ ByteBuffer releaseMe;
+ JLibAIO libaio;
+
+ public LocalCallback(CountDownLatch latchDone, ByteBuffer releaseMe, JLibAIO libaio)
+ {
+ this.latchDone = latchDone;
+ this.releaseMe = releaseMe;
+ this.libaio = libaio;
+ }
+
+ public void done()
+ {
+ doneCalled=true;
+ latchDone.countDown();
+ libaio.destroyBuffer(releaseMe);
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ errorCalled=true;
+ latchDone.countDown();
+ libaio.destroyBuffer(releaseMe);
+ }
+
+ }
+}
Property changes on: projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Copied: projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/SingleThreadTests.java (from rev 3753, projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java)
===================================================================
--- projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/SingleThreadTests.java (rev 0)
+++ projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/SingleThreadTests.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,413 @@
+package org.jboss.jaio.libaioimpl.test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.jaio.api.AIOCallback;
+import org.jboss.jaio.libaioimpl.JLibAIO;
+
+import junit.framework.TestCase;
+
+public class SingleThreadTests extends TestCase
+{
+ private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
+
+ public static void main(String arg[])
+ {
+ try
+ {
+ SingleThreadTests test = new SingleThreadTests();
+ test.setUp();
+ test.testRead();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ byte commonBuffer[] = null;
+
+ String FILE_NAME="/tmp/libaio.log";
+
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ LocalAIO.staticDone = 0;
+ File file = new File(FILE_NAME);
+ file.delete();
+ }
+
+ private void encodeBufer(ByteBuffer buffer)
+ {
+ buffer.clear();
+ int size = buffer.limit();
+ for (int i=0;i<size-1;i++)
+ {
+ buffer.put((byte)('a' + (i%20)));
+ }
+
+ buffer.put((byte)'\n');
+
+ }
+
+ public void testAddBeyongSimultaneousLimit() throws Exception
+ {
+ asyncData(150000,1024,100);
+ }
+
+ public void testAddAsyncData() throws Exception
+ {
+ asyncData(150000,1024,20000);
+ }
+
+ public void testRead() throws Exception
+ {
+
+
+
+
+ final JLibAIO controller = new JLibAIO();
+ try
+ {
+
+ final int NUMBER_LINES = 300;
+ final int SIZE = 1024;
+
+ controller.open(FILE_NAME, 10);
+
+ ByteBuffer buffer = controller.newBuffer(SIZE);
+
+
+ for (int i=0; i<NUMBER_LINES; i++)
+ {
+ buffer.clear();
+ addString ("Str value " + i + "\n", buffer);
+ for (int j=buffer.position(); j<buffer.capacity()-1;j++)
+ {
+ buffer.put((byte)' ');
+ }
+ buffer.put((byte)'\n');
+
+
+ CountDownLatch latch = new CountDownLatch(1);
+ LocalAIO aio = new LocalAIO(latch);
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+ }
+
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+
+ for (int i=0; i<NUMBER_LINES; i++)
+ {
+ newBuffer.clear();
+ addString ("Str value " + i + "\n", newBuffer);
+ for (int j=newBuffer.position(); j<newBuffer.capacity()-1;j++)
+ {
+ newBuffer.put((byte)' ');
+ }
+ newBuffer.put((byte)'\n');
+
+
+ CountDownLatch latch = new CountDownLatch(1);
+ LocalAIO aio = new LocalAIO(latch);
+ controller.read(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+
+ byte bytesRead[] = new byte[SIZE];
+ byte bytesCompare[] = new byte[SIZE];
+
+ newBuffer.rewind();
+ newBuffer.get(bytesCompare);
+ buffer.rewind();
+ buffer.get(bytesRead);
+
+ for (int count=0;count<SIZE;count++)
+ {
+ assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+ }
+
+
+ //byte[] byteCompare = new byte[SIZE];
+ //byte[] byteRead = new byte[SIZE];
+
+ assertTrue(buffer.equals(newBuffer));
+ }
+
+ controller.destroyBuffer(buffer);
+ }
+ finally
+ {
+ try { controller.close(); } catch (Throwable ignored){}
+
+ }
+
+ }
+
+ private void asyncData(int numberOfLines, int size, int aioLimit) throws Exception
+ {
+ final JLibAIO controller = new JLibAIO();
+ controller.open(FILE_NAME, aioLimit);
+
+ try
+ {
+ System.out.println("++testDirectDataNoPage"); System.out.flush();
+ CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+
+ ByteBuffer block = controller.newBuffer(size);
+ encodeBufer(block);
+
+ preAlloc(controller, numberOfLines * size);
+
+ ArrayList<LocalAIO> list = new ArrayList<LocalAIO>();
+
+ for (int i=0; i<numberOfLines; i++)
+ {
+ list.add(new LocalAIO(latchDone));
+ }
+
+
+ long valueInitial = System.currentTimeMillis();
+
+ System.out.println("Adding data");
+
+ long lastTime = System.currentTimeMillis();
+ int counter = 0;
+ for (LocalAIO tmp: list)
+ {
+ controller.write(counter * size, size, block, tmp);
+ if (++counter % 5000 == 0)
+ {
+ System.out.println(5000*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Async)");
+ lastTime = System.currentTimeMillis();
+ }
+
+ }
+
+ System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
+
+
+ System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
+ System.out.println("Flush now");
+ System.out.println("Received " + LocalAIO.staticDone);
+ long timeTotal = System.currentTimeMillis() - valueInitial;
+
+ System.out.println("Asynchronous time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
+
+ latchDone.await();
+
+ timeTotal = System.currentTimeMillis() - valueInitial;
+ System.out.println("After completions time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
+
+ for (LocalAIO tmp: list)
+ {
+ assertEquals(1, tmp.timesDoneCalled);
+ assertTrue(tmp.doneCalled);
+ assertFalse(tmp.errorCalled);
+ }
+
+ controller.destroyBuffer(block);
+
+ controller.close();
+ }
+ finally
+ {
+ try {controller.close();} catch (Exception ignored){}
+ }
+
+
+ }
+
+ public void testDirectSynchronous() throws Exception
+ {
+ try
+ {
+ System.out.println("++testDirectDataNoPage"); System.out.flush();
+ final int NUMBER_LINES = 100000;
+ final int SIZE = 1024;
+ //final int SIZE = 512;
+
+ final JLibAIO controller = new JLibAIO();
+ controller.open(FILE_NAME, 2000);
+
+ ByteBuffer block = controller.newBuffer(SIZE);
+ encodeBufer(block);
+
+ preAlloc(controller, NUMBER_LINES * SIZE);
+
+ long valueInitial = System.currentTimeMillis();
+
+ System.out.println("Adding data");
+
+ long lastTime = System.currentTimeMillis();
+ int counter = 0;
+
+ for (int i=0; i<NUMBER_LINES; i++)
+ {
+ CountDownLatch latchDone = new CountDownLatch(1);
+ LocalAIO aioBlock = new LocalAIO(latchDone);
+ controller.write(i*512, 512, block, aioBlock);
+ latchDone.await();
+ assertTrue(aioBlock.doneCalled);
+ assertFalse(aioBlock.errorCalled);
+ if (++counter % 500 == 0)
+ {
+ System.out.println(500*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Synchronous)");
+ lastTime = System.currentTimeMillis();
+ }
+ }
+
+ System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
+
+
+ System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
+ System.out.println("Flush now");
+ System.out.println("Received " + LocalAIO.staticDone);
+
+ long timeTotal = System.currentTimeMillis() - valueInitial;
+ System.out.println("Flushed " + timeTotal);
+ System.out.println("time = " + timeTotal + " for " + NUMBER_LINES + " registers " + " size each line = " + SIZE + " Records/Sec=" + (NUMBER_LINES*1000/timeTotal) + " Synchronous");
+
+ controller.destroyBuffer(block);
+ controller.close();
+ }
+ catch (Exception e)
+ {
+ System.out.println("Received " + LocalAIO.staticDone + " before it failed");
+ throw e;
+ }
+
+ }
+
+ private void preAlloc(JLibAIO controller, long size)
+ {
+ System.out.println("Pre allocating"); System.out.flush();
+ long startPreAllocate = System.currentTimeMillis();
+ controller.preAllocate(1, size);
+ long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
+ if (endPreAllocate != 0) System.out.println("PreAllocated the file in " + endPreAllocate + " seconds, What means " + (size/endPreAllocate) + " bytes per millisecond");
+ }
+
+
+ public void testInvalidWrite() throws Exception
+ {
+ final JLibAIO controller = new JLibAIO();
+ controller.open(FILE_NAME, 2000);
+
+ try
+ {
+
+ final int SIZE=512;
+
+ ByteBuffer block = controller.newBuffer(SIZE);
+ encodeBufer(block);
+
+ preAlloc(controller, 1000 * 512);
+
+
+ CountDownLatch latchDone = new CountDownLatch(1);
+
+ LocalAIO aioBlock = new LocalAIO(latchDone);
+ controller.write(11, 512, block, aioBlock);
+
+ latchDone.await();
+
+ assertTrue (aioBlock.errorCalled);
+ assertFalse(aioBlock.doneCalled);
+
+ controller.destroyBuffer(block);
+ }
+ catch (Exception e)
+ {
+ System.out.println("Received " + LocalAIO.staticDone + " before it failed");
+ throw e;
+ }
+ finally
+ {
+ controller.close();
+ }
+
+ }
+
+ public void testInvalidAlloc() throws Exception
+ {
+ JLibAIO controller = new JLibAIO();
+ try
+ {
+ // You don't need to open the file to alloc it
+ ByteBuffer buffer = controller.newBuffer(300);
+ fail ("Exception expected");
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ }
+
+ private static class LocalAIO implements AIOCallback
+ {
+
+ CountDownLatch latch;
+
+ public LocalAIO(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ boolean doneCalled = false;
+ boolean errorCalled = false;
+ int timesDoneCalled = 0;
+ static int staticDone = 0;
+ public void decode(int length, ByteBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void done()
+ {
+ //System.out.println("Received Done"); System.out.flush();
+ doneCalled = true;
+ timesDoneCalled++;
+ staticDone++;
+ if (latch != null)
+ {
+ latch.countDown();
+ }
+
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ errorCalled = true;
+ if (latch != null)
+ {
+ // even thought an error happened, we need to inform the latch, or the test won't finish
+ latch.countDown();
+ }
+ System.out.println("Received an Error - " + errorCode + " message=" + errorMessage);
+
+ }
+
+ }
+
+ private void addString(String str, ByteBuffer buffer)
+ {
+ CharBuffer charBuffer = CharBuffer.wrap(str);
+ UTF_8_ENCODER.encode(charBuffer, buffer, true);
+
+ }
+
+
+}
Deleted: projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java
===================================================================
--- projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java 2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java 2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,363 +0,0 @@
-package org.jboss.jaio.libaioimpl.test;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-
-import org.jboss.jaio.api.AIOPackage;
-import org.jboss.jaio.libaioimpl.LibAIOController;
-
-import junit.framework.TestCase;
-
-public class TestController extends TestCase
-{
-
- byte commonBuffer[] = null;
-
- String FILE_NAME="/tmp/libaio.log";
-
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- LocalAIO.staticDone = 0;
- File file = new File(FILE_NAME);
- file.delete();
- }
-
- // If an AIO encode blocks is blocked, we should be able to finish execution (validating the thread model)
- public void testBlockOnSize() throws Exception
- {
- System.out.println("++testDirectDataNoPage"); System.out.flush();
- final LibAIOController controller = new LibAIOController();
- controller.open(FILE_NAME, 100);
- controller.preAllocate(1, 100 * 512);
-
- final CountDownLatch releaseSize = new CountDownLatch(1);
- final CountDownLatch releaseEncode = new CountDownLatch(1);
- final CountDownLatch latchDone = new CountDownLatch(2);
- final AIOBlocker block1 = new AIOBlocker(true, false, releaseSize, latchDone);
- final AIOBlocker block2 = new AIOBlocker(false, true, releaseEncode, latchDone);
-
- Thread blocker = new Thread()
- {
- public void run()
- {
- controller.append(block1);
- controller.append(block2);
- }
- };
-
-
- blocker.start();
-
- Thread.sleep(1000); // some time to the other thread wake up (a lot of time actually)
-
- ArrayList<LocalAIO> aios = new ArrayList<LocalAIO>();
-
-
- CountDownLatch latchDoneOnRegularblocks = new CountDownLatch(10);
-
- for (int i=0;i<10;i++) aios.add(new LocalAIO(512, latchDoneOnRegularblocks));
-
- for (LocalAIO tmp: aios)
- {
- controller.append(tmp);
- }
-
- latchDoneOnRegularblocks.await();
-
- for (LocalAIO tmp: aios)
- {
- assertTrue(tmp.doneCalled);
- }
-
- releaseSize.countDown();
- releaseEncode.countDown();
- latchDone.await();
-
- assertTrue (block1.doneCalled);
- assertTrue (block2.doneCalled);
-
- controller.close();
-
- }
-
- public void testAddAsyncData() throws Exception
- {
- try
- {
- System.out.println("++testDirectDataNoPage"); System.out.flush();
- final int NUMBER_LINES = 500000;
- final int SIZE = 1024;
- //final int SIZE = 512;
-
- CountDownLatch latchDone = new CountDownLatch(NUMBER_LINES);
- final LibAIOController controller = new LibAIOController();
- controller.open(FILE_NAME, 20000);
- System.out.println("Pre allocating file"); System.out.flush();
-
- long startPreAllocate = System.currentTimeMillis();
- controller.preAllocate(1, NUMBER_LINES * SIZE);
- long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
- if (endPreAllocate != 0) System.out.println("PreAllocated the file in " + endPreAllocate + " What means " + ((100 * 1024*1024)/endPreAllocate) + " bytes per millisecond");
- System.out.println("Pre allocating file done"); System.out.flush();
- ArrayList<LocalAIO> list = new ArrayList<LocalAIO>();
-
- for (int i=0; i<NUMBER_LINES; i++)
- {
- list.add(new LocalAIO(SIZE, latchDone));
- }
-
-
- long valueInitial = System.currentTimeMillis();
-
- System.out.println("Adding data");
-
- long lastTime = System.currentTimeMillis();
- int counter = 0;
- for (LocalAIO tmp: list)
- {
- if (SIZE > 1024*1024) System.out.println("adding"); System.out.flush();
- controller.append(tmp);
- if (++counter % 500 == 0)
- {
- System.out.println(500*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Async)");
- lastTime = System.currentTimeMillis();
- }
-
- }
-
- System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
-
-
- System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
- System.out.println("Flush now");
- System.out.println("Received " + LocalAIO.staticDone);
- latchDone.await();
-
- long timeTotal = System.currentTimeMillis() - valueInitial;
- System.out.println("Flushed " + timeTotal);
- System.out.println("time = " + timeTotal + " for " + NUMBER_LINES + " registers " + " size each line = " + SIZE + " Records/Sec=" + (NUMBER_LINES*1000/timeTotal) + " (Assynchronous)");
-
- for (LocalAIO tmp: list)
- {
- assertEquals(1, tmp.timesDoneCalled);
- assertTrue(tmp.encodeCalled);
- assertTrue(tmp.encodeSizeCalled);
- assertTrue(tmp.doneCalled);
- }
-
-
- controller.close();
- }
- catch (Exception e)
- {
- System.out.println("Received " + LocalAIO.staticDone + " before it failed");
- }
-
- }
-
- public void testDirectSynchronous() throws Exception
- {
- try
- {
- System.out.println("++testDirectDataNoPage"); System.out.flush();
- final int NUMBER_LINES = 100000;
- final int SIZE = 1024;
- //final int SIZE = 512;
-
- final LibAIOController controller = new LibAIOController();
- controller.open(FILE_NAME, 2000);
-
- long startPreAllocate = System.currentTimeMillis();
- controller.preAllocate(1, NUMBER_LINES * SIZE);
- long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
- if (endPreAllocate != 0) System.out.println("PreAllocated the file in " + endPreAllocate + " seconds, What means " + ((100 * 1024*1024)/endPreAllocate) + " bytes per millisecond");
-
- long valueInitial = System.currentTimeMillis();
-
- System.out.println("Adding data");
-
- long lastTime = System.currentTimeMillis();
- int counter = 0;
-
- for (int i=0; i<NUMBER_LINES; i++)
- {
- CountDownLatch latchDone = new CountDownLatch(1);
- LocalAIO aioBlock = new LocalAIO(SIZE, latchDone);
- controller.append(aioBlock);
- latchDone.await();
- assertTrue(aioBlock.doneCalled);
- assertTrue(aioBlock.encodeCalled);
- if (++counter % 500 == 0)
- {
- System.out.println(500*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Synchronous)");
- lastTime = System.currentTimeMillis();
- }
- }
-
- System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
-
-
- System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
- System.out.println("Flush now");
- System.out.println("Received " + LocalAIO.staticDone);
-
- long timeTotal = System.currentTimeMillis() - valueInitial;
- System.out.println("Flushed " + timeTotal);
- System.out.println("time = " + timeTotal + " for " + NUMBER_LINES + " registers " + " size each line = " + SIZE + " Records/Sec=" + (NUMBER_LINES*1000/timeTotal) + " Synchronous");
-
- controller.close();
- }
- catch (Exception e)
- {
- System.out.println("Received " + LocalAIO.staticDone + " before it failed");
- throw e;
- }
-
- }
-
- private static class AIOBlocker implements AIOPackage
- {
-
- boolean doneCalled = false;
-
- private boolean blockSize;
- private boolean blockEncode;
- private CountDownLatch latch;
- private CountDownLatch latchDone;
-
- public AIOBlocker(boolean blockSize, boolean blockEncode, CountDownLatch latch, CountDownLatch latchDone)
- {
- this.blockSize = blockSize;
- this.blockEncode = blockEncode;
- this.latch = latch;
- this.latchDone = latchDone;
- }
-
- public void decode(int length, ByteBuffer buffer)
- {
- }
-
- public void done()
- {
- latchDone.countDown();
- doneCalled = true;
- }
-
- public void encode(ByteBuffer buffer)
- {
- if (blockEncode)
- {
- try
- {
- latch.await();
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
-
- for (int i=0;i<511;i++)
- {
- buffer.put((byte)'b');
- }
- buffer.put((byte)'\n');
-
-
- }
-
- public int encodeSize()
- {
- if (blockSize)
- {
- try
- {
- latch.await();
- } catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- return 512;
- }
-
- public void onError(int errorCode, String errorMessage)
- {
-
- }
-
- }
-
- private static class LocalAIO implements AIOPackage
- {
-
- CountDownLatch latch;
- int size = 1;
- public LocalAIO()
- {
- this(10);
- }
-
- public LocalAIO(int size)
- {
- this.size = size;
- }
-
- public LocalAIO(int size, CountDownLatch latch)
- {
- this(size);
- this.latch = latch;
- }
-
- boolean encodeCalled = false;
- boolean encodeSizeCalled = false;
- boolean doneCalled = false;
- int timesDoneCalled = 0;
- static int staticDone = 0;
- public void decode(int length, ByteBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void done()
- {
- //System.out.println("Received Done"); System.out.flush();
- doneCalled = true;
- timesDoneCalled++;
- staticDone++;
- if (latch != null)
- {
- latch.countDown();
- }
-
- }
-
- public void encode(ByteBuffer buffer)
- {
- encodeCalled = true;
- for (int i=0; i<size-1; i++)
- {
- buffer.put((byte)('A' + (i%10)));
- }
- buffer.put((byte)'\n');
- }
-
- public int encodeSize()
- {
- encodeSizeCalled = true;
- return size;
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- System.out.println("Received an Error - " + errorCode + " message=" + errorMessage);
-
- }
-
- }
-}
More information about the jboss-cvs-commits
mailing list