[hornetq-commits] JBoss hornetq SVN: r10020 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 9 10:01:10 EST 2010
Author: clebert.suconic at jboss.com
Date: 2010-12-09 10:01:09 -0500 (Thu, 09 Dec 2010)
New Revision: 10020
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
Log:
HORNETQ-538 - Fixing issue with flow control
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-09 08:23:56 UTC (rev 10019)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-12-09 15:01:09 UTC (rev 10020)
@@ -654,6 +654,7 @@
*/
public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException
{
+ System.err.println("Flow Control being called with clientWindowsize = " + clientWindowSize + " flowControl = " + messageBytes);
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
@@ -675,7 +676,10 @@
creditsToSend = 0;
- sendCredits(credits);
+ if (credits > 0)
+ {
+ sendCredits(credits);
+ }
}
else
{
@@ -688,7 +692,10 @@
creditsToSend = 0;
- sendCredits(credits);
+ if (credits > 0)
+ {
+ sendCredits(credits);
+ }
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2010-12-09 08:23:56 UTC (rev 10019)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2010-12-09 15:01:09 UTC (rev 10020)
@@ -12,6 +12,8 @@
*/
package org.hornetq.tests.integration.client;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
@@ -19,9 +21,14 @@
import junit.framework.Assert;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
@@ -67,8 +74,7 @@
protected void tearDown() throws Exception
{
locator.close();
-
-
+
super.tearDown();
}
@@ -86,7 +92,6 @@
return encodeSize;
}
-
// https://jira.jboss.org/jira/browse/HORNETQ-385
public void testReceiveImmediateWithZeroWindow() throws Exception
{
@@ -99,7 +104,6 @@
ClientSessionFactory sf = locator.createSessionFactory();
-
ClientSession session = sf.createSession(false, false, false);
session.createQueue("testWindow", "testWindow", true);
session.close();
@@ -151,7 +155,7 @@
}
}
-
+
// https://jira.jboss.org/jira/browse/HORNETQ-385
public void testReceiveImmediateWithZeroWindow2() throws Exception
{
@@ -167,12 +171,12 @@
ClientSession session = sf.createSession(false, false, false);
session.createQueue("testReceive", "testReceive", true);
session.close();
-
+
ClientSession sessionProd = sf.createSession(false, false);
ClientMessage msg = sessionProd.createMessage(true);
msg.putStringProperty("hello", "world");
ClientProducer prod = sessionProd.createProducer("testReceive");
-
+
prod.send(msg);
sessionProd.commit();
@@ -183,7 +187,7 @@
Thread.sleep(1000);
ClientMessage message = null;
message = consumer.receiveImmediate();
- //message = consumer.receive(1000); // the test will pass if used receive(1000) instead of receiveImmediate
+ // message = consumer.receive(1000); // the test will pass if used receive(1000) instead of receiveImmediate
assertNotNull(message);
System.out.println(message.getStringProperty("hello"));
message.acknowledge();
@@ -195,7 +199,7 @@
assertNotNull(message);
System.out.println(message.getStringProperty("hello"));
message.acknowledge();
-
+
session.close();
session1.close();
sessionProd.close();
@@ -206,7 +210,7 @@
server.stop();
}
}
-
+
// https://jira.jboss.org/jira/browse/HORNETQ-385
public void testReceiveImmediateWithZeroWindow3() throws Exception
{
@@ -219,7 +223,6 @@
ClientSessionFactory sf = locator.createSessionFactory();
-
ClientSession session = sf.createSession(false, false, false);
session.createQueue("testWindow", "testWindow", true);
session.close();
@@ -271,7 +274,7 @@
}
}
-
+
public void testReceiveImmediateWithZeroWindow4() throws Exception
{
HornetQServer server = createServer(false, isNetty());
@@ -334,8 +337,7 @@
}
}
-
-
+
/*
* tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
* know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
@@ -634,17 +636,15 @@
server.start();
-
locator.setConsumerWindowSize(0);
-
if (largeMessages)
{
locator.setMinLargeMessageSize(100);
}
ClientSessionFactory sf = locator.createSessionFactory();
-
+
session1 = sf.createSession(false, true, true);
session2 = sf.createSession(false, true, true);
@@ -682,7 +682,7 @@
String str = getTextMessage(msg);
Assert.assertEquals("Msg" + i, str);
-
+
log.info("got msg " + str);
msg.acknowledge();
@@ -697,12 +697,11 @@
ClientMessage msg = cons2.receive(1000);
Assert.assertNotNull("expected message at i = " + i, msg);
-
+
String str = getTextMessage(msg);
-
+
log.info("got msg " + str);
-
Assert.assertEquals("Msg" + i, str);
msg.acknowledge();
@@ -810,6 +809,98 @@
}
}
+ public void testSaveBuffersOnLargeMessage() throws Exception // sends 10 messages, streams each received body out, and checks the consumer's buffer size stays 0
+ {
+ HornetQServer server = createServer(false, isNetty());
+
+ ClientSession session1 = null;
+
+ try
+ {
+ final int numberOfMessages = 10;
+
+ server.start();
+ // Consumer window size 0: the assertion in the receive loop expects nothing to be buffered on the client side.
+ locator.setConsumerWindowSize(0);
+ // The bodies sent below are 600 bytes, above this 100-byte threshold (presumably so they are handled as large messages).
+ locator.setMinLargeMessageSize(100);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session1 = sf.createSession(false, true, true);
+
+ session1.start();
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session1.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientConsumerInternal cons1 = (ClientConsumerInternal)session1.createConsumer(ADDRESS);
+
+ // cons1 is the only consumer in this test; the messages are sent after it has been created
+
+ ClientProducer prod = session1.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session1.createMessage(true);
+ msg.getBodyBuffer().writeBytes(new byte[600]);
+ prod.send(msg);
+ }
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons1.receive(1000);
+ Assert.assertNotNull("expected message at i = " + i, msg);
+ // FakeOutputStream.write is a no-op, so the body written to it is simply dropped.
+ msg.saveToOutputStream(new FakeOutputStream());
+
+ msg.acknowledge();
+ // After every message the client-side buffer must still be empty.
+ Assert.assertEquals("A slow consumer shouldn't buffer anything on the client side!",
+ 0,
+ cons1.getBufferSize());
+ }
+
+ session1.close(); // just to make sure everything is flushed and no packets are pending on the sending buffer (presumably otherwise the message count check below could fail)
+ session1.close(); // NOTE(review): second close() looks redundant - confirm whether it is intentional
+ session1 = null;
+ Assert.assertEquals(0, getMessageCount(server, ADDRESS.toString()));
+
+ }
+ finally
+ {
+ try
+ {
+ if (session1 != null)
+ {
+ session1.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+ // A failure while closing the session is deliberately ignored above so the server is still stopped here.
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+ class FakeOutputStream extends OutputStream
+ {
+ // Output sink for the test above: nothing written to it is stored.
+ /* Does nothing with the byte it is given.
+ * @see java.io.OutputStream#write(int)
+ */
+ @Override
+ public void write(int b) throws IOException
+ {
+ }
+
+ }
+
public void testSlowConsumerOnMessageHandlerNoBuffers() throws Exception
{
internalTestSlowConsumerOnMessageHandlerNoBuffers(false);
@@ -842,7 +933,7 @@
}
ClientSessionFactory sf = locator.createSessionFactory();
-
+
session = sf.createSession(false, true, true);
SimpleString ADDRESS = new SimpleString("some-queue");
@@ -1047,7 +1138,7 @@
{
ConsumerWindowSizeTest.log.trace("Received message " + str);
}
-
+
ConsumerWindowSizeTest.log.info("Received message " + str);
failed = failed || !str.equals("Msg" + count);
@@ -1093,7 +1184,7 @@
Assert.assertTrue(latchReceived.await(TIMEOUT, TimeUnit.SECONDS));
log.info("bs " + consReceiveOneAndHold.getBufferSize());
-
+
long timeout = System.currentTimeMillis() + 1000 * TIMEOUT;
while (consReceiveOneAndHold.getBufferSize() == 0 && System.currentTimeMillis() < timeout)
{
More information about the hornetq-commits
mailing list