[hornetq-commits] JBoss hornetq SVN: r11956 - in branches/Branch_2_2_AS7/src/main/org/hornetq: ra/recovery and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jan 3 12:49:09 EST 2012


Author: clebert.suconic at jboss.com
Date: 2012-01-03 12:49:08 -0500 (Tue, 03 Jan 2012)
New Revision: 11956

Modified:
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
Merge changes from EAP

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2012-01-03 16:46:17 UTC (rev 11955)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2012-01-03 17:49:08 UTC (rev 11956)
@@ -17,7 +17,9 @@
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
@@ -47,7 +49,7 @@
 {
    // Constants
    // ------------------------------------------------------------------------------------
-
+   
    private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
    
    private static final boolean isTrace = log.isTraceEnabled();
@@ -568,7 +570,7 @@
          // consumed in, which means that acking all up to won't work
          ackIndividually = true;
       }
-
+      
       // Add it to the buffer
       buffer.addTail(messageToHandle, messageToHandle.getPriority());
 
@@ -823,6 +825,25 @@
       if (clientWindowSize == 0)
       {
          sendCredits(0);
+
+         // If resetting a slow consumer, we need to wait the execution
+         final CountDownLatch latch = new CountDownLatch(1);
+         flowControlExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               latch.countDown();
+            }
+         });
+         
+         try
+         {
+            latch.await(10, TimeUnit.SECONDS);
+         }
+         catch (InterruptedException ignored)
+         {
+            // no big deal
+         }
       }
    }
 

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java	2012-01-03 16:46:17 UTC (rev 11955)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java	2012-01-03 17:49:08 UTC (rev 11956)
@@ -90,7 +90,14 @@
 
       for (int i = 0 ; i < locatorClasses.length; i++)
       {
-         registry = (RecoveryRegistry) safeInitNewInstance(locatorClasses[i]);
+         try
+         {
+            registry = (RecoveryRegistry) safeInitNewInstance(locatorClasses[i]);
+         }
+         catch (Throwable e)
+         {
+            log.debug("unable to load  recovery registry " + locatorClasses[i], e);
+         }
          if (registry != null)
          {
             break;



More information about the hornetq-commits mailing list