[hornetq-commits] JBoss hornetq SVN: r11902 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 13 12:04:05 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-13 12:04:05 -0500 (Tue, 13 Dec 2011)
New Revision: 11902

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
JBPAPP-7730 - Deadlock on testsuite (failover on large messages)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-12-13 17:04:05 UTC (rev 11902)
@@ -76,6 +76,13 @@
    private final boolean browseOnly;
 
    private final Executor sessionExecutor;
+   
+   // For failover we can't send credits back
+   // while holding a lock or failover could dead lock eventually
+   // And we can't use the sessionExecutor as that's being used for message handlers
+   // for that reason we have a separate flowControlExecutor that's using the thread pool 
+   // Which is a OrderedExecutor
+   private final Executor flowControlExecutor;
 
    private final int clientWindowSize;
 
@@ -135,6 +142,7 @@
                              final int ackBatchSize,
                              final TokenBucketLimiter rateLimiter,
                              final Executor executor,
+                             final Executor flowControlExecutor,
                              final Channel channel,
                              final SessionQueueQueryResponseMessage queueInfo,
                              final ClassLoader contextClassLoader)
@@ -162,6 +170,8 @@
       this.queueInfo = queueInfo;
       
       this.contextClassLoader = contextClassLoader;
+      
+      this.flowControlExecutor = flowControlExecutor;
    }
 
    // ClientConsumer implementation
@@ -846,7 +856,13 @@
     */
    private void sendCredits(final int credits)
    {
-      channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+      flowControlExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+         }
+      });
    }
 
    private void waitForOnMessageToComplete(boolean waitForOnMessage)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-12-13 17:04:05 UTC (rev 11902)
@@ -830,6 +830,7 @@
                                                                      connection,
                                                                      response.getServerVersion(),
                                                                      sessionChannel,
+                                                                     orderedExecutorFactory.getExecutor(),
                                                                      orderedExecutorFactory.getExecutor());
 
                synchronized (sessions)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-12-13 13:14:08 UTC (rev 11901)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-12-13 17:04:05 UTC (rev 11902)
@@ -129,6 +129,9 @@
    private final boolean xa;
 
    private final Executor executor;
+   
+   // to be sent to consumers as consumers will need a separate consumer for flow control
+   private final Executor flowControlExecutor;
 
    private volatile CoreRemotingConnection remotingConnection;
 
@@ -228,7 +231,8 @@
                             final CoreRemotingConnection remotingConnection,
                             final int version,
                             final Channel channel,
-                            final Executor executor) throws HornetQException
+                            final Executor executor,
+                            final Executor flowControlExecutor) throws HornetQException
    {
       this.sessionFactory = sessionFactory;
 
@@ -241,6 +245,8 @@
       this.remotingConnection = remotingConnection;
 
       this.executor = executor;
+      
+      this.flowControlExecutor = flowControlExecutor;
 
       this.xa = xa;
 
@@ -1795,6 +1801,7 @@
                                                                                                                 false)
                                                                                   : null,
                                                                executor,
+                                                               flowControlExecutor,
                                                                channel,
                                                                queueInfo,
                                                                lookupTCCL());



More information about the hornetq-commits mailing list