Author: clebert.suconic(a)jboss.com
Date: 2009-11-18 20:12:10 -0500 (Wed, 18 Nov 2009)
New Revision: 8319
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
Log:
Tweaks
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-18
21:33:55 UTC (rev 8318)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-11-19
01:12:10 UTC (rev 8319)
@@ -15,21 +15,17 @@
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.asyncio.AIOCallback;
import org.hornetq.core.asyncio.AsynchronousFile;
import org.hornetq.core.asyncio.BufferCallback;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
/**
*
@@ -50,12 +46,6 @@
private final BufferCallback bufferCallback;
- /** A context switch on AIO would make it to synchronize the disk before
- switching to the new thread, what would cause
- serious performance problems. Because of that we make all the writes on
- AIO using a single thread. */
- private final Executor executor;
-
/** The pool for Thread pollers */
private final Executor pollerExecutor;
@@ -69,10 +59,9 @@
final Executor executor,
final Executor pollerExecutor)
{
- super(directory, new File(directory + "/" + fileName), factory);
+ super(executor, directory, new File(directory + "/" + fileName),
factory);
this.maxIO = maxIO;
this.bufferCallback = bufferCallback;
- this.executor = executor;
this.pollerExecutor = pollerExecutor;
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-18
21:33:55 UTC (rev 8318)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-19
01:12:10 UTC (rev 8319)
@@ -36,15 +36,14 @@
public class AIOSequentialFileFactory extends AbstractSequentialFactory
{
- // Timeout used to wait executors to shutdown
- private static final int EXECUTOR_TIMEOUT = 60;
-
private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
private static final boolean trace = log.isTraceEnabled();
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
+ protected ExecutorService pollerExecutor;
+
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
@@ -53,13 +52,6 @@
log.trace(message);
}
- /** A single AIO write executor for every AIO File.
- * This is used only for AIO & instant operations. We only need one
executor-thread for the entire journal as we always have only one active file.
- * And even if we had multiple files at a given moment, this should still be ok, as
we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
- private ExecutorService writeExecutor;
-
- private ExecutorService pollerExecutor;
-
public AIOSequentialFileFactory(final String journalDir)
{
this(journalDir,
@@ -152,9 +144,6 @@
{
super.start();
- writeExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
- true));
-
pollerExecutor = Executors.newCachedThreadPool(new
HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
true));
@@ -163,23 +152,8 @@
@Override
public void stop()
{
- super.stop();
-
buffersControl.stop();
- writeExecutor.shutdown();
-
- try
- {
- if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
- {
- log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
- }
- }
- catch (InterruptedException e)
- {
- }
-
pollerExecutor.shutdown();
try
@@ -192,6 +166,8 @@
catch (InterruptedException e)
{
}
+
+ super.stop();
}
@Override
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-18
21:33:55 UTC (rev 8318)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-19
01:12:10 UTC (rev 8319)
@@ -19,10 +19,14 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
/**
*
@@ -34,17 +38,29 @@
*/
public abstract class AbstractSequentialFactory implements SequentialFileFactory
{
+
+ // Timeout, in seconds, used when waiting for executors to shut down
+ protected static final int EXECUTOR_TIMEOUT = 60;
+
private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
+ /** For AIO: a single AIO write executor shared by every AIO file.
+ * This is used only for AIO & instant operations. We only need one
executor-thread for the entire journal as we always have only one active file.
+ * And even if we had multiple files at a given moment, this should still be ok, as
we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls.
+ *
+ * For NIO: this is used to execute the callbacks.
+ * We can't invoke the callbacks while holding a lock.
+ * */
+ protected ExecutorService writeExecutor;
+
protected final String journalDir;
protected final TimedBuffer timedBuffer;
-
+
protected final int bufferSize;
protected final long bufferTimeout;
-
public AbstractSequentialFactory(final String journalDir,
final boolean buffered,
final int bufferSize,
@@ -71,6 +87,22 @@
{
timedBuffer.stop();
}
+
+ if (writeExecutor != null)
+ {
+ writeExecutor.shutdown();
+
+ try
+ {
+ if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
}
public void start()
@@ -79,6 +111,17 @@
{
timedBuffer.start();
}
+
+ if (isSupportsCallbacks())
+ {
+ writeExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-writer-pool" + System.identityHashCode(this),
+
true));
+ }
+ else
+ {
+ writeExecutor = null;
+ }
+
}
/* (non-Javadoc)
@@ -99,7 +142,7 @@
}
}
}
-
+
public void flush()
{
if (timedBuffer != null)
@@ -117,7 +160,6 @@
}
}
-
public void releaseBuffer(ByteBuffer buffer)
{
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-18
21:33:55 UTC (rev 8318)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-19
01:12:10 UTC (rev 8319)
@@ -16,6 +16,7 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.journal.IOAsyncTask;
@@ -42,7 +43,17 @@
private File file;
private final String directory;
+
+ /** On AIO: a context switch on AIO would make it synchronize the disk before
+ switching to the new thread, which would cause
+ serious performance problems. Because of that, we make all the writes on
+ AIO using a single thread.
+ On NIO: we can't execute callbacks while inside the locks, as more IO
operations could be
+ performed later */
+ protected final Executor executor;
+
+
protected final SequentialFileFactory factory;
protected long fileSize = 0;
@@ -63,12 +74,13 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(final String directory, final File file, final
SequentialFileFactory factory)
+ public AbstractSequentialFile(final Executor executor, final String directory, final
File file, final SequentialFileFactory factory)
{
super();
this.file = file;
this.directory = directory;
this.factory = factory;
+ this.executor = executor;
}
// Public --------------------------------------------------------
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-18
21:33:55 UTC (rev 8318)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-11-19
01:12:10 UTC (rev 8319)
@@ -18,6 +18,7 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.Executor;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
@@ -40,14 +41,14 @@
private RandomAccessFile rfile;
- public NIOSequentialFile(final SequentialFileFactory factory, final String directory,
final String fileName)
+ public NIOSequentialFile(final SequentialFileFactory factory, final Executor executor,
final String directory, final String fileName)
{
- super(directory, new File(directory + "/" + fileName), factory);
+ super(executor, directory, new File(directory + "/" + fileName),
factory);
}
public NIOSequentialFile(final SequentialFileFactory factory, final File file)
{
- super(file.getParent(), new File(file.getPath()), factory);
+ super(null, file.getParent(), new File(file.getPath()), factory);
}
public int getAlignment()
@@ -239,7 +240,20 @@
if (callback != null)
{
- callback.done();
+ if (executor == null)
+ {
+ callback.done();
+ }
+ else
+ {
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ callback.done();
+ }
+ });
+ }
}
}
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-18
21:33:55 UTC (rev 8318)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-19
01:12:10 UTC (rev 8319)
@@ -66,7 +66,7 @@
// maxIO is ignored on NIO
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
- return new NIOSequentialFile(this, journalDir, fileName);
+ return new NIOSequentialFile(this, this.writeExecutor, journalDir, fileName);
}
public boolean isSupportsCallbacks()