[hornetq-commits] JBoss hornetq SVN: r11902 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Dec 13 12:04:05 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-12-13 12:04:05 -0500 (Tue, 13 Dec 2011)
New Revision: 11902
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
JBPAPP-7730 - Deadlock on testsuite (failover on large messages)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -76,6 +76,13 @@
private final boolean browseOnly;
private final Executor sessionExecutor;
+
+ // For failover we can't send credits back
+ // while holding a lock, or failover could eventually deadlock.
+ // We also can't use the sessionExecutor, as that's being used for message handlers.
+ // For that reason we have a separate flowControlExecutor that uses the thread pool,
+ // which is an OrderedExecutor.
+ private final Executor flowControlExecutor;
private final int clientWindowSize;
@@ -135,6 +142,7 @@
final int ackBatchSize,
final TokenBucketLimiter rateLimiter,
final Executor executor,
+ final Executor flowControlExecutor,
final Channel channel,
final SessionQueueQueryResponseMessage queueInfo,
final ClassLoader contextClassLoader)
@@ -162,6 +170,8 @@
this.queueInfo = queueInfo;
this.contextClassLoader = contextClassLoader;
+
+ this.flowControlExecutor = flowControlExecutor;
}
// ClientConsumer implementation
@@ -846,7 +856,13 @@
*/
private void sendCredits(final int credits)
{
- channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ flowControlExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ }
+ });
}
private void waitForOnMessageToComplete(boolean waitForOnMessage)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -830,6 +830,7 @@
connection,
response.getServerVersion(),
sessionChannel,
+ orderedExecutorFactory.getExecutor(),
orderedExecutorFactory.getExecutor());
synchronized (sessions)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -129,6 +129,9 @@
private final boolean xa;
private final Executor executor;
+
+ // to be passed to consumers, as consumers need a separate executor for flow control
+ private final Executor flowControlExecutor;
private volatile CoreRemotingConnection remotingConnection;
@@ -228,7 +231,8 @@
final CoreRemotingConnection remotingConnection,
final int version,
final Channel channel,
- final Executor executor) throws HornetQException
+ final Executor executor,
+ final Executor flowControlExecutor) throws HornetQException
{
this.sessionFactory = sessionFactory;
@@ -241,6 +245,8 @@
this.remotingConnection = remotingConnection;
this.executor = executor;
+
+ this.flowControlExecutor = flowControlExecutor;
this.xa = xa;
@@ -1795,6 +1801,7 @@
false)
: null,
executor,
+ flowControlExecutor,
channel,
queueInfo,
lookupTCCL());
More information about the hornetq-commits
mailing list