[jboss-cvs] JBoss Messaging SVN: r4226 - in trunk/src/main/org/jboss/messaging/core: journal/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat May 17 19:34:18 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-05-17 19:34:18 -0400 (Sat, 17 May 2008)
New Revision: 4226

Modified:
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
JBMESSAGING-1283 - Executor on closing files
                   Removing Semaphore usage from poller
                   Using VariableLatch instead of Semaphores on AIO for controlling maxIO. (Leaving the lock spinning to the native layer)

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-05-17 09:40:56 UTC (rev 4225)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2008-05-17 23:34:18 UTC (rev 4226)
@@ -8,7 +8,6 @@
 package org.jboss.messaging.core.asyncio.impl;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -17,6 +16,7 @@
 import org.jboss.messaging.core.asyncio.AIOCallback;
 import org.jboss.messaging.core.asyncio.AsynchronousFile;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.VariableLatch;
 
 
 /**
@@ -98,10 +98,9 @@
 	private String fileName;
 	private Thread poller;	
 	private int maxIO;	
-	private Semaphore writeSemaphore;	
+	private VariableLatch writeLatch = new VariableLatch();	
 	private ReadWriteLock lock = new ReentrantReadWriteLock();
-	private Lock writeLock = lock.writeLock();	
-	private Semaphore pollerSemaphore = new Semaphore(1);
+	private Lock writeLock = lock.writeLock();
 	
 	/**
 	 *  Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -120,8 +119,6 @@
 			writeLock.lock();
 			this.maxIO = maxIO;
 			
-			this.writeSemaphore = new Semaphore(maxIO);
-			
 			if (opened)
 			{
 				throw new IllegalStateException("AsynchronousFile is already opened");
@@ -143,12 +140,12 @@
 		checkOpened();
 		
 		writeLock.lock();
-		writeSemaphore.acquire(maxIO);
+		writeLatch.waitCompletion();
 		stopPoller(handler);
-		// We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
+      // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
+		poller.join();
 		try
 		{
-			pollerSemaphore.acquire();
 			closeInternal(handler);
 			addMax(maxIO * -1);
 			opened = false;
@@ -157,21 +154,20 @@
 		finally
 		{
 			writeLock.unlock();
-			pollerSemaphore.release();
 		}
 	}
 		
 	public void write(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
 	{
 		checkOpened();
-		this.writeSemaphore.acquireUninterruptibly();
+		writeLatch.up();
 		try
 		{
 			write (handler, position, size, directByteBuffer, aioPackage);
 		}
 		catch (RuntimeException e)
 		{
-			writeSemaphore.release();
+	      writeLatch.down();
 			throw e;
 		}
 		
@@ -180,14 +176,14 @@
 	public void read(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
 	{
 		checkOpened();
-		this.writeSemaphore.acquireUninterruptibly();
+		writeLatch.up();
 		try
 		{
 			read (handler, position, size, directByteBuffer, aioPackage);
 		}
 		catch (RuntimeException e)
 		{
-			writeSemaphore.release();
+		   writeLatch.down();
 			throw e;
 		}		
 	}
@@ -217,14 +213,14 @@
 	@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
 	private void callbackDone(AIOCallback callback)
 	{
-		writeSemaphore.release();
+		writeLatch.down();
 		callback.done();
 	}
 	
 	@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
 	private void callbackError(AIOCallback callback, int errorCode, String errorMessage)
 	{
-		writeSemaphore.release();
+      writeLatch.down();
 		callback.onError(errorCode, errorMessage);
 	}
 	
@@ -244,7 +240,6 @@
 		poller = new PollerThread(); 
 		try
 		{
-			this.pollerSemaphore.acquire();
 			poller.start();
 		}
 		catch (Exception ex)
@@ -305,14 +300,7 @@
       public void run()
       {
          // informing caller that this thread already has the lock
-         try
-         {
-            pollEvents();
-         }
-         finally
-         {
-            pollerSemaphore.release();
-         }
+         pollEvents();
       }
    }	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-05-17 09:40:56 UTC (rev 4225)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-05-17 23:34:18 UTC (rev 4226)
@@ -100,10 +100,14 @@
 		
 		int blockSize = aioFile.getBlockSize();
 		
-		if (size % (10*1024*1024) == 0)
-		{
-			blockSize = 10*1024*1024;
-		}
+      if (size % (100*1024*1024) == 0)
+      {
+         blockSize = 100*1024*1024;
+      }
+      if (size % (10*1024*1024) == 0)
+      {
+         blockSize = 10*1024*1024;
+      }
 		else if (size % (1024*1024) == 0)
 		{
 			blockSize = 1024*1024;

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-17 09:40:56 UTC (rev 4225)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-05-17 23:34:18 UTC (rev 4226)
@@ -39,7 +39,11 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -162,11 +166,18 @@
 	private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
 	
 	private final boolean shouldUseCallback;
+	
+   /**
+    * single thread... will shutdown the thread after 60 seconds
+    */ 
+	private ExecutorService closingExecutor = new ThreadPoolExecutor(1, 1, 60L,
+         TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());;
    
 		
 	/*
-	 * We use a semaphore rather than synchronized since it performs better when contended
-	 */
+    * We use a semaphore rather than synchronized since it performs better when
+    * contended
+    */
 	
 	//TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
 	private final Semaphore lock = new Semaphore(1, true);
@@ -1292,6 +1303,9 @@
 		
 		stopReclaimer();
 		
+		closingExecutor.shutdown();
+		closingExecutor.awaitTermination(120, TimeUnit.SECONDS);
+		
 		if (currentFile != null)
 		{
 			currentFile.getFile().close();
@@ -1455,10 +1469,8 @@
 		
 		if (currentFile == null || fileSize - currentFile.getOffset() < size)
 		{
-			currentFile.getFile().close();
+		   closeFile(currentFile);
 			
-			dataFiles.add(currentFile);
-			
 			try
 			{
 			   currentFile = freeFiles.remove();
@@ -1471,6 +1483,23 @@
 		}     
 	}
 	
+	private void closeFile(final JournalFile file)
+	{
+	   this.closingExecutor.execute(new Runnable() { public void run()
+	   {
+	      try
+	      {
+	         file.getFile().close();
+	      }
+	      catch (Exception e)
+	      {
+	         log.warn(e.getMessage(), e);
+	      }
+	      dataFiles.add(file);
+	   }
+	   });
+	}
+	
 	private TransactionNegPos getTransactionInfo(final long txID)
 	{
 		TransactionNegPos tx = transactionInfos.get(txID);




More information about the jboss-cvs-commits mailing list