Author: clebert.suconic(a)jboss.com
Date: 2009-11-22 22:30:52 -0500 (Sun, 22 Nov 2009)
New Revision: 8371
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
Improving flush time for NIO with callbacks
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-23 02:51:06 UTC (rev 8370)
+++ branches/ClebertCallback/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-23 03:30:52 UTC (rev 8371)
@@ -259,36 +259,55 @@
}
}
- public synchronized void flush()
+ public void flush()
{
- if (buffer.writerIndex() > 0)
+ ByteBuffer bufferToFlush = null;
+
+ boolean useSync = false;
+
+ List<IOAsyncTask> callbacksToCall = null;
+
+ synchronized (this)
{
- latchTimer.up();
+ if (buffer.writerIndex() > 0)
+ {
+ latchTimer.up();
+
+ int pos = buffer.writerIndex();
+
+ if (logRates)
+ {
+ bytesFlushed += pos;
+ }
+
+ bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+
+ // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+ // Using bufferToFlush.put(buffer) would make several append calls for each byte
+
+ bufferToFlush.put(buffer.array(), 0, pos);
- int pos = buffer.writerIndex();
-
- if (logRates)
- {
- bytesFlushed += pos;
+ callbacksToCall = callbacks;
+
+ callbacks = new LinkedList<IOAsyncTask>();
+
+ useSync = pendingSync;
+
+ active = false;
+ pendingSync = false;
+
+ buffer.clear();
+ bufferLimit = 0;
}
+ }
+
+ // Execute the flush outside of the lock
+ // This is important for NIO performance while we are using NIO Callbacks
+ if (bufferToFlush != null)
+ {
+ bufferObserver.flushBuffer(bufferToFlush, useSync, callbacksToCall);
+ }
- ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
-
- // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
- // Using directBuffer.put(buffer) would make several append calls for each byte
-
- directBuffer.put(buffer.array(), 0, pos);
-
- bufferObserver.flushBuffer(directBuffer, pendingSync, callbacks);
-
- callbacks = new LinkedList<IOAsyncTask>();
-
- active = false;
- pendingSync = false;
-
- buffer.clear();
- bufferLimit = 0;
- }
}
// Package protected ---------------------------------------------
Show replies by date