[jboss-cvs] JBoss Messaging SVN: r3775 - in projects/jaio/trunk/jaio/java: src/org/jboss/jaio/api/logging and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 22 20:14:17 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-02-22 20:14:17 -0500 (Fri, 22 Feb 2008)
New Revision: 3775

Added:
   projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOCallback.java
   projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AssynchronousFile.java
   projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/JLibAIO.java
   projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java
   projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/SingleThreadTests.java
Removed:
   projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java
   projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java
   projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIORecoveryCallback.java
   projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java
   projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java
Modified:
   projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/logging/SystemOutLog.java
Log:
Finishing API

Copied: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOCallback.java (from rev 3753, projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java)
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOCallback.java	                        (rev 0)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOCallback.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,19 @@
+package org.jboss.jaio.api;
+
+import java.nio.ByteBuffer;
+
+
+
+/**
+ * 
+ * @author clebert.suconic at jboss.com
+ *
+ */
+public interface AIOCallback
+{
+    /** Called when the operation has completed. Leave this method as soon as possible, or you will be blocking the whole notification thread. */
+    void done();
+
+    /** Called when the operation fails (NOTE(review): errorCode/errorMessage presumably come from the native layer -- confirm). Observation: the whole file will probably be failing if this happens; e.g. if you delete the file, you will start to get errors for these operations. */
+    void onError(int errorCode, String errorMessage);
+}

Deleted: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java	2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,37 +0,0 @@
-package org.jboss.jaio.api;
-
-/**
- * 
- * @author clebert.suconic at jboss.com
- *
- */
-public interface AIOController
-{
-    
-    void close();
-    
-    /**
-     * 
-     * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined on /proc/sys/fs/aio-max-nr, or you would get an error 
-     * @param fileName
-     * @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
-     */
-    void open(String fileName, int maxIO);
-
-    /** 
-     * Warning: This function will perform a synchronous IO, probably translating to a fstat call
-     * */
-    long size();
-    
-    void append(AIOPackage aioPackage);
-    
-    void write(long position, AIOPackage aioPackage);
-    
-    void read(long position, AIOPackage aioPackage);
-    
-    void recoverContent(AIORecoveryCallback callbackRecover);
-    
-    void preAllocate(int blocks, long size);
-    
-    
-}

Deleted: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java	2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOPackage.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,26 +0,0 @@
-package org.jboss.jaio.api;
-
-import java.nio.ByteBuffer;
-
-
-
-/**
- * 
- * @author clebert.suconic at jboss.com
- *
- */
-public interface AIOPackage
-{
-    int encodeSize();
-
-    void encode(ByteBuffer buffer);
-
-    void decode(int length, ByteBuffer buffer);
-    
-    void done();
-
-    /** Observation: The whole journal will be probably failing if this happens. Like, if you delete the file, you will start to get errors for these operations*/
-    void onError(int errorCode, String errorMessage);
-    
-
-}

Deleted: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIORecoveryCallback.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIORecoveryCallback.java	2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIORecoveryCallback.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,13 +0,0 @@
-package org.jboss.jaio.api;
-
-import java.nio.ByteBuffer;
-
-/**
- * 
- * @author clebert.suconic at jboss.com
- *
- */
-public interface AIORecoveryCallback
-{
-    void callbackRead(int bytes, ByteBuffer buffer);
-}

Copied: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AssynchronousFile.java (from rev 3753, projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AIOController.java)
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AssynchronousFile.java	                        (rev 0)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/AssynchronousFile.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,39 @@
+package org.jboss.jaio.api;
+
+import java.nio.ByteBuffer;
+
+/**
+ * 
+ * @author clebert.suconic at jboss.com
+ *
+ */
+public interface AssynchronousFile
+{
+    /** Closes the file. The JLibAIO implementation throws a RuntimeException when the file is not open. */
+    void close();
+    
+    /**
+     * Opens the file for asynchronous IO.
+     * Note: If you are using a native Linux implementation, maxIO can't be higher than what's defined in /proc/sys/fs/aio-max-nr, or you will get an error.
+     * @param fileName name of the file to open
+     * @param maxIO The maximum number of concurrent asynchronous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
+     */
+    void open(String fileName, int maxIO);
+
+    /** 
+     * Warning: This function will perform a synchronous IO, probably translating to a fstat call
+     * */
+    long size();
+    /** Submits a write at the given file position using directByteBuffer; the outcome is reported through the callback's done()/onError(). */
+    void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage);
+    /** Submits a read at the given file position into directByteBuffer; the outcome is reported through the callback's done()/onError(). */
+    void read(long position, long size, ByteBuffer directByteBuffer,  AIOCallback aioPackage);
+    /** NOTE(review): presumably reserves blocks * size bytes for the file -- confirm against the native implementation. */
+    void preAllocate(int blocks, long size);
+    /** Returns a buffer of the requested size for use with write/read; hand it back with destroyBuffer when done. */
+    ByteBuffer newBuffer(long size);
+    /** Releases a buffer obtained from newBuffer. */
+    void destroyBuffer(ByteBuffer buffer);
+    
+    
+}

Modified: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/logging/SystemOutLog.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/logging/SystemOutLog.java	2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/api/logging/SystemOutLog.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -54,7 +54,7 @@
                 e = new Exception ("trace");
             }
             
-            e.printStackTrace();
+            e.printStackTrace(System.out);
             
         }
         

