[jboss-cvs] JBoss Messaging SVN: r4257 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue May 20 21:51:25 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-05-20 21:51:25 -0400 (Tue, 20 May 2008)
New Revision: 4257

Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/config/ConfigurationTest-config.xml
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
JBMESSAGING-1283 - Few improvements on AIO:
   i - reverting usage of Semaphores to control MaxAIO
   ii- Adding MaxAIO as a configuration parameter
   iii - Few testcases fixes

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/config/jbm-configuration.xml	2008-05-21 01:51:25 UTC (rev 4257)
@@ -78,6 +78,10 @@
       
       <journal-min-files>10</journal-min-files>
       
+      <!-- Maximum of simultaneous asynchronous writes accepted by the native layer.
+           (parameter ignored on NIO) -->
+      <journal-max-aio>5000</journal-max-aio>
+      
       <journal-task-period>5000</journal-task-period>
       
       <security-enabled>true</security-enabled>

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -8,6 +8,7 @@
 package org.jboss.messaging.core.asyncio.impl;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -101,6 +102,7 @@
 	private VariableLatch writeLatch = new VariableLatch();	
 	private ReadWriteLock lock = new ReentrantReadWriteLock();
 	private Lock writeLock = lock.writeLock();
+   private Semaphore writeSemaphore;   
 	
 	/**
 	 *  Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -117,7 +119,8 @@
 		try
 		{
 			writeLock.lock();
-			this.maxIO = maxIO;
+         this.maxIO = maxIO;
+ 			writeSemaphore = new Semaphore(this.maxIO);
 			
 			if (opened)
 			{
@@ -125,8 +128,8 @@
 			}
 			opened = true;
 			this.fileName=fileName;
-			handler = init (fileName, maxIO, log);
-			addMax(maxIO);
+			handler = init (fileName, this.maxIO, log);
+			addMax(this.maxIO);
 			startPoller();
 		}
 		finally
@@ -139,14 +142,16 @@
 	{
 		checkOpened();
 		
-		writeLock.lock();
-		writeLatch.waitCompletion();
-		stopPoller(handler);
-      // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
-		poller.join();
 		try
 		{
-			closeInternal(handler);
+	      writeLock.lock();
+	      writeLatch.waitCompletion(120);
+	      writeSemaphore = null;
+	      stopPoller(handler);
+	      // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
+	      poller.join();
+
+	      closeInternal(handler);
 			addMax(maxIO * -1);
 			opened = false;
 			handler = 0;
@@ -161,12 +166,14 @@
 	{
 		checkOpened();
 		writeLatch.up();
+      writeSemaphore.acquireUninterruptibly();
 		try
 		{
 			write (handler, position, size, directByteBuffer, aioPackage);
 		}
 		catch (RuntimeException e)
 		{
+         writeSemaphore.release();
 	      writeLatch.down();
 			throw e;
 		}
@@ -177,12 +184,14 @@
 	{
 		checkOpened();
 		writeLatch.up();
+      writeSemaphore.acquireUninterruptibly();
 		try
 		{
 			read (handler, position, size, directByteBuffer, aioPackage);
 		}
 		catch (RuntimeException e)
 		{
+         writeSemaphore.release();
 		   writeLatch.down();
 			throw e;
 		}		
@@ -213,6 +222,7 @@
 	@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
 	private void callbackDone(AIOCallback callback)
 	{
+      writeSemaphore.release();
 		writeLatch.down();
 		callback.done();
 	}
@@ -220,6 +230,7 @@
 	@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
 	private void callbackError(AIOCallback callback, int errorCode, String errorMessage)
 	{
+      writeSemaphore.release();
       writeLatch.down();
 		callback.onError(errorCode, errorMessage);
 	}

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -106,6 +106,10 @@
    int getJournalFileSize();
 
    int getJournalMinFiles();
+   
+   int getJournalMaxAIO();
+   
+   void setJournalMaxAIO(int max);
 
    long getJournalTaskPeriod();
 

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -51,6 +51,7 @@
    public static final int DEFAULT_REQRES_TIMEOUT = 5; // in seconds
    public static final boolean DEFAULT_INVM_DISABLED = false;
    public static final boolean DEFAULT_SSL_ENABLED = false;
+   public static final int DEFAULT_MAX_AIO = 3000;
    
    protected List<String> defaultInterceptors = new ArrayList<String>();
 
@@ -82,6 +83,8 @@
    
    protected int journalMinFiles;
    
+   protected int maxAIO;
+   
    protected long journalTaskPeriod;
    
    protected boolean securityEnabled = true;
@@ -405,6 +408,16 @@
 		return journalFileSize;
 	}
 
+	public int getJournalMaxAIO()
+	{
+	   return maxAIO;
+	}
+	
+	public void setJournalMaxAIO(int max)
+	{
+	   this.maxAIO = max;
+	}
+	
 	public int getJournalMinFiles()
 	{
 		return journalMinFiles;

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -136,6 +136,8 @@
       
       this.journalTaskPeriod = getLong(e, "journal-task-period", 5000L);
       
+      this.maxAIO = getInteger(e, "journal-max-aio", DEFAULT_MAX_AIO);
+      
       this.securityEnabled = getBoolean(e, "security-enabled", true);
        
       NodeList defaultInterceptors = e.getElementsByTagName("default-interceptors-config");

Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -5,6 +5,7 @@
  * A TestableJournal
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 public interface TestableJournal extends Journal
@@ -17,6 +18,10 @@
 	
 	int getIDMapSize();
 	
+   String debug() throws Exception;
+
+   void debugWait() throws Exception;
+	
 	//void dump();
 	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -40,7 +40,6 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -1187,8 +1186,8 @@
 		
 		reclaimer.scan(dataFiles.toArray(files));		
 	}
-	
-   public String debug() throws Exception
+
+	public String debug() throws Exception
    {
       this.checkReclaimStatus();
       
@@ -1209,6 +1208,28 @@
             
       return builder.toString();
    }
+   
+   /** Method for use on testcases.
+    *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
+   public void debugWait() throws Exception
+   {
+      for (TransactionCallback callback: transactionCallbacks.values())
+      {
+         callback.waitCompletion();
+      }
+      
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      this.closingExecutor.execute(new Runnable(){
+         public void run()
+         {
+            latch.countDown();
+         }
+      });
+      
+      // just to make sure the closing thread is empty
+      latch.await();
+   }
 
    // TestableJournal implementation --------------------------------------------------------------
 	

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -147,7 +147,7 @@
 	      
 	   messageJournal = new JournalImpl(config.getJournalFileSize(), 
 	   		config.getJournalMinFiles(), config.isJournalSync(), journalFF,
-	   		config.getJournalTaskPeriod(), "jbm-data", "jbm", 2000);
+	   		config.getJournalTaskPeriod(), "jbm-data", "jbm", config.getJournalMaxAIO());
 	}
 	
 	public long generateMessageID()

Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/config/ConfigurationTest-config.xml	2008-05-21 01:51:25 UTC (rev 4257)
@@ -50,7 +50,9 @@
       <remoting-ssl-keystore-path>messaging.keystore</remoting-ssl-keystore-path>      
       <remoting-ssl-keystore-password>secureexample keystore</remoting-ssl-keystore-password>     
       <remoting-ssl-truststore-path>messaging.truststore</remoting-ssl-truststore-path>            
-      <remoting-ssl-truststore-password>secureexample truststore</remoting-ssl-truststore-password>    
+      <remoting-ssl-truststore-password>secureexample truststore</remoting-ssl-truststore-password>
+      
+      <journal-max-aio>123</journal-max-aio>    
 
    </configuration>
 

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageTestBase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageTestBase.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageTestBase.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -93,7 +93,8 @@
 
       log.debug("Message sent");
 
-      Message r = queueCons.receive();
+      Message r = queueCons.receive(2000);
+      assertNotNull(r);
 
       log.debug("Message received");
 
@@ -110,7 +111,8 @@
 
       queueProd.send(message);
 
-      Message r = queueCons.receive();
+      Message r = queueCons.receive(1000);
+      assertNotNull(r);
 
       assertEquals(DeliveryMode.PERSISTENT, r.getJMSDeliveryMode());
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -126,6 +126,11 @@
       assertEquals("org.jboss.tst2", configuration.getDefaultInterceptors().get(1));
    }
    
+   public void testMaxAIO() throws Exception
+   {
+      assertEquals(123, configuration.getJournalMaxAIO());
+   }
+   
    //config is supposed to be immutable??
 //   public void testPropertyChangeListener() throws Exception
 //   {

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -127,7 +127,7 @@
       this.minFiles = minFreeFiles;
       this.fileSize = fileSize;
       this.sync = sync;
-      this.maxAIO = 1000;
+      this.maxAIO = 50;
    }
    
 	public void createJournal() throws Exception

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2008-05-21 01:51:25 UTC (rev 4257)
@@ -739,6 +739,8 @@
 			addTx(1, i);
 		}
 		
+		journal.debugWait();
+		
 		assertEquals(calculateNumberOfFiles(fileSize , journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(0, journal.getIDMapSize());
@@ -776,6 +778,8 @@
 			updateTx(1, i);
 		}
 		
+		journal.debugWait();
+		
 		assertEquals(calculateNumberOfFiles(fileSize , journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(0, journal.getIDMapSize());
@@ -813,6 +817,7 @@
 			deleteTx(1, i);
 		}
 		
+		journal.debugWait();
 		
 		assertEquals(calculateNumberOfFiles(fileSize , journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
 		
@@ -921,6 +926,8 @@
 		
 		deleteTx(1, 1);        // in file 1
 		
+		journal.debugWait();
+		
 		List<String> files2 = fileFactory.listFiles(fileExtension);
 		
 		assertEquals(2, files2.size());
@@ -975,7 +982,7 @@
 		assertEquals(0, journal.getFreeFilesCount());
 		assertEquals(2, journal.getIDMapSize());     
 		
-		Thread.sleep(1000);
+		journal.debugWait();
 		//Now restart
 		
 		stopJournal();
@@ -1229,6 +1236,8 @@
 		
 		addTx(1, 1);
 		
+		journal.debugWait();
+		
 		List<String> files2 = fileFactory.listFiles(fileExtension);
 		
 		assertEquals(2, files2.size());




More information about the jboss-cvs-commits mailing list