[hornetq-commits] JBoss hornetq SVN: r11917 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Dec 20 17:17:53 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-12-20 17:17:53 -0500 (Tue, 20 Dec 2011)
New Revision: 11917
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-7706 - fixing client flow control on large message
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-20 20:06:45 UTC (rev 11916)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-12-20 22:17:53 UTC (rev 11917)
@@ -335,7 +335,7 @@
if (expired)
{
m.discardBody();
-
+
session.expire(id, m.getMessageID());
if (clientWindowSize == 0)
@@ -596,10 +596,10 @@
// Flow control for the first packet, we will have others
- flowControl(packet.getPacketSize(), false);
-
ClientLargeMessageInternal currentChunkMessage = (ClientLargeMessageInternal)packet.getLargeMessage();
+ currentChunkMessage.setFlowControlSize(packet.getPacketSize());
+
currentChunkMessage.setDeliveryCount(packet.getDeliveryCount());
File largeMessageCache = null;
@@ -622,8 +622,6 @@
currentChunkMessage.setLargeMessageController(currentLargeMessageController);
}
- currentChunkMessage.setFlowControlSize(0);
-
handleMessage(currentChunkMessage);
}
@@ -756,11 +754,6 @@
{
creditsToSend += messageBytes;
- if (log.isTraceEnabled())
- {
- log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
- }
-
if (creditsToSend >= clientWindowSize)
{
if (clientWindowSize == 0 && discountSlowConsumer)
@@ -783,9 +776,9 @@
}
else
{
- if (ClientConsumerImpl.trace)
+ if (log.isDebugEnabled())
{
- ClientConsumerImpl.log.trace("Sending " + messageBytes + " from flow-control");
+ ClientConsumerImpl.log.debug("Sending " + messageBytes + " from flow-control");
}
final int credits = creditsToSend;
@@ -1013,7 +1006,8 @@
// Chunk messages will execute the flow control while receiving the chunks
if (message.getFlowControlSize() != 0)
{
- flowControl(message.getFlowControlSize(), true);
+ // on large messages we should discount 1 on the first packets as we need continuity until the last packet
+ flowControl(message.getFlowControlSize(), !message.isLargeMessage());
}
}
Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-20 22:17:53 UTC (rev 11917)
@@ -0,0 +1,405 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ *
+ * A TestFlowControlOnIgnoreLargeMessageBodyTest
+ *
+ * @author clebertsuconic
+ * @author Pavel Slavice
+ *
+ *
+ */
+public class TestFlowControlOnIgnoreLargeMessageBodyTest extends JMSTestBase
+{
+
+ Logger log = Logger.getLogger(TestFlowControlOnIgnoreLargeMessageBodyTest.class);
+
+ private Topic topic;
+
+ private static int TOTAL_MESSAGES_COUNT = 50000;
+
+ private static int MSG_SIZE = 150 * 1024;
+
+ private final int CONSUMERS_COUNT = 5;
+
+ private static final String ATTR_MSG_COUNTER = "msgIdex";
+
+ protected int receiveTimeout = 2000;
+
+ private volatile boolean error = false;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ jmsServer.createTopic(true, "topicIn", "/topic/topicIn");
+ topic = (Topic)context.lookup("/topic/topicIn");
+ }
+
+ @Override
+ protected boolean usePersistence()
+ {
+ return false;
+ }
+
+ /**
+ * LoadProducer
+ */
+ class LoadProducer extends Thread
+ {
+ private final ConnectionFactory cf;
+
+ private final Topic topic;
+
+ private final int messagesCount;
+
+ private volatile boolean requestForStop = false;
+
+ private volatile boolean stopped = false;
+
+ private volatile int sentMessages = 0;
+
+ LoadProducer(final String name, final Topic topic, final ConnectionFactory cf, final int messagesCount) throws Exception
+ {
+ super(name);
+ this.cf = cf;
+ this.topic = topic;
+ this.messagesCount = messagesCount;
+ }
+
+ public void sendStopRequest()
+ {
+ stopped = false;
+ requestForStop = true;
+ }
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
+
+ @Override
+ public void run()
+ {
+ stopped = false;
+ Connection connection = null;
+ Session session = null;
+ MessageProducer prod;
+ log.info("Starting producer for " + topic + " - " + getName());
+ try
+ {
+ connection = cf.createConnection();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ prod = session.createProducer(topic);
+
+ prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i = 1; i <= messagesCount && !requestForStop; i++)
+ {
+ if (error)
+ {
+ break;
+ }
+ sentMessages++;
+ BytesMessage msg = session.createBytesMessage();
+ msg.setIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER, i);
+ msg.writeBytes(new byte[TestFlowControlOnIgnoreLargeMessageBodyTest.MSG_SIZE]);
+ prod.send(msg);
+ if (i % 10 == 0)
+ {
+ session.commit();
+ }
+ if (i % 100 == 0)
+ {
+ log.info("Address " + topic + " sent " + i + " messages");
+ }
+ }
+ System.out.println("Ending producer for " + topic + " - " + getName() + " messages " + sentMessages);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ try
+ {
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ try
+ {
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ stopped = true;
+ }
+
+ public int getSentMessages()
+ {
+ return sentMessages;
+ }
+ }
+
+ /**
+ * LoadConsumer
+ */
+ class LoadConsumer extends Thread
+ {
+ private final ConnectionFactory cf;
+
+ private final Topic topic;
+
+ private volatile boolean requestForStop = false;
+
+ private volatile boolean stopped = false;
+
+ private volatile int receivedMessages = 0;
+
+ private final int numberOfMessages;
+
+ private int receiveTimeout = 0;
+
+ LoadConsumer(final String name,
+ final Topic topic,
+ final ConnectionFactory cf,
+ final int receiveTimeout,
+ final int numberOfMessages)
+ {
+ super(name);
+ this.cf = cf;
+ this.topic = topic;
+ this.receiveTimeout = receiveTimeout;
+ this.numberOfMessages = numberOfMessages;
+ }
+
+ public void sendStopRequest()
+ {
+ stopped = false;
+ requestForStop = true;
+ }
+
+ public boolean isStopped()
+ {
+ return stopped;
+ }
+
+ @Override
+ public void run()
+ {
+ Connection connection = null;
+ Session session = null;
+ stopped = false;
+ requestForStop = false;
+ System.out.println("Starting consumer for " + topic + " - " + getName());
+ try
+ {
+ connection = cf.createConnection();
+ connection.setClientID(getName());
+ connection.start();
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ TopicSubscriber subscriber = session.createDurableSubscriber(topic, getName());
+ int counter = 0;
+ int invalidOrderCounter = 0;
+ while (counter < numberOfMessages && !requestForStop && !error)
+ {
+ if (counter == 0)
+ {
+ System.out.println("Starting to consume for " + topic + " - " + getName());
+ }
+ BytesMessage msg = (BytesMessage)subscriber.receive(receiveTimeout);
+ if (msg == null)
+ {
+ System.out.println("Cannot get message in specified timeout: " + topic + " - " + getName());
+ error = true;
+ }
+ else
+ {
+ counter++;
+ // msg.readBytes(new byte[MSG_SIZE]);
+ if (msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) != counter)
+ {
+ if (invalidOrderCounter < 10)
+ {
+ System.out.println("Invalid messages order! expected: " + counter +
+ ", received " +
+ msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) +
+ " " +
+ topic +
+ " - " +
+ getName());
+ invalidOrderCounter++;
+ }
+ }
+ }
+ if (counter % 10 == 0)
+ {
+ session.commit();
+ }
+ if (counter % 100 == 0)
+ {
+ log.info("## " + getName() + " " + topic + " received " + counter);
+ }
+ receivedMessages = counter;
+ }
+ session.commit();
+ }
+ catch (Exception e)
+ {
+ System.out.println("Exception in consumer " + getName() + " : " + e.getMessage());
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("Cannot close session " + e.getMessage());
+ }
+ }
+ if (connection != null)
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (JMSException e)
+ {
+ System.err.println("Cannot close connection " + e.getMessage());
+ }
+ }
+ }
+ stopped = true;
+ System.out.println("Stopping consumer for " + topic +
+ " - " +
+ getName() +
+ ", received " +
+ getReceivedMessages());
+ }
+
+ public int getReceivedMessages()
+ {
+ return receivedMessages;
+ }
+ }
+
+ public void testFlowControl()
+ {
+ Context context = null;
+ try
+ {
+ LoadProducer producer = new LoadProducer("producer",
+ topic,
+ cf,
+ TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
+
+ LoadConsumer consumers[] = new LoadConsumer[CONSUMERS_COUNT];
+
+ for (int i = 0; i < consumers.length; i++)
+ {
+ consumers[i] = new LoadConsumer("consumer " + i,
+ topic,
+ cf,
+ receiveTimeout,
+ TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
+ }
+
+ for (LoadConsumer consumer : consumers)
+ {
+ consumer.start();
+ }
+ producer.start();
+ producer.join();
+ for (LoadConsumer consumer : consumers)
+ {
+ consumer.join();
+ }
+
+ String errorMessage = null;
+ if (producer.getSentMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
+ {
+ errorMessage = "Producer did not send defined count of messages";
+ }
+ else
+ {
+ for (LoadConsumer consumer : consumers)
+ {
+ if (consumer.getReceivedMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
+ {
+ errorMessage = "Producer did not send defined count of messages";
+ break;
+ }
+ }
+ }
+
+ if (errorMessage != null)
+ {
+ System.err.println(" ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ");
+ System.err.println(errorMessage);
+ }
+ else
+ {
+ System.out.println(" OK ");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ finally
+ {
+ if (context != null)
+ {
+ try
+ {
+ context.close();
+ }
+ catch (NamingException ex)
+ {
+ log.warn(ex.getMessage(), ex);
+ }
+ }
+ }
+ }
+
+}
More information about the hornetq-commits
mailing list