Copied: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/JLibAIO.java (from rev 3753, projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java)
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/JLibAIO.java	                        (rev 0)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/JLibAIO.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,142 @@
+package org.jboss.jaio.libaioimpl;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.jaio.api.AssynchronousFile;
+import org.jboss.jaio.api.AIOCallback;
+import org.jboss.jaio.api.logging.LogFactory;
+import org.jboss.jaio.api.logging.Logger;
+
+/**
+ * 
+ * @author clebert.suconic at jboss.com
+ *
+ */
+public class JLibAIO implements AssynchronousFile
+{
+    // AssynchronousFile backed by the native "JLibAIO" library; a dedicated thread runs the native event-polling loop.
+    private static final Logger log = LogFactory.getFactory().createLogger(JLibAIO.class.getCanonicalName());
+    /** True between a successful open() and the matching close(). */
+    private boolean opened = false;
+    private String fileName;
+    private Thread poller;
+    /**
+     *  Warning: Beware of the C++ pointer! It will bite you! :-)
+     */ 
+    private long handler;
+    
+    Object lock = new Object();
+    // Loads the native library when the class is initialised; a failure is logged and rethrown.
+    static
+    {
+        try
+        {
+            log.info("JLibAIO being loaded");
+            System.loadLibrary("JLibAIO");
+        }
+        catch (Throwable e)
+        {
+            log.error(e.getLocalizedMessage(), e);
+            throw new RuntimeException (e.toString(), e);
+        }
+    }
+
+    /** Opens fileName through the native layer and starts the poller thread; throws a RuntimeException if this instance is already open. */
+    public void open(String fileName, int maxIO)
+    {
+        if (opened)
+        {
+            // Re-opening would overwrite the native handle and start a second poller, leaking the first ones.
+            throw new RuntimeException("File is already opened");
+        }
+        this.fileName=fileName;
+        handler = init (fileName, AIOCallback.class, maxIO, LogFactory.getFactory().createLogger(this.getClass().getCanonicalName()+".native"));
+        // Flag the file as opened only after init succeeded, so a failed open cannot leave an "open" instance without a handle.
+        opened = true;
+        startPoller();
+    }
+    /** Thread whose whole job is to run pollEvents(). */
+    class PollerThread extends Thread
+    {
+        PollerThread ()
+        {
+            super("NativePoller for " + fileName);
+        }
+        public void run()
+        {
+            pollEvents();
+        }
+    }
+    
+    private void startPoller()
+    {
+        checkOpened();
+        poller = new PollerThread(); 
+        poller.start();
+    }
+    
+    /** Marks the file closed and passes the native handle to closeInternal; throws a RuntimeException if the file is not open. */
+    public void close()
+    {
+        checkOpened();
+        opened = false;
+        closeInternal(handler);
+        handler = 0;
+    }
+    
+    /** Submits a write to the native layer; the outcome is reported through aioPackage. Throws a RuntimeException if the file is not open. */
+    public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+    {
+        checkOpened();
+        write (handler, position, size, directByteBuffer, aioPackage);
+    }
+
+    /** Submits a read to the native layer; the outcome is reported through aioPackage. Throws a RuntimeException if the file is not open. */
+    public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
+    {
+        checkOpened();
+        read (handler, position, size, directByteBuffer, aioPackage);
+    }
+
+    /** Not implemented yet: always returns 0 once the open check has passed. */
+    public long size()
+    {
+        checkOpened();
+        // TODO: wire this method to ftell
+        return 0;
+    }
+
+    public void preAllocate(int blocks, long size)
+    {
+        checkOpened();
+        preAllocate(handler, blocks, size);
+    }
+    /** Body of the poller thread: delegates to the native internalPollEvents. */
+    private void pollEvents()
+    {
+        checkOpened();
+        internalPollEvents(handler);
+    }
+    
+    /** Guard used by every operation that needs the file to be open. */
+    private void checkOpened() 
+    {
+        if (!opened)
+        {
+            throw new RuntimeException("File is not opened");
+        }
+    }
+    
+    /** 
+     * I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
+     */
+    private static native long init(String fileName, Class<?> aioPackageClazz, int maxIO, Logger logger);
+    private static native void write(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+    private static native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage);
+    private static native void preAllocate(long handle, int blocks, long size);
+    private static native void closeInternal(long handler);
+    /** Poll asynchronous events from internal queues */
+    private static native void internalPollEvents(long handler);
+    public native void destroyBuffer(ByteBuffer buffer);
+    public native ByteBuffer newBuffer(long size);
+}

