Author: clebert.suconic(a)jboss.com
Date: 2009-12-09 11:59:54 -0500 (Wed, 09 Dec 2009)
New Revision: 8642
Modified:
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-241 - Changing TimedBuffer to stop spinning
when there are no pending syncs
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
---
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2009-12-09
16:09:45 UTC (rev 8641)
+++
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2009-12-09
16:59:54 UTC (rev 8642)
@@ -143,7 +143,7 @@
if (timedBuffer != null)
{
// When moving to a new file, we need to make sure any pending buffer will be
transferred to the buffer
- timedBuffer.flush(true);
+ timedBuffer.flush();
timedBuffer.setObserver(null);
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 16:09:45 UTC
(rev 8641)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 16:59:54 UTC
(rev 8642)
@@ -20,6 +20,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.buffers.HornetQBuffer;
@@ -46,13 +47,8 @@
private TimedBufferObserver bufferObserver;
- // If the TimedBuffer is idle - i.e. no records are being added, then it's
pointless the timer flush thread
- // in spinning and checking the time - and using up CPU in the process - this
semaphore is used to
- // prevent that
- private final Semaphore spinLimiter = new Semaphore(1);
+ private CheckTimer timer;
- private CheckTimer timerRunnable = new CheckTimer();
-
private final int bufferSize;
private final HornetQBuffer buffer;
@@ -125,9 +121,9 @@
return;
}
- timerRunnable = new CheckTimer();
+ timer = new CheckTimer();
- timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
+ timerThread = new Thread(timer, "hornetq-buffer-timeout");
timerThread.start();
@@ -152,10 +148,8 @@
bufferObserver = null;
- spinLimiter.release();
+ timer.close();
- timerRunnable.close();
-
if (logRates)
{
logRatesTimerTask.cancel();
@@ -238,15 +232,9 @@
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync,
final IOAsyncTask callback)
{
+
delayFlush = false;
- if (buffer.writerIndex() == 0)
- {
- // More bytes have been added so the timer flush thread can resume
-
- spinLimiter.release();
- }
-
bytes.encode(buffer);
callbacks.add(callback);
@@ -265,22 +253,18 @@
// }
}
+ timer.resumeSpin();
}
- public void flush()
- {
- flush(false);
- }
-
/**
* force means the Journal is moving to a new file. Any pending write needs to be done
immediately
* or data could be lost
* */
- public void flush(final boolean force)
+ public synchronized void flush()
{
synchronized (this)
{
- if ((force || !delayFlush) && buffer.writerIndex() > 0)
+ if (buffer.writerIndex() > 0)
{
int pos = buffer.writerIndex();
@@ -301,17 +285,6 @@
bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
}
- try
- {
- // We acquire the spinLimiter semaphore - this prevents the timer flush
thread unnecessarily spinning
- // when the buffer is inactive
- spinLimiter.acquire();
- }
- catch (InterruptedException e)
- {
- // Ignore
- }
-
lastFlushTime.set(System.nanoTime());
pendingSync = false;
@@ -323,6 +296,8 @@
bufferLimit = 0;
flushesDone.incrementAndGet();
+
+ timer.pauseSpin();
}
}
}
@@ -387,6 +362,58 @@
{
private volatile boolean closed = false;
+ private boolean spinning = false;
+
+ // If the TimedBuffer is idle - i.e. no records are being added - then it is
pointless for the timer flush thread
+ // to keep spinning and checking the time, using up CPU in the process. This
semaphore is used to
+ // prevent that.
+ private final Semaphore spinLimiter = new Semaphore(1);
+
+ public CheckTimer()
+ {
+ if (!spinLimiter.tryAcquire())
+ {
+ // JDK would be screwed up if this was happening
+ throw new IllegalStateException("InternalError: Semaphore not working
properly!");
+ }
+ spinning = false;
+ }
+
+ // Needs to be called within synchronized blocks on TimedBuffer
+ public void resumeSpin()
+ {
+ synchronized (TimedBuffer.this)
+ {
+ if (!spinning)
+ {
+ spinning = true;
+ spinLimiter.release();
+ }
+ }
+ }
+
+ // Needs to be called within synchronized blocks on TimedBuffer
+ public void pauseSpin()
+ {
+ synchronized (TimedBuffer.this)
+ {
+ if (spinning)
+ {
+ spinning = false;
+ try
+ {
+ if (!spinLimiter.tryAcquire(60, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Internal error on TimedBuffer.
Can't stop spinning");
+ }
+ }
+ catch (InterruptedException ignored)
+ {
+ }
+ }
+ }
+ }
+
public void run()
{
while (!closed)
@@ -395,9 +422,22 @@
// timeout since the time of the last flush
// Effectively flushing "resets" the timer
- if (pendingSync && bufferObserver != null &&
System.nanoTime() > lastFlushTime.get() + timeout)
+ if (System.nanoTime() > lastFlushTime.get() + timeout)
{
- flush();
+ // delayFlush and pendingSync are changed inside synchronized blocks
+ // They need to be done atomically
+ synchronized (TimedBuffer.this)
+ {
+ if (!delayFlush && pendingSync && bufferObserver !=
null)
+ {
+ flush();
+ }
+ else if (!closed && !delayFlush)
+ {
+ // if delayFlush is set, it means we have to keep trying, we
can't stop spinning in this case
+ pauseSpin();
+ }
+ }
}
try
@@ -417,6 +457,7 @@
public void close()
{
closed = true;
+ resumeSpin();
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-12-09
16:09:45 UTC (rev 8641)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-12-09
16:59:54 UTC (rev 8642)
@@ -41,6 +41,8 @@
// Attributes ----------------------------------------------------
+ private static final int ONE_SECOND = 1000000000; // in nanoseconds
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -85,13 +87,91 @@
}
}
- TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false); // Any big
timeout
+ TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND, false);
- timedBuffer.setObserver(new TestObserver());
+ timedBuffer.start();
- int x = 0;
- for (int i = 0; i < 10; i++)
+ try
{
+
+ timedBuffer.setObserver(new TestObserver());
+
+ int x = 0;
+ for (int i = 0; i < 10; i++)
+ {
+ byte[] bytes = new byte[10];
+ for (int j = 0; j < 10; j++)
+ {
+ bytes[j] = UnitTestCase.getSamplebyte(x++);
+ }
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(bytes);
+
+ timedBuffer.checkSize(10);
+ timedBuffer.addBytes(buff, false, dummyCallback);
+ }
+
+ timedBuffer.checkSize(1);
+
+ Assert.assertEquals(1, flushTimes.get());
+
+ ByteBuffer flushedBuffer = buffers.get(0);
+
+ Assert.assertEquals(100, flushedBuffer.limit());
+
+ Assert.assertEquals(100, flushedBuffer.capacity());
+
+ flushedBuffer.rewind();
+
+ for (int i = 0; i < 100; i++)
+ {
+ Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+ }
+ }
+ finally
+ {
+ timedBuffer.stop();
+ }
+
+ }
+
+ public void testTimingAndFlush() throws Exception
+ {
+ final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ final AtomicInteger flushTimes = new AtomicInteger(0);
+ class TestObserver implements TimedBufferObserver
+ {
+ public void flushBuffer(final ByteBuffer buffer, final boolean sync, final
List<IOAsyncTask> callbacks)
+ {
+ buffers.add(buffer);
+ flushTimes.incrementAndGet();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+ */
+ public ByteBuffer newBuffer(final int minSize, final int maxSize)
+ {
+ return ByteBuffer.allocate(maxSize);
+ }
+
+ public int getRemainingBytes()
+ {
+ return 1024 * 1024;
+ }
+ }
+
+ TimedBuffer timedBuffer = new TimedBuffer(100, TimedBufferTest.ONE_SECOND / 10,
false);
+
+ timedBuffer.start();
+
+ try
+ {
+
+ timedBuffer.setObserver(new TestObserver());
+
+ int x = 0;
+
byte[] bytes = new byte[10];
for (int j = 0; j < 10; j++)
{
@@ -102,23 +182,42 @@
timedBuffer.checkSize(10);
timedBuffer.addBytes(buff, false, dummyCallback);
- }
- timedBuffer.checkSize(1);
+ Thread.sleep(200);
- Assert.assertEquals(1, flushTimes.get());
+ Assert.assertEquals(0, flushTimes.get());
- ByteBuffer flushedBuffer = buffers.get(0);
+ bytes = new byte[10];
+ for (int j = 0; j < 10; j++)
+ {
+ bytes[j] = UnitTestCase.getSamplebyte(x++);
+ }
- Assert.assertEquals(100, flushedBuffer.limit());
+ buff = HornetQBuffers.wrappedBuffer(bytes);
- Assert.assertEquals(100, flushedBuffer.capacity());
+ timedBuffer.checkSize(10);
+ timedBuffer.addBytes(buff, true, dummyCallback);
- flushedBuffer.rewind();
+ Thread.sleep(500);
- for (int i = 0; i < 100; i++)
+ Assert.assertEquals(1, flushTimes.get());
+
+ ByteBuffer flushedBuffer = buffers.get(0);
+
+ Assert.assertEquals(20, flushedBuffer.limit());
+
+ Assert.assertEquals(20, flushedBuffer.capacity());
+
+ flushedBuffer.rewind();
+
+ for (int i = 0; i < 20; i++)
+ {
+ Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+ }
+ }
+ finally
{
- Assert.assertEquals(UnitTestCase.getSamplebyte(i), flushedBuffer.get());
+ timedBuffer.stop();
}
}