Author: ataylor
Date: 2010-05-27 08:43:14 -0400 (Thu, 27 May 2010)
New Revision: 9272
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-385 - reset window size for slow consumers when
forcing redelivery
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-05-27 07:46:08 UTC (rev 9271)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-05-27 12:43:14 UTC (rev 9272)
@@ -264,7 +264,11 @@
if (seq >= forceDeliveryCount.longValue())
{
// forced delivery messages are discarded, nothing has been delivered by the queue
- return null;
+ if (forcingDelivery)
+ {
+ resetIfSlowConsumer();
+ return null;
+ }
}
else
{
@@ -307,6 +311,7 @@
}
else
{
+ resetIfSlowConsumer();
return null;
}
}
@@ -703,6 +708,15 @@
}
}
+ private void resetIfSlowConsumer()
+ {
+ if(clientWindowSize == 0)
+ {
+ slowConsumerInitialCreditSent = false;
+ sendCredits(0);
+ }
+ }
+
private void requeueExecutors()
{
for (int i = 0; i < buffer.size(); i++)
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-05-27 07:46:08 UTC (rev 9271)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-05-27 12:43:14 UTC (rev 9272)
@@ -460,6 +460,11 @@
// No flow control
availableCredits = null;
}
+ else if(credits == 0)
+ {
+ //reset, used on slow consumers
+ availableCredits.set(0);
+ }
else
{
int previous = availableCredits.getAndAdd(credits);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2010-05-27 07:46:08 UTC (rev 9271)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2010-05-27 12:43:14 UTC (rev 9272)
@@ -73,7 +73,7 @@
// https://jira.jboss.org/jira/browse/HORNETQ-385
- public void disabled_testReceiveImmediateWithZeroWindow() throws Exception
+ public void testReceiveImmediateWithZeroWindow() throws Exception
{
HornetQServer server = createServer(false, isNetty());
try
@@ -110,7 +110,7 @@
ClientMessage sent = senderSession.createMessage(true);
sent.putStringProperty("hello", "world");
-
+ System.out.println("sending message");
producer.send(sent);
senderSession.commit();
@@ -137,7 +137,7 @@
}
// https://jira.jboss.org/jira/browse/HORNETQ-385
- public void disabled_testReceiveImmediateWithZeroWindow2() throws Exception
+ public void testReceiveImmediateWithZeroWindow2() throws Exception
{
HornetQServer server = createServer(true);
@@ -175,7 +175,7 @@
prod.send(msg);
sessionProd.commit();
- message = consumer.receive(1000);
+ message = consumer.receive(10000);
assertNotNull(message);
System.out.println(message.getStringProperty("hello"));
message.acknowledge();
@@ -191,7 +191,7 @@
}
// https://jira.jboss.org/jira/browse/HORNETQ-385
- public void disabled_testReceiveImmediateWithZeroWindow3() throws Exception
+ public void testReceiveImmediateWithZeroWindow3() throws Exception
{
HornetQServer server = createServer(false, isNetty());
try
@@ -254,7 +254,7 @@
}
- public void disabled_testReceiveImmediateWithZeroWindow4() throws Exception
+ public void testReceiveImmediateWithZeroWindow4() throws Exception
{
HornetQServer server = createServer(false, isNetty());
try