[hornetq-commits] JBoss hornetq SVN: r10020 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 9 10:01:10 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-09 10:01:09 -0500 (Thu, 09 Dec 2010)
New Revision: 10020

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
Log:
HORNETQ-538 - Fixing issue with flow control

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-12-09 08:23:56 UTC (rev 10019)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-12-09 15:01:09 UTC (rev 10020)
@@ -654,6 +654,7 @@
     */
    public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException
    {
+      System.err.println("Flow Control being called with clientWindowsize = " + clientWindowSize + " flowControl = " + messageBytes);
       if (clientWindowSize >= 0)
       {
          creditsToSend += messageBytes;
@@ -675,7 +676,10 @@
 
                creditsToSend = 0;
 
-               sendCredits(credits);
+               if (credits > 0)
+               {
+                  sendCredits(credits);
+               }
             }
             else
             {
@@ -688,7 +692,10 @@
 
                creditsToSend = 0;
 
-               sendCredits(credits);
+               if (credits > 0)
+               {
+                  sendCredits(credits);
+               }
             }
          }
       }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2010-12-09 08:23:56 UTC (rev 10019)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2010-12-09 15:01:09 UTC (rev 10020)
@@ -12,6 +12,8 @@
  */
 package org.hornetq.tests.integration.client;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
@@ -19,9 +21,14 @@
 
 import junit.framework.Assert;
 
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.postoffice.Binding;
@@ -67,8 +74,7 @@
    protected void tearDown() throws Exception
    {
       locator.close();
-         
-      
+
       super.tearDown();
    }
 
@@ -86,7 +92,6 @@
       return encodeSize;
    }
 
-   
    // https://jira.jboss.org/jira/browse/HORNETQ-385
    public void testReceiveImmediateWithZeroWindow() throws Exception
    {
@@ -99,7 +104,6 @@
 
          ClientSessionFactory sf = locator.createSessionFactory();
 
-
          ClientSession session = sf.createSession(false, false, false);
          session.createQueue("testWindow", "testWindow", true);
          session.close();
@@ -151,7 +155,7 @@
       }
 
    }
-   
+
    // https://jira.jboss.org/jira/browse/HORNETQ-385
    public void testReceiveImmediateWithZeroWindow2() throws Exception
    {
@@ -167,12 +171,12 @@
          ClientSession session = sf.createSession(false, false, false);
          session.createQueue("testReceive", "testReceive", true);
          session.close();
-         
+
          ClientSession sessionProd = sf.createSession(false, false);
          ClientMessage msg = sessionProd.createMessage(true);
          msg.putStringProperty("hello", "world");
          ClientProducer prod = sessionProd.createProducer("testReceive");
-         
+
          prod.send(msg);
          sessionProd.commit();
 
@@ -183,7 +187,7 @@
          Thread.sleep(1000);
          ClientMessage message = null;
          message = consumer.receiveImmediate();
-         //message = consumer.receive(1000); // the test will pass if used receive(1000) instead of receiveImmediate
+         // message = consumer.receive(1000); // the test will pass if used receive(1000) instead of receiveImmediate
          assertNotNull(message);
          System.out.println(message.getStringProperty("hello"));
          message.acknowledge();
@@ -195,7 +199,7 @@
          assertNotNull(message);
          System.out.println(message.getStringProperty("hello"));
          message.acknowledge();
-         
+
          session.close();
          session1.close();
          sessionProd.close();
@@ -206,7 +210,7 @@
          server.stop();
       }
    }
-   
+
    // https://jira.jboss.org/jira/browse/HORNETQ-385
    public void testReceiveImmediateWithZeroWindow3() throws Exception
    {
@@ -219,7 +223,6 @@
 
          ClientSessionFactory sf = locator.createSessionFactory();
 
-
          ClientSession session = sf.createSession(false, false, false);
          session.createQueue("testWindow", "testWindow", true);
          session.close();
@@ -271,7 +274,7 @@
       }
 
    }
-   
+
    public void testReceiveImmediateWithZeroWindow4() throws Exception
    {
       HornetQServer server = createServer(false, isNetty());
@@ -334,8 +337,7 @@
       }
 
    }