Deleted: projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java
===================================================================
--- projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java	2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/src/org/jboss/jaio/libaioimpl/LibAIOController.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,141 +0,0 @@
-package org.jboss.jaio.libaioimpl;
-
-import java.util.LinkedList;
-
-import org.jboss.jaio.api.AIOController;
-import org.jboss.jaio.api.AIOPackage;
-import org.jboss.jaio.api.AIORecoveryCallback;
-import org.jboss.jaio.api.logging.LogFactory;
-import org.jboss.jaio.api.logging.Logger;
-
-/**
- * 
- * @author clebert.suconic at jboss.com
- *
- */
-public class LibAIOController implements AIOController
-{
-    private static Logger log = LogFactory.getFactory().createLogger(LibAIOController.class.getCanonicalName());
-    private boolean opened = false;
-    private String fileName;
-    private Thread poller;
-    /**
-     *  Warning: Beware of the C++ pointer! It will bite you! :-)
-     */ 
-    private long handler;
-    
-    Object lock = new Object();
-    
-    static
-    {
-        try
-        {
-            log.info("JLibAIO being loaded");
-            System.loadLibrary("Jaio");
-        }
-        catch (Throwable e)
-        {
-            log.error(e.getLocalizedMessage(), e);
-            throw new RuntimeException (e.toString(), e);
-        }
-    }
-
-    
-    public void open(String fileName, int maxIO)
-    {
-        opened = true;
-        this.fileName=fileName;
-        handler = init (fileName, AIOPackage.class, maxIO, LogFactory.getFactory().createLogger(this.getClass().getCanonicalName()+".native"));
-        startPoller();
-    }
-    
-    class PollerThread extends Thread
-    {
-        PollerThread ()
-        {
-            super("NativePoller for " + fileName);
-        }
-        public void run()
-        {
-            pollEvents();
-        }
-    }
-    
-    private void startPoller()
-    {
-        if (handler == 0)
-            throw new RuntimeException("not opened");
-        poller = new PollerThread(); 
-        poller.start();
-    }
-    
-    public void close()
-    {
-        if (poller!=null)
-        {
-            opened = false;
-        }
-        closeInternal(handler);
-        handler = 0;
-    }
-    
-    
-    public void append(AIOPackage aioPackage)
-    {
-        
-        append(handler, aioPackage);
-        
-    }
-
-    public void read(long position, AIOPackage aioPackage)
-    {
-        // TODO Auto-generated method stub
-        
-    }
-
-    public void recoverContent(AIORecoveryCallback callbackRecover)
-    {
-        // TODO Auto-generated method stub
-        
-    }
-
-    public long size()
-    {
-        // TODO Auto-generated method stub
-        return 0;
-    }
-
-    public void write(long position, AIOPackage aioPackage)
-    {
-        // TODO Auto-generated method stub
-        
-    }
-    
-    public void preAllocate(int blocks, long size)
-    {
-        preAllocate(handler, blocks, size);
-    }
-
-    private void pollEvents()
-    {
-        internalPollEvents(handler);
-    }
-    
-    @SuppressWarnings("unchecked")
-    /** 
-     * I'm sending aioPackageClazz here, as you could have multiple classLoaders with the same class, and I don't want the hassle of doing classLoading in the Native layer
-     */
-    private static native long init(String fileName, Class aioPackageClazz, int maxIO, Logger logger);
-    
-    private static native void append(long handle, AIOPackage aioPackage);
-    
-    private static native void preAllocate(long handle, int blocks, long size);
-
-    private static native void closeInternal(long handler);
-    
-    /** Poll asynchrounous events from internal queues */
-    private static native void internalPollEvents(long handler);
-
-   
-    
-}

Added: projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java
===================================================================
--- projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java	                        (rev 0)
+++ projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,193 @@
+package org.jboss.jaio.libaioimpl.test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.jaio.api.AIOCallback;
+import org.jboss.jaio.libaioimpl.JLibAIO;
+
+
+import junit.framework.TestCase;
+
+public class MultiThreadTests extends TestCase
+{
+    /** Charset instances are thread-safe; CharsetEncoder instances are not, so addString creates one encoder per call. */
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+    
+    /** Next free record slot in the file, shared by all producer threads through getNewPosition(). */
+    static int position = 0;
+    
+    String FILE_NAME="/tmp/libaio.log";
+    
+    static final int SIZE = 1024;
+    static final int NUMBER_OF_THREADS = 5;
+    static final int NUMBER_OF_LINES = 50000;
+    
+    /** Writes one SIZE-byte record at the next free slot of the file. */
+    private static void addData(JLibAIO aio, ByteBuffer buffer, AIOCallback callback)
+    {
+        aio.write(getNewPosition()*SIZE, SIZE, buffer, callback);
+    }
+    
+    /** Deletes the file left over from a previous run and resets the shared position. */
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        File file = new File(FILE_NAME);
+        file.delete();
+        position = 0;
+    }
+    
+    /**
+     * Starts NUMBER_OF_THREADS producers that each write NUMBER_OF_LINES records through one shared JLibAIO,
+     * waits for all of them, and rethrows the first failure any producer recorded.
+     */
+    public void testMultipleASynchronousWrites() throws Throwable
+    {
+        JLibAIO jlibAIO = new JLibAIO();
+        jlibAIO.open(FILE_NAME, 10000);
+        try
+        {
+            System.out.println("Preallocating file");
+            jlibAIO.preAllocate(NUMBER_OF_THREADS, SIZE * NUMBER_OF_LINES);
+            CountDownLatch latchStart = new CountDownLatch (NUMBER_OF_THREADS + 1);
+            ArrayList<ThreadProducer> list = new ArrayList<ThreadProducer>(NUMBER_OF_THREADS);
+            for(int i=0;i<NUMBER_OF_THREADS;i++)
+            {
+                ThreadProducer producer = new ThreadProducer("Thread " + i, latchStart, jlibAIO);
+                list.add(producer);
+                producer.start();
+            }
+            
+            // The extra latch count belongs to this thread, so the producers and the timer start together.
+            latchStart.countDown();
+            latchStart.await();
+            long startTime = System.currentTimeMillis();
+            // Join every producer before reporting, so no thread is still submitting writes when the file is closed.
+            for (ThreadProducer producer: list)
+            {
+                producer.join();
+            }
+            // Clamp to 1 ms so a very fast run cannot divide by zero.
+            long elapsed = Math.max(1, System.currentTimeMillis() - startTime);
+            for (ThreadProducer producer: list)
+            {
+                if (producer.failed != null)
+                {
+                    throw producer.failed;
+                }
+            }
+            System.out.println("Records/Second = " + (NUMBER_OF_THREADS * NUMBER_OF_LINES * 1000 / elapsed));
+        }
+        finally
+        {
+            // The file used to be left open, which leaked the native handle and kept the poller thread alive.
+            jlibAIO.close();
+        }
+    }
+    
+    /** Hands out consecutive slot numbers; synchronized because every producer thread calls it. */
+    private static synchronized int getNewPosition()
+    {
+        return position++;
+    }
+    
+    /** Producer thread: fills and submits NUMBER_OF_LINES buffers, then checks that every callback completed without error. */
+    static class ThreadProducer extends Thread
+    {
+        /** Throwable caught in run(), or null; read by the test after join(). */
+        Throwable failed = null;
+        CountDownLatch latchStart;
+        JLibAIO libaio;
+
+        public ThreadProducer(String name, CountDownLatch latchStart, JLibAIO libaio)
+        {
+            super(name);
+            this.latchStart = latchStart;
+            this.libaio = libaio;
+        }
+        
+        public void run()
+        {
+            try
+            {
+                latchStart.countDown();
+                latchStart.await();
+                
+                CountDownLatch latchFinishThread = new CountDownLatch(NUMBER_OF_LINES);
+                LinkedList<LocalCallback> list = new LinkedList<LocalCallback>();
+                
+                for (int i=0;i<NUMBER_OF_LINES;i++)
+                {
+                    ByteBuffer buffer = libaio.newBuffer(SIZE);
+                    addString ("Thread name=" + Thread.currentThread().getName() + ";" + i + "\n", buffer);
+                    // Pad with spaces so the record fills the buffer and ends with a newline.
+                    for (int local = buffer.position(); local < buffer.capacity() - 1; local++)
+                    {
+                        buffer.put((byte)' ');
+                    }
+                    buffer.put((byte)'\n');
+                    
+                    LocalCallback callback = new LocalCallback(latchFinishThread, buffer, libaio);
+                    list.add(callback);
+                    addData(libaio, buffer,callback);
+                }
+                latchFinishThread.await();
+                
+                for (LocalCallback callback: list)
+                {
+                    assertTrue (callback.doneCalled);
+                    assertFalse (callback.errorCalled);
+                }
+            }
+            catch (Throwable e)
+            {
+                e.printStackTrace();
+                failed = e;
+            }
+        }
+    }
+    
+    /** Encodes str as UTF-8 into buffer at its current position; uses its own encoder so concurrent callers do not share state. */
+    private static void addString(String str, ByteBuffer buffer)
+    {
+        UTF_8.newEncoder().encode(CharBuffer.wrap(str), buffer, true);
+    }
+    
+    /** Callback that records which notification arrived, counts the latch down and releases its buffer. */
+    static class LocalCallback implements AIOCallback
+    {
+        boolean doneCalled = false;
+        boolean errorCalled = false;
+        CountDownLatch latchDone;
+        ByteBuffer releaseMe;
+        JLibAIO libaio;
+        
+        public LocalCallback(CountDownLatch latchDone, ByteBuffer releaseMe, JLibAIO libaio)
+        {
+            this.latchDone = latchDone;
+            this.releaseMe = releaseMe;
+            this.libaio = libaio;
+        }
+        
+        public void done()
+        {
+            doneCalled=true;
+            latchDone.countDown();
+            libaio.destroyBuffer(releaseMe);
+        }
+
+        public void onError(int errorCode, String errorMessage)
+        {
+            errorCalled=true;
+            latchDone.countDown();
+            libaio.destroyBuffer(releaseMe);
+        }
+    }
+}


