Author: timfox
Date: 2009-12-09 04:52:12 -0500 (Wed, 09 Dec 2009)
New Revision: 8631
Modified:
trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
Log:
added flush of confirmations on timer
Modified: trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
===================================================================
--- trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java 2009-12-09
09:27:14 UTC (rev 8630)
+++ trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java 2009-12-09
09:52:12 UTC (rev 8631)
@@ -437,14 +437,13 @@
{
session.commit();
}
-
+
+ session.close();
+
if (useSendAcks)
{
- // Must close the session first since this flushes the confirmations
- session.close();
-
theLatch.await();
- }
+ }
}
finally
{
Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2009-12-09 09:27:14
UTC (rev 8630)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2009-12-09 09:52:12
UTC (rev 8631)
@@ -75,4 +75,6 @@
boolean checkDataReceived();
void removeAllChannels();
+
+ void flushConfirmations();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-09 09:27:14 UTC
(rev 8630)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-12-09 09:52:12 UTC
(rev 8631)
@@ -364,10 +364,11 @@
{
return connection;
}
-
- public void flushConfirmations()
+
+ //Needs to be synchronized since can be called by remoting service timer thread too
for timeout flush
+ public synchronized void flushConfirmations()
{
- if (receivedBytes != 0)
+ if (resendCache != null && receivedBytes != 0)
{
receivedBytes = 0;
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-12-09
09:27:14 UTC (rev 8630)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-12-09
09:52:12 UTC (rev 8631)
@@ -334,6 +334,17 @@
channels.clear();
}
}
+
+ public void flushConfirmations()
+ {
+ synchronized (transferLock)
+ {
+ for (Channel channel: channels.values())
+ {
+ channel.flushConfirmations();
+ }
+ }
+ }
// Buffer Handler implementation
// ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-12-09
09:27:14 UTC (rev 8630)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-12-09
09:52:12 UTC (rev 8631)
@@ -88,7 +88,7 @@
private final ScheduledExecutorService scheduledThreadPool;
- private FailureCheckThread failureCheckThread;
+ private FailureCheckAndFlushThread failureCheckAndFlushThread;
// Static --------------------------------------------------------
@@ -184,9 +184,10 @@
a.start();
}
- failureCheckThread = new
FailureCheckThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
+ //This thread checks connections that need to be closed, and also flushes
confirmations
+ failureCheckAndFlushThread = new
FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
- failureCheckThread.start();
+ failureCheckAndFlushThread.start();
started = true;
}
@@ -220,7 +221,7 @@
return;
}
- failureCheckThread.close();
+ failureCheckAndFlushThread.close();
// We need to stop them accepting first so no new connections are accepted after we
send the disconnect message
for (Acceptor acceptor : acceptors)
@@ -451,13 +452,13 @@
}
}
- private final class FailureCheckThread extends Thread
+ private final class FailureCheckAndFlushThread extends Thread
{
private final long pauseInterval;
private volatile boolean closed;
- FailureCheckThread(final long pauseInterval)
+ FailureCheckAndFlushThread(final long pauseInterval)
{
super("hornetq-failure-check-thread");
@@ -493,15 +494,19 @@
for (ConnectionEntry entry : connections.values())
{
+ RemotingConnection conn = entry.connection;
+
+ boolean flush = true;
+
if (entry.ttl != -1)
{
if (now >= entry.lastCheck + entry.ttl)
{
- RemotingConnection conn = entry.connection;
-
if (!conn.checkDataReceived())
{
idsToRemove.add(conn.getID());
+
+ flush = false;
}
else
{
@@ -509,6 +514,14 @@
}
}
}
+
+ if (flush)
+ {
+ //We flush any confirmations on the connection - this prevents idle
bridges for example
+ //sitting there with many unacked messages
+
+ conn.flushConfirmations();
+ }
}
for (Object id : idsToRemove)
Show replies by date