Author: clebert.suconic(a)jboss.com
Date: 2012-01-03 12:49:08 -0500 (Tue, 03 Jan 2012)
New Revision: 11956
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java
Log:
Merge changes from EAP
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2012-01-03 16:46:17 UTC (rev 11955)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2012-01-03 17:49:08 UTC (rev 11956)
@@ -17,7 +17,9 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
@@ -47,7 +49,7 @@
{
// Constants
// ------------------------------------------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
private static final boolean isTrace = log.isTraceEnabled();
@@ -568,7 +570,7 @@
// consumed in, which means that acking all up to won't work
ackIndividually = true;
}
-
+
// Add it to the buffer
buffer.addTail(messageToHandle, messageToHandle.getPriority());
@@ -823,6 +825,25 @@
if (clientWindowSize == 0)
{
sendCredits(0);
+
+ // If resetting a slow consumer, we need to wait the execution
+ final CountDownLatch latch = new CountDownLatch(1);
+ flowControlExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ latch.await(10, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException ignored)
+ {
+ // no big deal
+ }
}
}
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-01-03 16:46:17 UTC (rev 11955)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/ra/recovery/RecoveryManager.java 2012-01-03 17:49:08 UTC (rev 11956)
@@ -90,7 +90,14 @@
for (int i = 0 ; i < locatorClasses.length; i++)
{
- registry = (RecoveryRegistry) safeInitNewInstance(locatorClasses[i]);
+ try
+ {
+ registry = (RecoveryRegistry) safeInitNewInstance(locatorClasses[i]);
+ }
+ catch (Throwable e)
+ {
+ log.debug("unable to load recovery registry " + locatorClasses[i], e);
+ }
if (registry != null)
{
break;