[hornetq-commits] JBoss hornetq SVN: r8658 - in trunk: src/main/org/hornetq/core/journal/impl and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 10 04:04:09 EST 2009
Author: timfox
Date: 2009-12-10 04:04:09 -0500 (Thu, 10 Dec 2009)
New Revision: 8658
Modified:
trunk/examples/core/perf/server0/hornetq-configuration.xml
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
Log:
simplified timed buffer fix + fix extra hang in receiveimmediate code
Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-10 09:04:09 UTC (rev 8658)
@@ -17,7 +17,7 @@
<persistence-enabled>true</persistence-enabled>
- <journal-sync-non-transactional>false</journal-sync-non-transactional>
+ <journal-sync-non-transactional>true</journal-sync-non-transactional>
<journal-sync-transactional>true</journal-sync-transactional>
<journal-type>ASYNCIO</journal-type>
<journal-min-files>20</journal-min-files>
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-10 09:04:09 UTC (rev 8658)
@@ -20,7 +20,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.buffers.HornetQBuffer;
@@ -47,8 +47,13 @@
private TimedBufferObserver bufferObserver;
- private CheckTimer timer;
+ // If the TimedBuffer is idle - i.e. no records are being added - then it's pointless for the timer flush thread
+ // to keep spinning and checking the time - and using up CPU in the process - so this semaphore is used to
+ // prevent that
+ private final Semaphore spinLimiter = new Semaphore(1);
+ private CheckTimer timerRunnable = new CheckTimer();
+
private final int bufferSize;
private final HornetQBuffer buffer;
@@ -85,6 +90,8 @@
private final AtomicLong lastFlushTime = new AtomicLong(0);
+ private boolean spinning = false;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -121,9 +128,9 @@
return;
}
- timer = new CheckTimer();
+ timerRunnable = new CheckTimer();
- timerThread = new Thread(timer, "hornetq-buffer-timeout");
+ timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
timerThread.start();
@@ -134,6 +141,15 @@
logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
}
+ // Need to start with the spin limiter acquired
+ try
+ {
+ spinLimiter.acquire();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
started = true;
}
@@ -148,8 +164,10 @@
bufferObserver = null;
- timer.stop();
+ spinLimiter.release();
+ timerRunnable.close();
+
if (logRates)
{
logRatesTimerTask.cancel();
@@ -232,7 +250,6 @@
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
{
-
delayFlush = false;
bytes.encode(buffer);
@@ -252,69 +269,78 @@
// flush();
// }
- timer.resumeSpin();
+ if (!spinning)
+ {
+ spinLimiter.release();
+
+ spinning = true;
+ }
}
}
-
- /**
- * This method will verify if it a flush is required.
- * It is called directly by the CheckTimer.
- *
- * @return true means you can pause spinning for a while
- * */
- private synchronized boolean checkFlush()
+ public void flush()
{
- // delayFlush and pendingSync are changed inside synchronized blocks
- // They need to be done atomically
- if (!delayFlush && pendingSync && bufferObserver != null)
- {
- flush();
- return true;
- }
- else return !delayFlush;
+ flush(false);
}
-
+
/**
- * Note: Flush could be called by either the checkFlush (and timer), or by the Journal directly before moving to a new file
+ * force means the Journal is moving to a new file. Any pending writes need to be done immediately
+ * or data could be lost
* */
- public synchronized void flush()
+ public void flush(final boolean force)
{
- if (buffer.writerIndex() > 0)
+ synchronized (this)
{
- int pos = buffer.writerIndex();
-
- if (logRates)
+ if ((force || !delayFlush) && buffer.writerIndex() > 0)
{
- bytesFlushed.addAndGet(pos);
- }
+ int pos = buffer.writerIndex();
- ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+ if (logRates)
+ {
+ bytesFlushed.addAndGet(pos);
+ }
- // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
- // Using bufferToFlush.put(buffer) would make several append calls for each byte
+ ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
- bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
+ // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+ // Using bufferToFlush.put(buffer) would make several append calls for each byte
- if (bufferToFlush != null)
- {
- bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
- }
+ bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
- lastFlushTime.set(System.nanoTime());
+ if (bufferToFlush != null)
+ {
+ bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+ }
- pendingSync = false;
+ if (spinning)
+ {
+ try
+ {
+ // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
+ // when the buffer is inactive
+ spinLimiter.acquire();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
- callbacks = new LinkedList<IOAsyncTask>();
+ spinning = false;
+ }
- buffer.clear();
+ lastFlushTime.set(System.nanoTime());
- bufferLimit = 0;
+ pendingSync = false;
- flushesDone.incrementAndGet();
+ callbacks = new LinkedList<IOAsyncTask>();
- timer.pauseSpin();
+ buffer.clear();
+
+ bufferLimit = 0;
+
+ flushesDone.incrementAndGet();
+ }
}
}
@@ -376,69 +402,19 @@
private class CheckTimer implements Runnable
{
- private volatile boolean stopped = false;
+ private volatile boolean closed = false;
- private boolean spinning = false;
-
- // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
- // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
- // prevent that
- private final Semaphore spinLimiter = new Semaphore(1);
-
- public CheckTimer()
- {
- if (!spinLimiter.tryAcquire())
- {
- // JDK would be screwed up if this was happening
- throw new IllegalStateException("InternalError: Semaphore not working properly!");
- }
- spinning = false;
- }
-
- // Needs to be called within synchronized blocks on TimedBuffer
- public void resumeSpin()
- {
- spinning = true;
- spinLimiter.release();
- }
-
- // Needs to be called within synchronized blocks on TimedBuffer
- public void pauseSpin()
- {
- if (spinning)
- {
- spinning = false;
- try
- {
- if (!spinLimiter.tryAcquire(60, TimeUnit.SECONDS))
- {
- throw new IllegalStateException("Internal error on TimedBuffer. Can't stop spinning");
- }
- }
- catch (InterruptedException ignored)
- {
- }
- }
- }
-
public void run()
{
- while (!stopped)
+ while (!closed)
{
// We flush on the timer if there are pending syncs there and we've waited at least one
// timeout since the time of the last flush
// Effectively flushing "resets" the timer
- if (System.nanoTime() > lastFlushTime.get() + timeout)
+ if (pendingSync && bufferObserver != null && System.nanoTime() > lastFlushTime.get() + timeout)
{
- if (checkFlush())
- {
- if (!stopped)
- {
- // can't pause spin if stopped, or we would hang the thread
- pauseSpin();
- }
- }
+ flush();
}
try
@@ -455,10 +431,9 @@
}
}
- public void stop()
+ public void close()
{
- stopped = true;
- resumeSpin();
+ closed = true;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 09:04:09 UTC (rev 8658)
@@ -48,6 +48,7 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
/**
@@ -120,10 +121,9 @@
private final ManagementService managementService;
private final Binding binding;
-
+
private boolean transferring = false;
-
// Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
@@ -201,7 +201,7 @@
// should go back into the
// queue for delivery later.
if (!started || transferring)
- {
+ {
return HandleStatus.BUSY;
}
@@ -416,25 +416,59 @@
promptDelivery(true);
}
}
-
+
public void setTransferring(final boolean transferring)
{
lock.lock();
try
{
this.transferring = transferring;
+
+ if (transferring)
+ {
+ // Now we must wait for any large message delivery to finish
+ while (largeMessageInDelivery)
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ }
}
finally
{
lock.unlock();
}
-
+
+ //Outside the lock
+ if (transferring)
+ {
+ // And we must wait for any force delivery to be executed - this is executed async so we add a future to the
+ // executor and
+ // wait for it to complete
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ log.warn("Timed out waiting for executor to complete");
+ }
+ }
+
if (!transferring)
{
promptDelivery(true);
}
}
-
+
public void receiveCredits(final int credits) throws Exception
{
if (credits == -1)
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2009-12-10 09:04:09 UTC (rev 8658)
@@ -742,7 +742,7 @@
for (MyHandler handler : handlers)
{
- boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+ boolean ok = handler.latch.await(20000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
}
More information about the hornetq-commits
mailing list