Property changes on: projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/MultiThreadTests.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Copied: projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/SingleThreadTests.java (from rev 3753, projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java)
===================================================================
--- projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/SingleThreadTests.java	                        (rev 0)
+++ projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/SingleThreadTests.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -0,0 +1,413 @@
+package org.jboss.jaio.libaioimpl.test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.jaio.api.AIOCallback;
+import org.jboss.jaio.libaioimpl.JLibAIO;
+
+import junit.framework.TestCase;
+
+public class SingleThreadTests extends TestCase
+{
+    private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
+    
+    /**
+     * Stand-alone entry point: runs setUp() and then testRead() on a fresh instance, printing the stack trace of any failure.
+     */
+    public static void main(String arg[])
+    {
+        try
+        {
+            final SingleThreadTests readTest = new SingleThreadTests();
+            readTest.setUp();
+            readTest.testRead();
+        }
+        catch (Throwable failure) { failure.printStackTrace(); }
+    }
+    
+    byte commonBuffer[] = null; 
+    
+    String FILE_NAME="/tmp/libaio.log";
+    
+    
+    /** Resets LocalAIO.staticDone and deletes the file left over from a previous test. */
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        LocalAIO.staticDone = 0;
+        new File(FILE_NAME).delete();
+    }
+
+    /** Clears the buffer, fills all but its last byte with a repeating run of letters and ends it with a newline. */
+    private void encodeBufer(ByteBuffer buffer)
+    {
+        buffer.clear();
+        final int lastIndex = buffer.limit() - 1;
+        for (int filled = 0; filled < lastIndex; filled++)
+        {
+            // 20 distinct letters, 'a' through 't', repeated
+            buffer.put((byte)('a' + (filled % 20)));
+        }
+        buffer.put((byte)'\n');
+    }
+    
+    public void testAddBeyongSimultaneousLimit() throws Exception
+    {
+        asyncData(150000,1024,100); // 150000 writes of 1024 bytes on a file opened with maxIO=100, i.e. far more writes than the configured limit
+    }
+
+    public void testAddAsyncData() throws Exception
+    {
+        asyncData(150000,1024,20000); // same 150000 x 1024-byte workload, on a file opened with maxIO=20000
+    }
+    
+    /**
+     * Writes NUMBER_LINES padded text records one at a time, then reads each one back and compares it
+     * byte by byte with a freshly built copy of the expected record.
+     */
+    public void testRead() throws Exception
+    {
+        final JLibAIO controller = new JLibAIO();
+        try
+        {
+            final int NUMBER_LINES = 300;
+            final int SIZE = 1024;
+            
+            controller.open(FILE_NAME, 10);
+            ByteBuffer buffer = controller.newBuffer(SIZE);
+            
+            // Phase 1: write every record, padded with spaces to SIZE bytes and ending with a newline.
+            for (int i=0; i<NUMBER_LINES; i++)
+            {
+                buffer.clear();
+                addString ("Str value " + i + "\n", buffer);
+                for (int j=buffer.position(); j<buffer.capacity()-1;j++)
+                {
+                    buffer.put((byte)' ');
+                }
+                buffer.put((byte)'\n');
+                
+                // Wait for the completion before the single buffer is reused for the next record.
+                CountDownLatch latch = new CountDownLatch(1);
+                LocalAIO aio = new LocalAIO(latch);
+                controller.write(i * SIZE, SIZE, buffer, aio);
+                latch.await();
+                assertFalse(aio.errorCalled);
+                assertTrue(aio.doneCalled);
+            }
+            
+            // Phase 2: read every record back into 'buffer' and compare it with the expected content,
+            // which is rebuilt in 'newBuffer' the same way it was written.
+            ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+            
+            for (int i=0; i<NUMBER_LINES; i++)
+            {
+                newBuffer.clear();
+                addString ("Str value " + i + "\n", newBuffer);
+                for (int j=newBuffer.position(); j<newBuffer.capacity()-1;j++)
+                {
+                    newBuffer.put((byte)' ');
+                }
+                newBuffer.put((byte)'\n');
+                
+                CountDownLatch latch = new CountDownLatch(1);
+                LocalAIO aio = new LocalAIO(latch);
+                controller.read(i * SIZE, SIZE, buffer, aio);
+                latch.await();
+                assertFalse(aio.errorCalled);
+                assertTrue(aio.doneCalled);
+                
+                byte bytesRead[] = new byte[SIZE];
+                byte bytesCompare[] = new byte[SIZE];
+                
+                newBuffer.rewind();
+                newBuffer.get(bytesCompare);
+                buffer.rewind();
+                buffer.get(bytesRead);
+                
+                for (int count=0;count<SIZE;count++)
+                {
+                    assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
+                }
+                
+                // ByteBuffer.equals only compares the remaining bytes, and the bulk get() above left none, so comparing
+                // the buffers directly passed vacuously. Compare rewound views; the real buffers keep their state.
+                ByteBuffer readView = buffer.duplicate();
+                readView.rewind();
+                ByteBuffer expectedView = newBuffer.duplicate();
+                expectedView.rewind();
+                assertTrue(readView.equals(expectedView));
+            }
+            
+            controller.destroyBuffer(buffer);
+        }
+        finally
+        {
+            // close() itself throws when open() failed; a failure here is ignored so it cannot mask the original error.
+            try { controller.close(); } catch (Throwable ignored){}
+        }
+    }
+    
+    /**
+     * Submits numberOfLines writes of size bytes each without waiting in between, then waits until every
+     * callback has fired and checks that each one completed exactly once and without error.
+     * @param aioLimit passed to open() as maxIO
+     */
+    private void asyncData(int numberOfLines, int size, int aioLimit) throws Exception
+    {
+        final JLibAIO controller = new JLibAIO();
+        controller.open(FILE_NAME, aioLimit);
+        
+        try
+        {
+            System.out.println("++testDirectDataNoPage"); System.out.flush();
+            CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+            
+            ByteBuffer block = controller.newBuffer(size);
+            encodeBufer(block);
+
+            preAlloc(controller, (long) numberOfLines * size);
+
+            ArrayList<LocalAIO> list = new ArrayList<LocalAIO>();
+            for (int i=0; i<numberOfLines; i++)
+            {
+                list.add(new LocalAIO(latchDone));
+            }
+            
+            long valueInitial = System.currentTimeMillis();
+    
+            System.out.println("Adding data");
+            
+            long lastTime = System.currentTimeMillis();
+            int counter = 0;
+            for (LocalAIO tmp: list)
+            {
+                controller.write((long) counter * size, size, block, tmp);
+                if (++counter % 5000 == 0)
+                {
+                    // Submitting does not wait for completion and may take under a millisecond; clamp to avoid dividing by zero.
+                    System.out.println(5000*1000/Math.max(1, System.currentTimeMillis()-lastTime) + " rec/sec (Async)");
+                    lastTime = System.currentTimeMillis();
+                }
+            }
+            
+            System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
+            System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
+            System.out.println("Flush now");
+            System.out.println("Received " + LocalAIO.staticDone);
+            long timeTotal = Math.max(1, System.currentTimeMillis() - valueInitial);
+
+            System.out.println("Asynchronous time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size  + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
+
+            latchDone.await();
+    
+            timeTotal = Math.max(1, System.currentTimeMillis() - valueInitial);
+            System.out.println("After completions time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size  + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
+    
+            for (LocalAIO tmp: list)
+            {
+                assertEquals(1, tmp.timesDoneCalled);
+                assertTrue(tmp.doneCalled);
+                assertFalse(tmp.errorCalled);
+            }
+            
+            // Released on the success path only: after a failure some writes may still be using the buffer.
+            controller.destroyBuffer(block);
+        }
+        finally
+        {
+            // Single close. The file used to be closed inside the try as well, so this call always threw
+            // "File is not opened" on success and the exception had to be swallowed.
+            controller.close();
+        }
+    }
+    
+    /** Writes NUMBER_LINES blocks one at a time, waiting for each completion before submitting the next, and prints the throughput. */
+    public void testDirectSynchronous() throws Exception
+    {
+        System.out.println("++testDirectDataNoPage"); System.out.flush();
+        final int NUMBER_LINES = 100000;
+        final int SIZE = 1024;
+        final JLibAIO controller = new JLibAIO();
+        controller.open(FILE_NAME, 2000);
+        try
+        {
+            ByteBuffer block = controller.newBuffer(SIZE);
+            encodeBufer(block);
+            
+            preAlloc(controller, NUMBER_LINES * SIZE);
+
+            long valueInitial = System.currentTimeMillis();
+    
+            System.out.println("Adding data");
+            
+            long lastTime = System.currentTimeMillis();
+            int counter = 0;
+            for (int i=0; i<NUMBER_LINES; i++)
+            {
+                CountDownLatch latchDone = new CountDownLatch(1);
+                LocalAIO aioBlock = new LocalAIO(latchDone);
+                // Use SIZE for offset and length: the hard-coded 512 wrote only half of each block, while the
+                // buffer, the pre-allocation and the reported record size are all based on SIZE.
+                controller.write(i * SIZE, SIZE, block, aioBlock);
+                latchDone.await();
+                assertTrue(aioBlock.doneCalled);
+                assertFalse(aioBlock.errorCalled);
+                if (++counter % 500 == 0)
+                {
+                    System.out.println(500*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Synchronous)");
+                    lastTime = System.currentTimeMillis();
+                }
+            }
+
+            System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
+            System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
+            System.out.println("Flush now");
+            System.out.println("Received " + LocalAIO.staticDone);
+    
+            long timeTotal = System.currentTimeMillis() - valueInitial;
+            System.out.println("Flushed " + timeTotal);
+            System.out.println("time = " + timeTotal + " for " + NUMBER_LINES + " registers " + " size each line = " + SIZE  + " Records/Sec=" + (NUMBER_LINES*1000/timeTotal) + " Synchronous");
+    
+            controller.destroyBuffer(block);
+        }
+        catch (Exception e)
+        {
+            System.out.println("Received " + LocalAIO.staticDone + " before it failed");
+            throw e;
+        }
+        finally
+        {
+            // Close on every path; previously a failing test left the file open.
+            controller.close();
+        }
+    }
+    
+    /**
+     * Calls preAllocate on the controller and prints how long the call took.
+     *
+     * @param controller controller whose preAllocate method is invoked
+     * @param size value forwarded as the second argument of preAllocate; also used as the byte
+     *             count of the printed throughput figure
+     */
+    private void preAlloc(JLibAIO controller, long size)
+    {
+        System.out.println("Pre allocating");  System.out.flush();
+        long startPreAllocate = System.currentTimeMillis();
+        controller.preAllocate(1, size);
+        long elapsedMillis = System.currentTimeMillis() - startPreAllocate;
+        // Nothing is reported for a sub-millisecond call, which also avoids dividing by zero
+        if (elapsedMillis != 0)
+        {
+            System.out.println("PreAllocated the file in " + elapsedMillis + " milliseconds, What means " + (size/elapsedMillis) + " bytes per millisecond");
+        }
+    }
+    
+    
+    /**
+     * Issues a write at position 11 and verifies that the callback reports an error instead of
+     * a completion.
+     *
+     * @throws Exception whatever the controller or the latch throws; rethrown after the number
+     *                   of completions received so far has been printed
+     */
+    public void testInvalidWrite() throws Exception
+    {
+        final int SIZE = 512;
+
+        final JLibAIO controller = new JLibAIO();
+        controller.open(FILE_NAME, 2000);
+
+        ByteBuffer block = null;
+        try
+        {
+            block = controller.newBuffer(SIZE);
+            encodeBufer(block);
+
+            preAlloc(controller, 1000 * SIZE);
+
+            CountDownLatch latchDone = new CountDownLatch(1);
+            LocalAIO aioBlock = new LocalAIO(latchDone);
+
+            // NOTE(review): position 11 is presumably rejected because it is not aligned to the
+            // block size - confirm against the native implementation
+            controller.write(11, SIZE, block, aioBlock);
+
+            latchDone.await();
+
+            assertTrue (aioBlock.errorCalled);
+            assertFalse(aioBlock.doneCalled);
+        }
+        catch (Exception e)
+        {
+            System.out.println("Received " + LocalAIO.staticDone + " before it failed");
+            throw e;
+        }
+        finally
+        {
+            // Assertion failures are Errors and bypass the catch above, so the buffer is
+            // released here together with the file
+            try
+            {
+                if (block != null) controller.destroyBuffer(block);
+            }
+            finally
+            {
+                controller.close();
+            }
+        }
+    }
+    
+    /**
+     * Verifies that requesting a buffer of 300 bytes is rejected with an exception.
+     *
+     * @throws Exception if releasing an unexpectedly allocated buffer fails
+     */
+    public void testInvalidAlloc() throws Exception
+    {
+        JLibAIO controller = new JLibAIO();
+        ByteBuffer buffer;
+        try
+        {
+            // You don't need to open the file to alloc it
+            // NOTE(review): 300 is presumably invalid because it is not a multiple of the
+            // block size - confirm against the native implementation
+            buffer = controller.newBuffer(300);
+        }
+        catch (Exception expected)
+        {
+            // The allocation was rejected, which is what this test requires
+            return;
+        }
+
+        // The allocation unexpectedly succeeded: give the buffer back before failing the test
+        controller.destroyBuffer(buffer);
+        fail ("Exception expected");
+    }
+    
+    private static class LocalAIO implements AIOCallback
+    {
+
+        CountDownLatch latch;
+        
+        public LocalAIO(CountDownLatch latch)
+        {
+            this.latch = latch;
+        }
+        
+        boolean doneCalled = false;
+        boolean errorCalled = false;
+        int timesDoneCalled = 0;
+        static int staticDone = 0;
+        public void decode(int length, ByteBuffer buffer)
+        {
+            // TODO Auto-generated method stub
+            
+        }
+
+        public void done()
+        {
+            //System.out.println("Received Done"); System.out.flush();
+            doneCalled = true;
+            timesDoneCalled++;
+            staticDone++;
+            if (latch != null) 
+            {
+                latch.countDown();
+            }
+            
+        }
+
+        public void onError(int errorCode, String errorMessage)
+        {
+            errorCalled = true;
+            if (latch != null)
+            {
+                // even thought an error happened, we need to inform the latch, or the test won't finish
+                latch.countDown();
+            }
+            System.out.println("Received an Error - " + errorCode + " message=" + errorMessage);
+            
+        }
+
+    }
+
+    /**
+     * Encodes str with UTF_8_ENCODER into buffer. The result returned by the encoder is not
+     * inspected.
+     *
+     * @param str text to encode
+     * @param buffer destination of the encoded bytes
+     */
+    private void addString(String str, ByteBuffer buffer)
+    {
+        UTF_8_ENCODER.encode(CharBuffer.wrap(str), buffer, true);
+    }
+    
+
+}

Deleted: projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java
===================================================================
--- projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java	2008-02-23 01:12:38 UTC (rev 3774)
+++ projects/jaio/trunk/jaio/java/tests/org/jboss/jaio/libaioimpl/test/TestController.java	2008-02-23 01:14:17 UTC (rev 3775)
@@ -1,363 +0,0 @@
-package org.jboss.jaio.libaioimpl.test;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-
-import org.jboss.jaio.api.AIOPackage;
-import org.jboss.jaio.libaioimpl.LibAIOController;
-
-import junit.framework.TestCase;
-
-public class TestController extends TestCase
-{
-    
-    byte commonBuffer[] = null; 
-    
-    String FILE_NAME="/tmp/libaio.log";
-    
-    
-    @Override
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-        LocalAIO.staticDone = 0;
-        File file = new File(FILE_NAME);
-        file.delete();
-    }
-
-    // If an AIO encode blocks is blocked, we should be able to finish execution (validating the thread model)
-    public void testBlockOnSize() throws Exception
-    {
-        System.out.println("++testDirectDataNoPage"); System.out.flush();
-        final LibAIOController controller = new LibAIOController();
-        controller.open(FILE_NAME, 100);
-        controller.preAllocate(1, 100 * 512);
-
-        final CountDownLatch releaseSize = new CountDownLatch(1);
-        final CountDownLatch releaseEncode = new CountDownLatch(1);
-        final CountDownLatch latchDone = new CountDownLatch(2);
-        final AIOBlocker block1 = new AIOBlocker(true, false, releaseSize, latchDone);
-        final AIOBlocker block2 = new AIOBlocker(false, true, releaseEncode, latchDone);
-        
-        Thread blocker = new Thread()
-        {
-          public void run()
-          {
-              controller.append(block1);
-              controller.append(block2);
-          }
-        };
-        
-        
-        blocker.start();
-
-        Thread.sleep(1000); // some time to the other thread wake up (a lot of time actually)
-        
-        ArrayList<LocalAIO> aios = new ArrayList<LocalAIO>();
-        
-        
-        CountDownLatch latchDoneOnRegularblocks = new CountDownLatch(10);
-        
-        for (int i=0;i<10;i++) aios.add(new LocalAIO(512, latchDoneOnRegularblocks));
-        
-        for (LocalAIO tmp: aios)
-        {
-            controller.append(tmp);
-        }
-        
-        latchDoneOnRegularblocks.await();
-        
-        for (LocalAIO tmp: aios)
-        {
-            assertTrue(tmp.doneCalled);
-        }
-        
-        releaseSize.countDown();
-        releaseEncode.countDown();
-        latchDone.await();
-        
-        assertTrue (block1.doneCalled);
-        assertTrue (block2.doneCalled);
-
-        controller.close();
-        
-    }
-    
-    public void testAddAsyncData() throws Exception
-    {
-        try
-        {
-            System.out.println("++testDirectDataNoPage"); System.out.flush();
-            final int NUMBER_LINES = 500000; 
-            final int SIZE = 1024;
-            //final int SIZE = 512;
-            
-            CountDownLatch latchDone = new CountDownLatch(NUMBER_LINES);
-            final LibAIOController controller = new LibAIOController();
-            controller.open(FILE_NAME, 20000);
-            System.out.println("Pre allocating file"); System.out.flush();
-            
-            long startPreAllocate = System.currentTimeMillis();
-            controller.preAllocate(1, NUMBER_LINES * SIZE);
-            long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
-            if (endPreAllocate != 0) System.out.println("PreAllocated the file in " + endPreAllocate + " What means " + ((100 * 1024*1024)/endPreAllocate) + " bytes per millisecond");
-            System.out.println("Pre allocating file done");  System.out.flush();
-            ArrayList<LocalAIO> list = new ArrayList<LocalAIO>();
-    
-            for (int i=0; i<NUMBER_LINES; i++)
-            {
-                list.add(new LocalAIO(SIZE, latchDone));
-            }
-            
-           
-            long valueInitial = System.currentTimeMillis();
-    
-            System.out.println("Adding data");
-            
-            long lastTime = System.currentTimeMillis();
-            int counter = 0;
-            for (LocalAIO tmp: list)
-            {
-                if (SIZE > 1024*1024) System.out.println("adding"); System.out.flush();
-                controller.append(tmp);
-                if (++counter % 500 == 0)
-                {
-                    System.out.println(500*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Async)");
-                    lastTime = System.currentTimeMillis();
-                }
-                
-            }
-            
-            System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
-            
-            
-            System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
-            System.out.println("Flush now");
-            System.out.println("Received " + LocalAIO.staticDone);
-            latchDone.await();
-    
-            long timeTotal = System.currentTimeMillis() - valueInitial;
-            System.out.println("Flushed " + timeTotal);
-            System.out.println("time = " + timeTotal + " for " + NUMBER_LINES + " registers " + " size each line = " + SIZE  + " Records/Sec=" + (NUMBER_LINES*1000/timeTotal) + " (Assynchronous)");
-    
-            for (LocalAIO tmp: list)
-            {
-                assertEquals(1, tmp.timesDoneCalled);
-                assertTrue(tmp.encodeCalled);
-                assertTrue(tmp.encodeSizeCalled);
-                assertTrue(tmp.doneCalled);
-            }
-            
-            
-            controller.close();
-        }
-        catch (Exception e)
-        {
-            System.out.println("Received " + LocalAIO.staticDone + " before it failed");
-        }
-        
-    }
-    
-    public void testDirectSynchronous() throws Exception
-    {
-        try
-        {
-            System.out.println("++testDirectDataNoPage"); System.out.flush();
-            final int NUMBER_LINES = 100000; 
-            final int SIZE = 1024;
-            //final int SIZE = 512;
-            
-            final LibAIOController controller = new LibAIOController();
-            controller.open(FILE_NAME, 2000);
-            
-            long startPreAllocate = System.currentTimeMillis();
-            controller.preAllocate(1, NUMBER_LINES * SIZE);
-            long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
-            if (endPreAllocate != 0) System.out.println("PreAllocated the file in " + endPreAllocate + " seconds, What means " + ((100 * 1024*1024)/endPreAllocate) + " bytes per millisecond");
-
-            long valueInitial = System.currentTimeMillis();
-    
-            System.out.println("Adding data");
-            
-            long lastTime = System.currentTimeMillis();
-            int counter = 0;
-            
-            for (int i=0; i<NUMBER_LINES; i++)
-            {
-                CountDownLatch latchDone = new CountDownLatch(1);
-                LocalAIO aioBlock = new LocalAIO(SIZE, latchDone);
-                controller.append(aioBlock);
-                latchDone.await();
-                assertTrue(aioBlock.doneCalled);
-                assertTrue(aioBlock.encodeCalled);
-                if (++counter % 500 == 0)
-                {
-                    System.out.println(500*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Synchronous)");
-                    lastTime = System.currentTimeMillis();
-                }
-            }
-
-            System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
-            
-            
-            System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
-            System.out.println("Flush now");
-            System.out.println("Received " + LocalAIO.staticDone);
-    
-            long timeTotal = System.currentTimeMillis() - valueInitial;
-            System.out.println("Flushed " + timeTotal);
-            System.out.println("time = " + timeTotal + " for " + NUMBER_LINES + " registers " + " size each line = " + SIZE  + " Records/Sec=" + (NUMBER_LINES*1000/timeTotal) + " Synchronous");
-    
-            controller.close();
-        }
-        catch (Exception e)
-        {
-            System.out.println("Received " + LocalAIO.staticDone + " before it failed");
-            throw e;
-        }
-        
-    }
-    
-    private static class AIOBlocker implements AIOPackage
-    {
-        
-        boolean doneCalled = false;
-        
-        private boolean blockSize;
-        private boolean blockEncode;
-        private CountDownLatch latch;
-        private CountDownLatch latchDone;
-        
-        public AIOBlocker(boolean blockSize, boolean blockEncode, CountDownLatch latch, CountDownLatch latchDone)
-        {
-            this.blockSize = blockSize;
-            this.blockEncode = blockEncode;
-            this.latch = latch;
-            this.latchDone = latchDone;
-        }
-
-        public void decode(int length, ByteBuffer buffer)
-        {
-        }
-
-        public void done()
-        {
-            latchDone.countDown();
-            doneCalled = true;
-        }
-
-        public void encode(ByteBuffer buffer)
-        {
-            if (blockEncode)
-            {
-                try
-                {
-                    latch.await();
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
-            }
-            
-            for (int i=0;i<511;i++)
-            {
-                buffer.put((byte)'b');
-            }
-            buffer.put((byte)'\n');
-            
-            
-        }
-
-        public int encodeSize()
-        {
-            if (blockSize)
-            {
-                try
-                {
-                    latch.await();
-                } catch (InterruptedException e)
-                {
-                    e.printStackTrace();
-                }
-            }
-            return 512;
-        }
-
-        public void onError(int errorCode, String errorMessage)
-        {
-            
-        }
-        
-    }
-    
-    private static class LocalAIO implements AIOPackage
-    {
-
-        CountDownLatch latch;
-        int size = 1;
-        public LocalAIO()
-        {
-            this(10);
-        }
-        
-        public LocalAIO(int size)
-        {
-            this.size = size;
-        }
-        
-        public LocalAIO(int size, CountDownLatch latch)
-        {
-            this(size);
-            this.latch = latch;
-        }
-        
-        boolean encodeCalled = false;
-        boolean encodeSizeCalled = false;
-        boolean doneCalled = false;
-        int timesDoneCalled = 0;
-        static int staticDone = 0;
-        public void decode(int length, ByteBuffer buffer)
-        {
-            // TODO Auto-generated method stub
-            
-        }
-
-        public void done()
-        {
-            //System.out.println("Received Done"); System.out.flush();
-            doneCalled = true;
-            timesDoneCalled++;
-            staticDone++;
-            if (latch != null) 
-            {
-                latch.countDown();
-            }
-            
-        }
-
-        public void encode(ByteBuffer buffer)
-        {
-            encodeCalled = true;
-            for (int i=0; i<size-1; i++)
-            {
-                buffer.put((byte)('A' + (i%10)));
-            }
-            buffer.put((byte)'\n');
-        }
-
-        public int encodeSize()
-        {
-            encodeSizeCalled = true;
-            return size;
-        }
-
-        public void onError(int errorCode, String errorMessage)
-        {
-            System.out.println("Received an Error - " + errorCode + " message=" + errorMessage);
-            
-        }
-        
-    }
-}




More information about the jboss-cvs-commits mailing list