-   
-   
+
    /*
    * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
    * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
@@ -634,17 +636,15 @@
 
          server.start();
 
-
          locator.setConsumerWindowSize(0);
 
-
          if (largeMessages)
          {
             locator.setMinLargeMessageSize(100);
          }
 
          ClientSessionFactory sf = locator.createSessionFactory();
-         
+
          session1 = sf.createSession(false, true, true);
 
          session2 = sf.createSession(false, true, true);
@@ -682,7 +682,7 @@
 
             String str = getTextMessage(msg);
             Assert.assertEquals("Msg" + i, str);
-            
+
             log.info("got msg " + str);
 
             msg.acknowledge();
@@ -697,12 +697,11 @@
             ClientMessage msg = cons2.receive(1000);
 
             Assert.assertNotNull("expected message at i = " + i, msg);
-            
+
             String str = getTextMessage(msg);
-            
+
             log.info("got msg " + str);
 
-            
             Assert.assertEquals("Msg" + i, str);
 
             msg.acknowledge();
@@ -810,6 +809,98 @@
       }
    }
 
+   public void testSaveBuffersOnLargeMessage() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+      // Scenario: window size 0, ten 600-byte messages, each drained via saveToOutputStream; buffer size must stay 0.
+      ClientSession session1 = null;
+
+      try
+      {
+         final int numberOfMessages = 10;
+
+         server.start();
+
+         locator.setConsumerWindowSize(0);
+         // the 600-byte bodies sent below exceed this threshold; presumably that makes them large messages - confirm
+         locator.setMinLargeMessageSize(100);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session1 = sf.createSession(false, true, true);
+
+         session1.start();
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session1.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientConsumerInternal cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
+
+         // NOTE(review): cons1 already exists when the messages are sent; this test never creates a second consumer
+
+         ClientProducer prod = session1.createProducer(ADDRESS);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = session1.createMessage(true);
+            msg.getBodyBuffer().writeBytes(new byte[600]);
+            prod.send(msg);
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = cons1.receive(1000);
+            Assert.assertNotNull("expected message at i = " + i, msg);
+            // hand the body to a stream whose write(int) is empty, so nothing is kept by the test itself
+            msg.saveToOutputStream(new FakeOutputStream());
+
+            msg.acknowledge();
+            // checked after every message, not only once at the end of the loop
+            Assert.assertEquals("A slow consumer shouldn't buffer anything on the client side!",
+                                0,
+                                cons1.getBufferSize());
+         }
+
+         session1.close(); // just to make sure everything is flushed and no packets are pending on the sending buffer
+         session1.close(); // NOTE(review): repeats the close() on the previous line - looks redundant; confirm
+         session1 = null; // so the finally block below does not close the session again
+         Assert.assertEquals(0, getMessageCount(server, ADDRESS.toString()));
+
+      }
+      finally
+      {
+         try
+         {
+            if (session1 != null)
+            {
+               session1.close();
+            }
+         }
+         catch (Exception ignored)
+         { // any failure of the cleanup close above is dropped here
+         }
+
+         if (server.isStarted())
+         {
+            server.stop();
+         }
+      }
+   }
+
+   class FakeOutputStream extends OutputStream
+   {
+      // Stream handed to msg.saveToOutputStream(...) in testSaveBuffersOnLargeMessage.
+      /* (non-Javadoc) Empty body: each byte passed in is dropped, nothing is stored.
+       * @see java.io.OutputStream#write(int)
+       */
+      @Override
+      public void write(int b) throws IOException
+      {
+      }
+
+   }
+
    public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception
    {
       internalTestSlowConsumerOnMessageHandlerNoBuffers(false);
@@ -842,7 +933,7 @@
          }
 
          ClientSessionFactory sf = locator.createSessionFactory();
-         
+
          session = sf.createSession(false, true, true);
 
          SimpleString ADDRESS = new SimpleString("some-queue");
@@ -1047,7 +1138,7 @@
                   {
                      ConsumerWindowSizeTest.log.trace("Received message " + str);
                   }
-                  
+
                   ConsumerWindowSizeTest.log.info("Received message " + str);
 
                   failed = failed || !str.equals("Msg" + count);
@@ -1093,7 +1184,7 @@
          Assert.assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
 
          log.info("bs " + consReceiveOneAndHold.getBufferSize());
-         
+
          long timeout = System.currentTimeMillis() + 1000 * TIMEOUT;
          while (consReceiveOneAndHold.getBufferSize() == 0 && System.currentTimeMillis() < timeout)
          {



More information about the hornetq-commits mailing list