Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 16:07:15 -0500 (Wed, 09 Dec 2009)
New Revision: 8656
Modified:
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
Tweak to TimedBuffer.
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 20:53:57 UTC
(rev 8655)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 21:07:15 UTC
(rev 8656)
@@ -148,7 +148,7 @@
bufferObserver = null;
- timer.close();
+ timer.stop();
if (logRates)
{
@@ -258,8 +258,26 @@
/**
- * Note: Flush could be called by either the CheckTime, or by the Journal directly
when moving to a new file
+ * This method will verify if it a flush is required.
+ * It is called directly by the CheckTimer.
+ *
+ * @return true means you can pause spinning for a while
* */
+ private synchronized boolean checkFlush()
+ {
+ // delayFlush and pendingSync are changed inside synchronized blocks
+ // They need to be done atomically
+ if (!delayFlush && pendingSync && bufferObserver != null)
+ {
+ flush();
+ return true;
+ }
+ else return !delayFlush;
+ }
+
+ /**
+ * Note: Flush could be called by either the checkFlush (and timer), or by the Journal
directly before moving to a new file
+ * */
public synchronized void flush()
{
if (buffer.writerIndex() > 0)
@@ -357,7 +375,7 @@
private class CheckTimer implements Runnable
{
- private volatile boolean closed = false;
+ private volatile boolean stopped = false;
private boolean spinning = false;
@@ -404,7 +422,7 @@
public void run()
{
- while (!closed)
+ while (!stopped)
{
// We flush on the timer if there are pending syncs there and we've
waited waited at least one
// timeout since the time of the last flush
@@ -412,17 +430,11 @@
if (System.nanoTime() > lastFlushTime.get() + timeout)
{
- // delayFlush and pendingSync are changed inside synchronized blocks
- // They need to be done atomically
- synchronized (TimedBuffer.this)
+ if (checkFlush())
{
- if (!delayFlush && pendingSync && bufferObserver !=
null)
+ if (!stopped)
{
- flush();
- }
- else if (!closed && !delayFlush)
- {
- // if delayFlush is set, it means we have to keep trying, we
can't stop spinning on this case
+ // can't pause spin if stopped, or we would hang the thread
pauseSpin();
}
}
@@ -442,9 +454,9 @@
}
}
- public void close()
+ public void stop()
{
- closed = true;
+ stopped = true;
resumeSpin();
}
}