[hornetq-commits] JBoss hornetq SVN: r8350 - in branches/ClebertTemporary: src/main/org/hornetq/core/asyncio/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 20 13:16:16 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-20 13:16:16 -0500 (Fri, 20 Nov 2009)
New Revision: 8350

Modified:
   branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp
   branches/ClebertTemporary/native/src/JNICallbackAdapter.h
   branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp
   branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
   branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
Log:
AIO order

Modified: branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp	2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/native/src/JNICallbackAdapter.cpp	2009-11-20 18:16:16 UTC (rev 8350)
@@ -18,7 +18,7 @@
 
 jobject nullObj = NULL;
 
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jint _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jlong _sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
 {
 	controller = _controller;
 

Modified: branches/ClebertTemporary/native/src/JNICallbackAdapter.h
===================================================================
--- branches/ClebertTemporary/native/src/JNICallbackAdapter.h	2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/native/src/JNICallbackAdapter.h	2009-11-20 18:16:16 UTC (rev 8350)
@@ -33,7 +33,7 @@
 	
 	jobject bufferReference;
 	
-	jint sequence;
+	jlong sequence;
 	
 	// Is this a read operation
 	short isRead;
@@ -50,7 +50,7 @@
 	
 public:
 	// _ob must be a global Reference (use createGlobalReference before calling the constructor)
-	JNICallbackAdapter(AIOController * _controller, jint sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
+	JNICallbackAdapter(AIOController * _controller, jlong sequence, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
 	virtual ~JNICallbackAdapter();
 
 	void done(THREAD_CONTEXT threadContext);

Modified: branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp	2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/native/src/JNI_AsynchronousFileImpl.cpp	2009-11-20 18:16:16 UTC (rev 8350)
@@ -48,10 +48,10 @@
 		std::string fileName = convertJavaString(env, jstrFileName);
 
 		controller = new AIOController(fileName, (int) maxIO);
-		controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;ILjava/nio/ByteBuffer;)V");
+		controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;)V");
 		if (!controller->done) return 0;
 
-		controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/hornetq/core/asyncio/AIOCallback;ILjava/nio/ByteBuffer;ILjava/lang/String;)V");
+		controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/hornetq/core/asyncio/AIOCallback;JLjava/nio/ByteBuffer;ILjava/lang/String;)V");
         if (!controller->error) return 0;
 
         jclass loggerClass = env->GetObjectClass(logger);

Modified: branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java	2009-11-20 18:16:16 UTC (rev 8350)
@@ -18,6 +18,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -49,11 +50,11 @@
    private static int EXPECTED_NATIVE_VERSION = 26;
    
    /** Used to determine the next writing sequence */
-   private AtomicInteger nextWritingSequence = new AtomicInteger(0);
+   private AtomicLong nextWritingSequence = new AtomicLong(0);
 
    /** Used to determine the next writing sequence.
     *  This is accessed from a single thread (the Poller Thread) */
