[jboss-cvs] JBoss Messaging SVN: r4226 - in trunk/src/main/org/jboss/messaging/core: journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat May 17 19:34:18 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-05-17 19:34:18 -0400 (Sat, 17 May 2008)
New Revision: 4226
Modified:
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
JBMESSAGING-1283 - Executor on closing files
Removing Semaphore usage from poller
Using VariableLatch instead of Semaphores in AIO for controlling maxIO. (Leaving the lock spinning to the native layer)
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-05-17 09:40:56 UTC (rev 4225)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-05-17 23:34:18 UTC (rev 4226)
@@ -8,7 +8,6 @@
package org.jboss.messaging.core.asyncio.impl;
import java.nio.ByteBuffer;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -17,6 +16,7 @@
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.asyncio.AsynchronousFile;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.util.VariableLatch;
/**
@@ -98,10 +98,9 @@
private String fileName;
private Thread poller;
private int maxIO;
- private Semaphore writeSemaphore;
+ private VariableLatch writeLatch = new VariableLatch();
private ReadWriteLock lock = new ReentrantReadWriteLock();
- private Lock writeLock = lock.writeLock();
- private Semaphore pollerSemaphore = new Semaphore(1);
+ private Lock writeLock = lock.writeLock();
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -120,8 +119,6 @@
writeLock.lock();
this.maxIO = maxIO;
- this.writeSemaphore = new Semaphore(maxIO);
-
if (opened)
{
throw new IllegalStateException("AsynchronousFile is already opened");
@@ -143,12 +140,12 @@
checkOpened();
writeLock.lock();
- writeSemaphore.acquire(maxIO);
+ writeLatch.waitCompletion();
stopPoller(handler);
- // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
+ // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
+ poller.join();
try
{
- pollerSemaphore.acquire();
closeInternal(handler);
addMax(maxIO * -1);
opened = false;
@@ -157,21 +154,20 @@
finally
{
writeLock.unlock();
- pollerSemaphore.release();
}
}
public void write(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
{
checkOpened();
- this.writeSemaphore.acquireUninterruptibly();
+ writeLatch.up();
try
{
write (handler, position, size, directByteBuffer, aioPackage);
}
catch (RuntimeException e)
{
- writeSemaphore.release();
+ writeLatch.down();
throw e;
}
@@ -180,14 +176,14 @@
public void read(final long position, final long size, final ByteBuffer directByteBuffer, final AIOCallback aioPackage)
{
checkOpened();
- this.writeSemaphore.acquireUninterruptibly();
+ writeLatch.up();
try
{
read (handler, position, size, directByteBuffer, aioPackage);
}
catch (RuntimeException e)
{
- writeSemaphore.release();
+ writeLatch.down();
throw e;
}
}
@@ -217,14 +213,14 @@
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
private void callbackDone(AIOCallback callback)
{
- writeSemaphore.release();
+ writeLatch.down();
callback.done();
}
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
private void callbackError(AIOCallback callback, int errorCode, String errorMessage)
{
- writeSemaphore.release();
+ writeLatch.down();
callback.onError(errorCode, errorMessage);
}
@@ -244,7 +240,6 @@
poller = new PollerThread();
try
{
- this.pollerSemaphore.acquire();
poller.start();
}
catch (Exception ex)
@@ -305,14 +300,7 @@
public void run()
{
// informing caller that this thread already has the lock
- try
- {
- pollEvents();
- }
- finally
- {
- pollerSemaphore.release();
- }
+ pollEvents();
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-05-17 09:40:56 UTC (rev 4225)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-05-17 23:34:18 UTC (rev 4226)
@@ -100,10 +100,14 @@
int blockSize = aioFile.getBlockSize();
- if (size % (10*1024*1024) == 0)
- {
- blockSize = 10*1024*1024;
- }
+ if (size % (100*1024*1024) == 0)
+ {
+ blockSize = 100*1024*1024;
+ }
+ if (size % (10*1024*1024) == 0)
+ {
+ blockSize = 10*1024*1024;
+ }
else if (size % (1024*1024) == 0)
{
blockSize = 1024*1024;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-17 09:40:56 UTC (rev 4225)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-17 23:34:18 UTC (rev 4226)
@@ -39,7 +39,11 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -162,11 +166,18 @@
private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
private final boolean shouldUseCallback;
+
+ /**
+ * single thread... will shutdown the thread after 60 seconds
+ */
+ private ExecutorService closingExecutor = new ThreadPoolExecutor(1, 1, 60L,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());;
/*
- * We use a semaphore rather than synchronized since it performs better when contended
- */
+ * We use a semaphore rather than synchronized since it performs better when
+ * contended
+ */
//TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
private final Semaphore lock = new Semaphore(1, true);
@@ -1292,6 +1303,9 @@
stopReclaimer();
+ closingExecutor.shutdown();
+ closingExecutor.awaitTermination(120, TimeUnit.SECONDS);
+
if (currentFile != null)
{
currentFile.getFile().close();
@@ -1455,10 +1469,8 @@
if (currentFile == null || fileSize - currentFile.getOffset() < size)
{
- currentFile.getFile().close();
+ closeFile(currentFile);
- dataFiles.add(currentFile);
-
try
{
currentFile = freeFiles.remove();
@@ -1471,6 +1483,23 @@
}
}
+ private void closeFile(final JournalFile file)
+ {
+ this.closingExecutor.execute(new Runnable() { public void run()
+ {
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ dataFiles.add(file);
+ }
+ });
+ }
+
private TransactionNegPos getTransactionInfo(final long txID)
{
TransactionNegPos tx = transactionInfos.get(txID);
More information about the jboss-cvs-commits mailing list