Author: clebert.suconic(a)jboss.com
Date: 2009-11-12 21:05:29 -0500 (Thu, 12 Nov 2009)
New Revision: 8278
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-214 - performance tweaks
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-12
21:08:49 UTC (rev 8277)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-13
02:05:29 UTC (rev 8278)
@@ -55,9 +55,6 @@
* This is the class returned to the factory when the file is being activated. */
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
-
-
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -66,7 +63,7 @@
* @param file
* @param directory
*/
- public AbstractSequentialFile(String directory, File file, SequentialFileFactory
factory)
+ public AbstractSequentialFile(final String directory, final File file, final
SequentialFileFactory factory)
{
super();
this.file = file;
@@ -86,7 +83,6 @@
return file.getName();
}
-
public final void delete() throws Exception
{
if (isOpen())
@@ -107,12 +103,10 @@
return position.get();
}
-
public final void renameTo(final String newFileName) throws Exception
{
close();
File newFile = new File(directory + "/" + newFileName);
-
if (!file.equals(newFile))
{
@@ -120,13 +114,12 @@
file = newFile;
}
}
-
- public final boolean fits(int size)
+ public final boolean fits(final int size)
{
if (timedBuffer == null)
{
- return this.position.get() + size <= fileSize;
+ return position.get() + size <= fileSize;
}
else
{
@@ -150,22 +143,22 @@
}
}
- public void setTimedBuffer(TimedBuffer buffer)
+ public void setTimedBuffer(final TimedBuffer buffer)
{
if (timedBuffer != null)
{
timedBuffer.setObserver(null);
}
- this.timedBuffer = buffer;
+ timedBuffer = buffer;
if (buffer != null)
{
- buffer.setObserver(this.timedBufferObserver);
+ buffer.setObserver(timedBufferObserver);
}
}
-
+
public void write(final HornetQBuffer bytes, final boolean sync, final IOCompletion
callback) throws Exception
{
if (timedBuffer != null)
@@ -196,8 +189,6 @@
write(bytes, false, DummyCallback.getInstance());
}
}
-
-
// Package protected ---------------------------------------------
@@ -208,7 +199,6 @@
return file;
}
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -217,7 +207,7 @@
{
final List<IOCompletion> delegates;
- DelegateCallback(List<IOCompletion> delegates)
+ DelegateCallback(final List<IOCompletion> delegates)
{
this.delegates = delegates;
}
@@ -237,7 +227,7 @@
}
}
- public void onError(int errorCode, String errorMessage)
+ public void onError(final int errorCode, final String errorMessage)
{
for (IOCompletion callback : delegates)
{
@@ -259,7 +249,7 @@
protected class LocalBufferObserver implements TimedBufferObserver
{
- public void flushBuffer(ByteBuffer buffer, List<IOCompletion> callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final
List<IOCompletion> callbacks)
{
buffer.flip();
@@ -269,7 +259,7 @@
}
else
{
- writeDirect(buffer, true, new DelegateCallback(callbacks));
+ writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
}
}
@@ -295,6 +285,7 @@
}
}
+ @Override
public String toString()
{
return "TimedBufferObserver on file (" + getFile().getName() +
")";
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-12 21:08:49 UTC
(rev 8277)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-13 02:05:29 UTC
(rev 8278)
@@ -241,18 +241,15 @@
if (sync)
{
+ if (!pendingSync)
+ {
+ pendingSync = true;
+ }
+
if (flushOnSync)
{
flush();
}
- else
- {
- // We should flush on the next timeout, no matter what other activity happens
on the buffer
- if (!pendingSync)
- {
- pendingSync = true;
- }
- }
}
if (buffer.writerIndex() == bufferLimit)
@@ -281,7 +278,7 @@
directBuffer.put(buffer.array(), 0, pos);
- bufferObserver.flushBuffer(directBuffer, callbacks);
+ bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
callbacks = new ArrayList<IOCompletion>();
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-12
21:08:49 UTC (rev 8277)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java 2009-11-13
02:05:29 UTC (rev 8278)
@@ -39,7 +39,7 @@
// Public --------------------------------------------------------
- public void flushBuffer(ByteBuffer buffer, List<IOCompletion> callbacks);
+ public void flushBuffer(ByteBuffer buffer, boolean syncRequested,
List<IOCompletion> callbacks);
/** Return the number of remaining bytes that still fit on the observer (file) */
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-12
21:08:49 UTC (rev 8277)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-13
02:05:29 UTC (rev 8278)
@@ -226,7 +226,7 @@
log.info("AIO journal selected");
if (!AIOSequentialFileFactory.isSupported())
{
- log.warn("AIO wasn't located on this platform, it will fall back to
using pure Java NIO. " + "If your platform is Linux, install LibAIO to enable
the AIO journal");
+ log.warn("AIO wasn't located on this platform, it will fall back to
using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO
journal");
journalFF = new NIOSequentialFileFactory(journalDir,
true,
config.getJournalBufferSize(),
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-12
21:08:49 UTC (rev 8277)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-11-13
02:05:29 UTC (rev 8278)
@@ -64,7 +64,7 @@
final AtomicInteger flushTimes = new AtomicInteger(0);
class TestObserver implements TimedBufferObserver
{
- public void flushBuffer(final ByteBuffer buffer, final List<IOCompletion>
callbacks)
+ public void flushBuffer(final ByteBuffer buffer, final boolean sync, final
List<IOCompletion> callbacks)
{
buffers.add(buffer);
flushTimes.incrementAndGet();