-   private int readSequence = 0;
+   private long readSequence = 0;
 
    public static void addMax(final int io)
    {
@@ -283,7 +284,7 @@
             {
                writeSemaphore.acquireUninterruptibly();
                
-               int sequence = nextWritingSequence.getAndIncrement();
+               long sequence = nextWritingSequence.getAndIncrement();
 
                try
                {
@@ -304,7 +305,7 @@
       {
          writeSemaphore.acquireUninterruptibly();
 
-         int sequence = nextWritingSequence.getAndIncrement();
+         long sequence = nextWritingSequence.getAndIncrement();
 
          try
          {
@@ -427,7 +428,7 @@
    @SuppressWarnings("unused")
    // Called by the JNI layer.. just ignore the
    // warning
-   private void callbackDone(final AIOCallback callback, final int sequence, final ByteBuffer buffer)
+   private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
    {
       writeSemaphore.release();
       pendingWrites.down();
@@ -442,7 +443,7 @@
 
    // Called by the JNI layer.. just ignore the
    // warning
-   private void callbackError(final AIOCallback callback, final int sequence, final ByteBuffer buffer, final int errorCode, final String errorMessage)
+   private void callbackError(final AIOCallback callback, final long sequence, final ByteBuffer buffer, final int errorCode, final String errorMessage)
    {
       log.warn("CallbackError: " + errorMessage);
       writeSemaphore.release();
@@ -529,7 +530,7 @@
 
    private native long size0(long handle) throws HornetQException;
 
-   private native void write(long handle, int sequence, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
+   private native void write(long handle, long sequence, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
 
    private native void read(long handle, long position, long size, ByteBuffer buffer, AIOCallback aioPackage) throws HornetQException;
 

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java	2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AIOTestBase.java	2009-11-20 18:16:16 UTC (rev 8350)
@@ -15,6 +15,8 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -84,20 +86,36 @@
    protected static class CountDownCallback implements AIOCallback
    {
       private final CountDownLatch latch;
+      
+      private final List<Integer> outputList;
+      
+      private final int order;
+      
+      private final AtomicInteger errors;
 
-      public CountDownCallback(final CountDownLatch latch)
+      public CountDownCallback(final CountDownLatch latch, final AtomicInteger errors, final List<Integer> outputList, final int order)
       {
          this.latch = latch;
+         
+         this.outputList = outputList;
+         
+         this.order = order;
+         
+         this.errors = errors;
       }
 
       volatile boolean doneCalled = false;
 
-      volatile boolean errorCalled = false;
+      volatile int errorCalled = 0;
 
       final AtomicInteger timesDoneCalled = new AtomicInteger(0);
 
       public void done()
       {
+         if (outputList != null)
+         {
+            outputList.add(order);
+         }
          doneCalled = true;
          timesDoneCalled.incrementAndGet();
          if (latch != null)
@@ -108,7 +126,15 @@
 
       public void onError(final int errorCode, final String errorMessage)
       {
-         errorCalled = true;
+         errorCalled++;
+         if (outputList != null)
+         {
+            outputList.add(order);
+         }
+         if (errors != null)
+         {
+            errors.incrementAndGet();
+         }
          if (latch != null)
          {
             // even though an error happened, we need to inform the latch,
@@ -116,6 +142,16 @@
             latch.countDown();
          }
       }
+      
+      public static void checkResults(final int numberOfElements, final ArrayList<Integer> result)
+      {
+         assertEquals(numberOfElements, result.size());
+         int i = 0;
+         for (Integer resultI : result)
+         {
+            assertEquals(i++, resultI.intValue());
+         }
+      }
    }
 
 }

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java	2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/AsynchronousFileTest.java	2009-11-20 18:16:16 UTC (rev 8350)
@@ -136,6 +136,11 @@
 
       int numberOfLines = 1000;
       int size = 1024;
+      
+      ArrayList<Integer> listResult1 = new ArrayList<Integer>();
+      ArrayList<Integer> listResult2 = new ArrayList<Integer>();
+      
+      AtomicInteger errors = new AtomicInteger(0);
 
       ByteBuffer buffer = null;
       try
@@ -154,41 +159,43 @@
 
          for (int i = 0; i < numberOfLines; i++)
          {
-            list.add(new CountDownCallback(latchDone));
-            list2.add(new CountDownCallback(latchDone2));
+            list.add(new CountDownCallback(latchDone, errors, listResult1, i));
+            list2.add(new CountDownCallback(latchDone2, errors, listResult2, i));
          }
 
-         long valueInitial = System.currentTimeMillis();
-
          int counter = 0;
+         
          Iterator<CountDownCallback> iter2 = list2.iterator();
 
-         for (CountDownCallback tmp : list)
+         for (CountDownCallback cb1 : list)
          {
-            CountDownCallback tmp2 = iter2.next();
+            CountDownCallback cb2 = iter2.next();
 
-            controller.write(counter * size, size, buffer, tmp);
-            controller.write(counter * size, size, buffer, tmp2);
+            controller.write(counter * size, size, buffer, cb1);
+            controller2.write(counter * size, size, buffer, cb2);
             ++counter;
 
          }
 
          latchDone.await();
          latchDone2.await();
+         
+         CountDownCallback.checkResults(numberOfLines, listResult1);
+         CountDownCallback.checkResults(numberOfLines, listResult2);
 
          for (CountDownCallback callback : list)
          {
             assertEquals(1, callback.timesDoneCalled.get());
             assertTrue(callback.doneCalled);
-            assertFalse(callback.errorCalled);
          }
 
          for (CountDownCallback callback : list2)
          {
             assertEquals(1, callback.timesDoneCalled.get());
             assertTrue(callback.doneCalled);
-            assertFalse(callback.errorCalled);
          }
+         
+         assertEquals(0, errors.get());
 
          controller.close();
       }
@@ -358,7 +365,7 @@
          controller.setBufferCallback(bufferCallback);
 
          CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
-         CountDownCallback aio = new CountDownCallback(latch);
+         ArrayList<Integer> result = new ArrayList<Integer>();
          for (int i = 0; i < NUMBER_LINES; i++)
          {
             ByteBuffer buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -367,6 +374,7 @@
             {
                buffer.put((byte)(j % Byte.MAX_VALUE));
             }
+            CountDownCallback aio = new CountDownCallback(latch, null, result, i);
             controller.write(i * SIZE, SIZE, buffer, aio);
          }
 
@@ -379,7 +387,7 @@
          controller.close();
          closed = true;
 
-         assertEquals(NUMBER_LINES, buffers.size());
+         CountDownCallback.checkResults(NUMBER_LINES, result);
 
          // Make sure all the buffers are unique
          ByteBuffer lineOne = null;
@@ -439,7 +447,6 @@
          controller.setBufferCallback(bufferCallback);
 
          CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
-         CountDownCallback aio = new CountDownCallback(latch);
 
          buffer = AsynchronousFileImpl.newBuffer(SIZE);
          buffer.rewind();
@@ -448,8 +455,11 @@
             buffer.put((byte)(j % Byte.MAX_VALUE));
          }
 
+         ArrayList<Integer> result = new ArrayList<Integer>();
+         
          for (int i = 0; i < NUMBER_LINES; i++)
          {
+            CountDownCallback aio = new CountDownCallback(latch, null, result, i);
             controller.write(i * SIZE, SIZE, buffer, aio);
          }
 
@@ -462,6 +472,8 @@
          controller.close();
          closed = true;
 
+         CountDownCallback.checkResults(NUMBER_LINES, result);
+
          assertEquals(NUMBER_LINES, buffers.size());
 
          // Make sure all the buffers are unique
@@ -517,7 +529,9 @@
 
          {
             CountDownLatch latch = new CountDownLatch(NUMBER_LINES);
-            CountDownCallback aio = new CountDownCallback(latch);
+            ArrayList<Integer> result = new ArrayList<Integer>();
+            
+            AtomicInteger errors = new AtomicInteger(0);
 
             for (int i = 0; i < NUMBER_LINES; i++)
             {
@@ -531,12 +545,15 @@
                   buffer.put(getSamplebyte(j));
                }
 
+               CountDownCallback aio = new CountDownCallback(latch, errors, result, i);
                controller.write(i * SIZE, SIZE, buffer, aio);
             }
 
             latch.await();
-            assertFalse(aio.errorCalled);
-            assertEquals(NUMBER_LINES, aio.timesDoneCalled.get());
+
+            assertEquals(0, errors.get());
+            
+            CountDownCallback.checkResults(NUMBER_LINES, result);
          }
 
          // If you call close you're supposed to wait events to finish before
@@ -557,12 +574,13 @@
             AsynchronousFileImpl.clearBuffer(readBuffer);
 
             CountDownLatch latch = new CountDownLatch(1);
-            CountDownCallback aio = new CountDownCallback(latch);
+            AtomicInteger errors = new AtomicInteger(0);
+            CountDownCallback aio = new CountDownCallback(latch, errors, null, 0);
 
             controller.read(i * SIZE, SIZE, readBuffer, aio);
 
             latch.await();
-            assertFalse(aio.errorCalled);
+            assertEquals(0, errors.get());
             assertTrue(aio.doneCalled);
 
             byte bytesRead[] = new byte[SIZE];
@@ -634,7 +652,7 @@
             }
             buffer.put((byte)'\n');
 
-            CountDownCallback aio = new CountDownCallback(readLatch);
+            CountDownCallback aio = new CountDownCallback(readLatch, null, null, 0);
             controller.write(i * SIZE, SIZE, buffer, aio);
          }
 
@@ -663,10 +681,10 @@
             newBuffer.put((byte)'\n');
 
             CountDownLatch latch = new CountDownLatch(1);
-            CountDownCallback aio = new CountDownCallback(latch);
+            CountDownCallback aio = new CountDownCallback(latch, null, null, 0);
             controller.read(i * SIZE, SIZE, buffer, aio);
             latch.await();
-            assertFalse(aio.errorCalled);
+            assertEquals(0, aio.errorCalled);
             assertTrue(aio.doneCalled);
 
             byte bytesRead[] = new byte[SIZE];
@@ -720,9 +738,11 @@
 
          ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>();
 
+         ArrayList<Integer> result = new ArrayList<Integer>();
+         
          for (int i = 0; i < numberOfLines; i++)
          {
-            list.add(new CountDownCallback(latchDone));
+            list.add(new CountDownCallback(latchDone, null, result, i));
          }
 
          long valueInitial = System.currentTimeMillis();
@@ -743,6 +763,9 @@
          latchDone.await();
 
          long timeTotal = System.currentTimeMillis() - valueInitial;
+         
+         CountDownCallback.checkResults(numberOfLines, result);
+         
          debug("After completions time = " + timeTotal +
                " for " +
                numberOfLines +
@@ -759,7 +782,7 @@
          {
             assertEquals(1, tmp.timesDoneCalled.get());
             assertTrue(tmp.doneCalled);
-            assertFalse(tmp.errorCalled);
+            assertEquals(0, tmp.errorCalled);
          }
 
          controller.close();
@@ -799,11 +822,11 @@
          for (int i = 0; i < NUMBER_LINES; i++)
          {
             CountDownLatch latchDone = new CountDownLatch(1);
-            CountDownCallback aioBlock = new CountDownCallback(latchDone);
+            CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0);
             controller.write(i * 512, 512, buffer, aioBlock);
             latchDone.await();
             assertTrue(aioBlock.doneCalled);
-            assertFalse(aioBlock.errorCalled);
+            assertEquals(0, aioBlock.errorCalled);
          }
 
          long timeTotal = System.currentTimeMillis() - startTime;
@@ -850,12 +873,12 @@
 
          CountDownLatch latchDone = new CountDownLatch(1);
 
-         CountDownCallback aioBlock = new CountDownCallback(latchDone);
+         CountDownCallback aioBlock = new CountDownCallback(latchDone, null, null, 0);
          controller.write(11, 512, buffer, aioBlock);
 
          latchDone.await();
 
-         assertTrue(aioBlock.errorCalled);
+         assertTrue(aioBlock.errorCalled != 0);
          assertFalse(aioBlock.doneCalled);
 
       }

Modified: branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java	2009-11-20 17:41:40 UTC (rev 8349)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java	2009-11-20 18:16:16 UTC (rev 8350)
@@ -40,7 +40,7 @@
  *   */
 public class MultiThreadAsynchronousFileTest extends AIOTestBase
 {
-   
+
    public static TestSuite suite()
    {
       return createAIOTestSuite(MultiThreadAsynchronousFileTest.class);
@@ -57,34 +57,32 @@
    static final int NUMBER_OF_LINES = 1000;
 
    ExecutorService executor;
-   
+
    ExecutorService pollerExecutor;
 
-
    private static void debug(final String msg)
    {
       log.debug(msg);
    }
 
-   
-   
    protected void setUp() throws Exception
    {
       super.setUp();
-      pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this), false));
+      pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
+                                                                              false));
       executor = Executors.newSingleThreadExecutor();
    }
