[jboss-cvs] JBoss Messaging SVN: r4128 - in branches/trunk_tmp_aio: src/main/org/jboss/messaging/core/asyncio/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 28 15:03:12 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-04-28 15:03:12 -0400 (Mon, 28 Apr 2008)
New Revision: 4128

Modified:
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
   branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
Log:
Fixing broken concurrent usage

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java	2008-04-28 19:03:12 UTC (rev 4128)
@@ -18,7 +18,7 @@
 public interface AsynchronousFile
 {
    
-   void close();
+   void close() throws Exception;
    
    /**
     * 

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-04-28 19:03:12 UTC (rev 4128)
@@ -8,6 +8,7 @@
 package org.jboss.messaging.core.asyncio.impl;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -29,14 +30,14 @@
     private String fileName;
     private Thread poller;
     private static boolean loaded = true;
+    private int maxIO;
     
+    Semaphore writeSemaphore;
+    
     ReadWriteLock lock = new ReentrantReadWriteLock();
     Lock writeLock = lock.writeLock();
-    Lock readLock = lock.readLock();
     
-    ReadWriteLock lockPoller = new ReentrantReadWriteLock();
-    Lock readPollerLock = lockPoller.readLock();
-    Lock writePollerLock = lockPoller.writeLock();
+    Semaphore pollerSemaphore = new Semaphore(1);
     
     /**
      *  Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -70,6 +71,10 @@
        try
        {
           writeLock.lock();
+          this.maxIO = maxIO;
+          
+          this.writeSemaphore = new Semaphore(maxIO);
+          
           if (opened)
           {
              throw new IllegalStateException("AsynchronousFile is already opened");
@@ -96,26 +101,26 @@
            // informing caller that this thread already has the lock
            try
            {
-            pollEvents();
+        	   pollEvents();
            }
            finally
            {
-              readPollerLock.unlock();
+              pollerSemaphore.release();
            }
         }
     }
     
-    public void close()
+    public void close() throws Exception
     {
        checkOpened();
        
        writeLock.lock();
+       writeSemaphore.acquire(maxIO);
        stopPoller(handler);
-       
        // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
-       writePollerLock.lock();
        try
        {
+        pollerSemaphore.acquire();
         closeInternal(handler);
         opened = false;
         handler = 0;
@@ -123,7 +128,7 @@
        finally
        {
           writeLock.unlock();
-          writePollerLock.unlock();
+          pollerSemaphore.release();
        }
     }
     
@@ -131,14 +136,14 @@
     public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
     {
         checkOpened();
-        readLock.lock();
+        this.writeSemaphore.acquireUninterruptibly();
         try
         {
            write (handler, position, size, directByteBuffer, aioPackage);
         }
         catch (RuntimeException e)
         {
-           readLock.unlock();
+           writeSemaphore.release();
            throw e;
         }
         
@@ -147,14 +152,14 @@
     public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
     {
         checkOpened();
-        readLock.lock();
+        this.writeSemaphore.acquireUninterruptibly();
         try
         {
            read (handler, position, size, directByteBuffer, aioPackage);
         }
         catch (RuntimeException e)
         {
-           readLock.unlock();
+           writeSemaphore.release();
            throw e;
         }
         
@@ -183,14 +188,14 @@
     @SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
    private void callbackDone(AIOCallback callback)
     {
-       readLock.unlock();
+       writeSemaphore.release();
        callback.done();
     }
     
     @SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
    private void callbackError(AIOCallback callback, int errorCode, String errorMessage)
     {
-       readLock.unlock();
+       writeSemaphore.release();
        callback.onError(errorCode, errorMessage);
     }
     
@@ -206,12 +211,12 @@
     private synchronized void  startPoller()
     {
         checkOpened();
-        readPollerLock.lock();
 
         poller = new PollerThread(); 
         try
         {
-           poller.start();
+          this.pollerSemaphore.acquire();
+          poller.start();
         }
         catch (Exception ex)
         {

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-04-28 19:03:12 UTC (rev 4128)
@@ -111,7 +111,7 @@
    public void open() throws Exception
    {
       aioFile = new AsynchronousFileImpl();
-      aioFile.open(journalDir + "/" + fileName, 500);
+      aioFile.open(journalDir + "/" + fileName, 1000);
       position.set(0);
       
    }

Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-04-28 19:03:12 UTC (rev 4128)
@@ -963,7 +963,9 @@
 		
 		for (int i = 0; i < filesToCreate; i++)
 		{
-			freeFiles.add(createFile());
+			// Keeping all files opened can be very costly (mainly on AIO)
+			JournalFile createdFile = createFile();
+			freeFiles.add(createdFile);
 		}
 												
 		//The current file is the last one
@@ -991,6 +993,7 @@
 		else
 		{
 			currentFile = freeFiles.remove();
+			currentFile.getFile().open();
 		}				
 								
 		for (RecordInfo record: records)
@@ -1263,6 +1266,8 @@
 		
 		info.extendOffset(bytesWritten);
 		
+		info.getFile().close();
+		
 		return info;
 	}
 	
@@ -1311,10 +1316,12 @@
 			if (!freeFiles.isEmpty())
 			{
 				currentFile = freeFiles.remove();
+				currentFile.getFile().open();
 			}
 			else
 			{
 				currentFile = createFile();
+				currentFile.getFile().open();
 			}
 		}		
 	}

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java	2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java	2008-04-28 19:03:12 UTC (rev 4128)
@@ -166,7 +166,7 @@
 
    public void testAddAsyncData() throws Exception
    {
-       asyncData(150000,1024,20000);
+       asyncData(500000,1024,30000);
    }
    
    public void testValidateData() throws Exception
@@ -599,7 +599,7 @@
            for (LocalAIO tmp: list)
            {
                controller.write(counter * size, size, block, tmp);
-               if (++counter % 5000 == 0)
+               if (++counter % 20000 == 0)
                {
                    System.out.println(5000*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Async)");
                    lastTime = System.currentTimeMillis();

Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java	2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java	2008-04-28 19:03:12 UTC (rev 4128)
@@ -65,15 +65,20 @@
 	public void testSpeedNonTransactional() throws Exception
 	{
 	   
+		final int numMessages = 3000000;
+		
+		long numFiles =  (int)(((numMessages * 1024 + 512) / (10 * 1024 * 1024)) * 1.1);
+		
+		log.info("num Files=" + numFiles);
+
 		Journal journal =
-			new JournalImpl(10 * 1024 * 1024, 10, true, new AIOSequentialFileFactory(journalDir),
+			new JournalImpl(10 * 1024 * 1024,  100, true, new AIOSequentialFileFactory(journalDir),
 					5000, "jbm-data", "jbm");
 		
 		journal.start();
 		
 		journal.load(new ArrayList<RecordInfo>(), null);
 		
-		final int numMessages = 50000;
 
 		final CountDownLatch latch = new CountDownLatch(numMessages);
 		
@@ -121,12 +126,9 @@
 		
 		long start = System.currentTimeMillis();
 		
-		ArrayList<LocalCallback> callbacks = new ArrayList<LocalCallback>();
-		
+		LocalCallback callback = new LocalCallback(1, latch);
 		for (int i = 0; i < numMessages; i++)
 		{
-		   LocalCallback callback = new LocalCallback(i, latch);
-		   callbacks.add(callback);
 			journal.appendAddRecord(i, data, callback);
 		}
 		
@@ -138,21 +140,6 @@
 		
 		boolean failed = false;
 		
-		for (LocalCallback callback: callbacks)
-		{
-	      if (callback.message != null)
-	      {
-	         fail(callback.message);
-	      }
-	      
-	      if (!callback.done)
-	      {
-	         System.out.println("callback i=" + callback.i + " was not received!");
-	         failed = true;
-	      }
-		}
-		
-		
 		// If this fails it is probably because JournalImpl it is closing the files without waiting all the completes to arrive first
 		assertFalse(failed);
 		




More information about the jboss-cvs-commits mailing list