Author: clebert.suconic(a)jboss.com
Date: 2011-12-13 12:04:05 -0500 (Tue, 13 Dec 2011)
New Revision: 11902
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
JBPAPP-7730 - Deadlock on testsuite (failover on large messages)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -76,6 +76,13 @@
private final boolean browseOnly;
private final Executor sessionExecutor;
+
+ // For failover we can't send credits back
+ // while holding a lock, or failover could eventually deadlock.
+ // And we can't use the sessionExecutor as that's being used for message handlers;
+ // for that reason we have a separate flowControlExecutor that's using the thread pool,
+ // which is an OrderedExecutor
+ private final Executor flowControlExecutor;
private final int clientWindowSize;
@@ -135,6 +142,7 @@
final int ackBatchSize,
final TokenBucketLimiter rateLimiter,
final Executor executor,
+ final Executor flowControlExecutor,
final Channel channel,
final SessionQueueQueryResponseMessage queueInfo,
final ClassLoader contextClassLoader)
@@ -162,6 +170,8 @@
this.queueInfo = queueInfo;
this.contextClassLoader = contextClassLoader;
+
+ this.flowControlExecutor = flowControlExecutor;
}
// ClientConsumer implementation
@@ -846,7 +856,13 @@
*/
private void sendCredits(final int credits)
{
- channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ flowControlExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ }
+ });
}
private void waitForOnMessageToComplete(boolean waitForOnMessage)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -830,6 +830,7 @@
connection,
response.getServerVersion(),
sessionChannel,
+
orderedExecutorFactory.getExecutor(),
orderedExecutorFactory.getExecutor());
synchronized (sessions)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2011-12-13 17:04:05 UTC (rev 11902)
@@ -129,6 +129,9 @@
private final boolean xa;
private final Executor executor;
+
+ // to be passed to consumers, as consumers will need a separate executor for flow control
+ private final Executor flowControlExecutor;
private volatile CoreRemotingConnection remotingConnection;
@@ -228,7 +231,8 @@
final CoreRemotingConnection remotingConnection,
final int version,
final Channel channel,
- final Executor executor) throws HornetQException
+ final Executor executor,
+ final Executor flowControlExecutor) throws HornetQException
{
this.sessionFactory = sessionFactory;
@@ -241,6 +245,8 @@
this.remotingConnection = remotingConnection;
this.executor = executor;
+
+ this.flowControlExecutor = flowControlExecutor;
this.xa = xa;
@@ -1795,6 +1801,7 @@
false)
:
null,
executor,
+ flowControlExecutor,
channel,
queueInfo,
lookupTCCL());