-   
+
    protected void tearDown() throws Exception
    {
       executor.shutdown();
       pollerExecutor.shutdown();
       super.tearDown();
    }
-   
+
    public void testMultipleASynchronousWrites() throws Throwable
    {
-         executeTest(false);
+      executeTest(false);
    }
 
    public void testMultipleSynchronousWrites() throws Throwable
@@ -130,15 +128,15 @@
          long endTime = System.currentTimeMillis();
 
          debug((sync ? "Sync result:" : "Async result:") + " Records/Second = " +
-                   NUMBER_OF_THREADS *
-                   NUMBER_OF_LINES *
-                   1000 /
-                   (endTime - startTime) +
-                   " total time = " +
-                   (endTime - startTime) +
-                   " total number of records = " +
-                   NUMBER_OF_THREADS *
-                   NUMBER_OF_LINES);
+               NUMBER_OF_THREADS *
+               NUMBER_OF_LINES *
+               1000 /
+               (endTime - startTime) +
+               " total time = " +
+               (endTime - startTime) +
+               " total number of records = " +
+               NUMBER_OF_THREADS *
+               NUMBER_OF_LINES);
       }
       finally
       {
@@ -178,9 +176,8 @@
       {
          super.run();
 
-
          ByteBuffer buffer = null;
-         
+
          synchronized (MultiThreadAsynchronousFileTest.class)
          {
             buffer = AsynchronousFileImpl.newBuffer(SIZE);
@@ -220,7 +217,7 @@
                {
                   latchFinishThread = new CountDownLatch(1);
                }
-               CountDownCallback callback = new CountDownCallback(latchFinishThread);
+               CountDownCallback callback = new CountDownCallback(latchFinishThread, null, null, 0);
                if (!sync)
                {
                   list.add(callback);
@@ -230,7 +227,7 @@
                {
                   latchFinishThread.await();
                   assertTrue(callback.doneCalled);
-                  assertFalse(callback.errorCalled);
+                  assertFalse(callback.errorCalled != 0);
                }
             }
             if (!sync)
@@ -241,13 +238,13 @@
             for (CountDownCallback callback : list)
             {
                assertTrue(callback.doneCalled);
-               assertFalse(callback.errorCalled);
+               assertFalse(callback.errorCalled != 0);
             }
 
             for (CountDownCallback callback : list)
             {
                assertTrue(callback.doneCalled);
-               assertFalse(callback.errorCalled);
+               assertFalse(callback.errorCalled != 0);
             }
 
          }



More information about the hornetq-commits mailing list