[hornetq-commits] JBoss hornetq SVN: r10084 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 28 23:50:12 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-28 23:50:11 -0500 (Tue, 28 Dec 2010)
New Revision: 10084

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
fixing testcases on large message & flow control

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-12-28 22:33:12 UTC (rev 10083)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-12-29 04:50:11 UTC (rev 10084)
@@ -102,8 +102,6 @@
 
    private volatile int creditsToSend;
 
-   private volatile boolean slowConsumerInitialCreditSent = false;
-
    private volatile Exception lastException;
 
    private volatile int ackBytes;
@@ -667,8 +665,6 @@
                   ClientConsumerImpl.log.trace("Sending " + creditsToSend + " -1, for slow consumer");
                }
 
-               slowConsumerInitialCreditSent = false;
-
                // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
                // always buffering one after received the first message
                final int credits = creditsToSend - 1;
@@ -717,22 +713,17 @@
     * */
    private void startSlowConsumer()
    {
-      if (!slowConsumerInitialCreditSent)
+      if (ClientConsumerImpl.trace)
       {
-         if (ClientConsumerImpl.trace)
-         {
-            ClientConsumerImpl.log.trace("Sending 1 credit to start delivering of one message to slow consumer");
-         }
-         slowConsumerInitialCreditSent = true;
-         sendCredits(1);
+         ClientConsumerImpl.log.trace("Sending 1 credit to start delivering of one message to slow consumer");
       }
+      sendCredits(1);
    }
 
    private void resetIfSlowConsumer()
    {
       if (clientWindowSize == 0)
       {
-         slowConsumerInitialCreditSent = false;
          sendCredits(0);
       }
    }

Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java	2010-12-28 22:33:12 UTC (rev 10083)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java	2010-12-29 04:50:11 UTC (rev 10084)
@@ -271,7 +271,10 @@
          outStream = output;
       }
 
-      consumerInternal.flowControl(totalFlowControl, !continues);
+      if (totalFlowControl > 0)
+      {
+         consumerInternal.flowControl(totalFlowControl, !continues);
+      }
    }
 
    public synchronized void saveBuffer(final OutputStream output) throws HornetQException

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2010-12-28 22:33:12 UTC (rev 10083)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2010-12-29 04:50:11 UTC (rev 10084)
@@ -73,11 +73,12 @@
    protected void tearDown() throws Exception
    {
       locator.close();
-      
-      super.tearDown(); 
+
+      super.tearDown();
    }
-// TODO need to test crashing a producer with unused credits returns them to the pool
 
+   // TODO need to test crashing a producer with unused credits returns them to the pool
+
    public void testFlowControlSingleConsumer() throws Exception
    {
       testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
@@ -107,7 +108,7 @@
    {
       testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 0, 1, 1, 0, false);
    }
-   
+
    public void testFlowControlSingleConsumerSlowConsumer() throws Exception
    {
       testFlowControl(100, 500, 1024, 512, 512, 512, 1, 1, 10, false);
@@ -219,7 +220,6 @@
 
       server.start();
 
-
       locator.setProducerWindowSize(producerWindowSize);
       locator.setConsumerWindowSize(consumerWindowSize);
       locator.setAckBatchSize(ackBatchSize);
@@ -346,7 +346,7 @@
       ProducerFlowControlTest.log.info("rate is " + rate + " msgs / sec");
 
       session.close();
-      
+
       sf.close();
 
       server.stop();
@@ -367,7 +367,6 @@
 
       server.start();
 
-
       locator.setProducerWindowSize(1024);
       locator.setConsumerWindowSize(1024);
       locator.setAckBatchSize(1024);



More information about the hornetq-commits mailing list