Author: clebert.suconic(a)jboss.com
Date: 2010-12-28 23:50:11 -0500 (Tue, 28 Dec 2010)
New Revision: 10084
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
fixing testcases on large message & flow control
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-28 22:33:12 UTC (rev 10083)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-29 04:50:11 UTC (rev 10084)
@@ -102,8 +102,6 @@
private volatile int creditsToSend;
- private volatile boolean slowConsumerInitialCreditSent = false;
-
private volatile Exception lastException;
private volatile int ackBytes;
@@ -667,8 +665,6 @@
ClientConsumerImpl.log.trace("Sending " + creditsToSend +
" -1, for slow consumer");
}
- slowConsumerInitialCreditSent = false;
-
// sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
// always buffering one after received the first message
final int credits = creditsToSend - 1;
@@ -717,22 +713,17 @@
* */
private void startSlowConsumer()
{
- if (!slowConsumerInitialCreditSent)
+ if (ClientConsumerImpl.trace)
{
- if (ClientConsumerImpl.trace)
- {
- ClientConsumerImpl.log.trace("Sending 1 credit to start delivering of one message to slow consumer");
- }
- slowConsumerInitialCreditSent = true;
- sendCredits(1);
+ ClientConsumerImpl.log.trace("Sending 1 credit to start delivering of one message to slow consumer");
}
+ sendCredits(1);
}
private void resetIfSlowConsumer()
{
if (clientWindowSize == 0)
{
- slowConsumerInitialCreditSent = false;
sendCredits(0);
}
}
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2010-12-28 22:33:12 UTC (rev 10083)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2010-12-29 04:50:11 UTC (rev 10084)
@@ -271,7 +271,10 @@
outStream = output;
}
- consumerInternal.flowControl(totalFlowControl, !continues);
+ if (totalFlowControl > 0)
+ {
+ consumerInternal.flowControl(totalFlowControl, !continues);
+ }
}
public synchronized void saveBuffer(final OutputStream output) throws HornetQException
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-12-28 22:33:12 UTC (rev 10083)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2010-12-29 04:50:11 UTC (rev 10084)
@@ -73,11 +73,12 @@
protected void tearDown() throws Exception
{
locator.close();
-
- super.tearDown();
+
+ super.tearDown();
}
-// TODO need to test crashing a producer with unused credits returns them to the pool
+ // TODO need to test crashing a producer with unused credits returns them to the pool
+
public void testFlowControlSingleConsumer() throws Exception
{
testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
@@ -107,7 +108,7 @@
{
testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
}
-
+
public void testFlowControlSingleConsumerSlowConsumer() throws Exception
{
testFlowControl(100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
@@ -219,7 +220,6 @@
server.start();
-
locator.setProducerWindowSize(producerWindowSize);
locator.setConsumerWindowSize(consumerWindowSize);
locator.setAckBatchSize(ackBatchSize);
@@ -346,7 +346,7 @@
ProducerFlowControlTest.log.info("rate is " + rate + " msgs / sec");
session.close();
-
+
sf.close();
server.stop();
@@ -367,7 +367,6 @@
server.start();
-
locator.setProducerWindowSize(1024);
locator.setConsumerWindowSize(1024);
locator.setAckBatchSize(1024);