[hornetq-commits] JBoss hornetq SVN: r11917 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 20 17:17:53 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-20 17:17:53 -0500 (Tue, 20 Dec 2011)
New Revision: 11917

Added:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
Log:
https://issues.jboss.org/browse/JBPAPP-7706 - fixing client flow control on large message

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-12-20 20:06:45 UTC (rev 11916)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-12-20 22:17:53 UTC (rev 11917)
@@ -335,7 +335,7 @@
                if (expired)
                {
                   m.discardBody();
-
+                  
                   session.expire(id, m.getMessageID());
 
                   if (clientWindowSize == 0)
@@ -596,10 +596,10 @@
 
       // Flow control for the first packet, we will have others
 
-      flowControl(packet.getPacketSize(), false);
-
       ClientLargeMessageInternal currentChunkMessage = (ClientLargeMessageInternal)packet.getLargeMessage();
 
+      currentChunkMessage.setFlowControlSize(packet.getPacketSize());
+
       currentChunkMessage.setDeliveryCount(packet.getDeliveryCount());
 
       File largeMessageCache = null;
@@ -622,8 +622,6 @@
          currentChunkMessage.setLargeMessageController(currentLargeMessageController);
       }
 
-      currentChunkMessage.setFlowControlSize(0);
-
       handleMessage(currentChunkMessage);
    }
 
@@ -756,11 +754,6 @@
       {
          creditsToSend += messageBytes;
          
-         if (log.isTraceEnabled())
-         {
-            log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
-         }
-
          if (creditsToSend >= clientWindowSize)
          {
             if (clientWindowSize == 0 && discountSlowConsumer)
@@ -783,9 +776,9 @@
             }
             else
             {
-               if (ClientConsumerImpl.trace)
+               if (log.isDebugEnabled())
                {
-                  ClientConsumerImpl.log.trace("Sending " + messageBytes + " from flow-control");
+                  ClientConsumerImpl.log.debug("Sending " + messageBytes + " from flow-control");
                }
 
                final int credits = creditsToSend;
@@ -1013,7 +1006,8 @@
       // Chunk messages will execute the flow control while receiving the chunks
       if (message.getFlowControlSize() != 0)
       {
-         flowControl(message.getFlowControlSize(), true);
+         // on large messages we should discount 1 on the first packets as we need continuity until the last packet
+         flowControl(message.getFlowControlSize(), !message.isLargeMessage());
       }
    }
 

Added: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java	                        (rev 0)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java	2011-12-20 22:17:53 UTC (rev 11917)
@@ -0,0 +1,405 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * 
+ * A TestFlowControlOnIgnoreLargeMessageBodyTest
+ *
+ * @author clebertsuconic
+ * @author Pavel Slavice
+ *
+ *
+ */
+public class TestFlowControlOnIgnoreLargeMessageBodyTest extends JMSTestBase
+{
+
+   Logger log = Logger.getLogger(TestFlowControlOnIgnoreLargeMessageBodyTest.class);
+
+   private Topic topic;
+
+   private static int TOTAL_MESSAGES_COUNT = 50000;
+
+   private static int MSG_SIZE = 150 * 1024;
+
+   private final int CONSUMERS_COUNT = 5;
+
+   private static final String ATTR_MSG_COUNTER = "msgIdex";
+
+   protected int receiveTimeout = 2000;
+
+   private volatile boolean error = false;
+
+   @Override
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      jmsServer.createTopic(true, "topicIn", "/topic/topicIn");
+      topic = (Topic)context.lookup("/topic/topicIn");
+   }
+
+   @Override
+   protected boolean usePersistence()
+   {
+      return false;
+   }
+
+   /**
+    * LoadProducer
+    */
+   class LoadProducer extends Thread
+   {
+      private final ConnectionFactory cf;
+
+      private final Topic topic;
+
+      private final int messagesCount;
+
+      private volatile boolean requestForStop = false;
+
+      private volatile boolean stopped = false;
+
+      private volatile int sentMessages = 0;
+
+      LoadProducer(final String name, final Topic topic, final ConnectionFactory cf, final int messagesCount) throws Exception
+      {
+         super(name);
+         this.cf = cf;
+         this.topic = topic;
+         this.messagesCount = messagesCount;
+      }
+
+      public void sendStopRequest()
+      {
+         stopped = false;
+         requestForStop = true;
+      }
+
+      public boolean isStopped()
+      {
+         return stopped;
+      }
+
+      @Override
+      public void run()
+      {
+         stopped = false;
+         Connection connection = null;
+         Session session = null;
+         MessageProducer prod;
+         log.info("Starting producer for " + topic + " - " + getName());
+         try
+         {
+            connection = cf.createConnection();
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            prod = session.createProducer(topic);
+            
+            prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            for (int i = 1; i <= messagesCount && !requestForStop; i++)
+            {
+               if (error)
+               {
+                  break;
+               }
+               sentMessages++;
+               BytesMessage msg = session.createBytesMessage();
+               msg.setIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER, i);
+               msg.writeBytes(new byte[TestFlowControlOnIgnoreLargeMessageBodyTest.MSG_SIZE]);
+               prod.send(msg);
+               if (i % 10 == 0)
+               {
+                  session.commit();
+               }
+               if (i % 100 == 0)
+               {
+                  log.info("Address " + topic + " sent " + i + " messages");
+               }
+            }
+            System.out.println("Ending producer for " + topic + " - " + getName() + " messages " + sentMessages);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+         finally
+         {
+            try
+            {
+               session.commit();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+            try
+            {
+               connection.close();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+         stopped = true;
+      }
+
+      public int getSentMessages()
+      {
+         return sentMessages;
+      }
+   }
+
+   /**
+    * LoadConsumer
+    */
+   class LoadConsumer extends Thread
+   {
+      private final ConnectionFactory cf;
+
+      private final Topic topic;
+
+      private volatile boolean requestForStop = false;
+
+      private volatile boolean stopped = false;
+
+      private volatile int receivedMessages = 0;
+
+      private final int numberOfMessages;
+
+      private int receiveTimeout = 0;
+
+      LoadConsumer(final String name,
+                   final Topic topic,
+                   final ConnectionFactory cf,
+                   final int receiveTimeout,
+                   final int numberOfMessages)
+      {
+         super(name);
+         this.cf = cf;
+         this.topic = topic;
+         this.receiveTimeout = receiveTimeout;
+         this.numberOfMessages = numberOfMessages;
+      }
+
+      public void sendStopRequest()
+      {
+         stopped = false;
+         requestForStop = true;
+      }
+
+      public boolean isStopped()
+      {
+         return stopped;
+      }
+
+      @Override
+      public void run()
+      {
+         Connection connection = null;
+         Session session = null;
+         stopped = false;
+         requestForStop = false;
+         System.out.println("Starting consumer for " + topic + " - " + getName());
+         try
+         {
+            connection = cf.createConnection();
+            connection.setClientID(getName());
+            connection.start();
+            session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            TopicSubscriber subscriber = session.createDurableSubscriber(topic, getName());
+            int counter = 0;
+            int invalidOrderCounter = 0;
+            while (counter < numberOfMessages && !requestForStop && !error)
+            {
+               if (counter == 0)
+               {
+                  System.out.println("Starting to consume for " + topic + " - " + getName());
+               }
+               BytesMessage msg = (BytesMessage)subscriber.receive(receiveTimeout);
+               if (msg == null)
+               {
+                  System.out.println("Cannot get message in specified timeout: " + topic + " - " + getName());
+                  error = true;
+               }
+               else
+               {
+                  counter++;
+                  // msg.readBytes(new byte[MSG_SIZE]);
+                  if (msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) != counter)
+                  {
+                     if (invalidOrderCounter < 10)
+                     {
+                        System.out.println("Invalid messages order! expected: " + counter +
+                                           ", received " +
+                                           msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) +
+                                           "    " +
+                                           topic +
+                                           " - " +
+                                           getName());
+                        invalidOrderCounter++;
+                     }
+                  }
+               }
+               if (counter % 10 == 0)
+               {
+                  session.commit();
+               }
+               if (counter % 100 == 0)
+               {
+                  log.info("## " + getName() + " " + topic + " received " + counter);
+               }
+               receivedMessages = counter;
+            }
+            session.commit();
+         }
+         catch (Exception e)
+         {
+            System.out.println("Exception in consumer " + getName() + " : " + e.getMessage());
+            e.printStackTrace();
+         }
+         finally
+         {
+            if (session != null)
+            {
+               try
+               {
+                  session.close();
+               }
+               catch (JMSException e)
+               {
+                  System.err.println("Cannot close session " + e.getMessage());
+               }
+            }
+            if (connection != null)
+            {
+               try
+               {
+                  connection.close();
+               }
+               catch (JMSException e)
+               {
+                  System.err.println("Cannot close connection " + e.getMessage());
+               }
+            }
+         }
+         stopped = true;
+         System.out.println("Stopping consumer for " + topic +
+                            " - " +
+                            getName() +
+                            ", received " +
+                            getReceivedMessages());
+      }
+
+      public int getReceivedMessages()
+      {
+         return receivedMessages;
+      }
+   }
+
+   public void testFlowControl()
+   {
+      Context context = null;
+      try
+      {
+         LoadProducer producer = new LoadProducer("producer",
+                                                  topic,
+                                                  cf,
+                                                  TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
+
+         LoadConsumer consumers[] = new LoadConsumer[CONSUMERS_COUNT];
+
+         for (int i = 0; i < consumers.length; i++)
+         {
+            consumers[i] = new LoadConsumer("consumer " + i,
+                                            topic,
+                                            cf,
+                                            receiveTimeout,
+                                            TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
+         }
+
+         for (LoadConsumer consumer : consumers)
+         {
+            consumer.start();
+         }
+         producer.start();
+         producer.join();
+         for (LoadConsumer consumer : consumers)
+         {
+            consumer.join();
+         }
+
+         String errorMessage = null;
+         if (producer.getSentMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
+         {
+            errorMessage = "Producer did not send defined count of messages";
+         }
+         else
+         {
+            for (LoadConsumer consumer : consumers)
+            {
+               if (consumer.getReceivedMessages() != TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT)
+               {
+                  errorMessage = "Producer did not send defined count of messages";
+                  break;
+               }
+            }
+         }
+
+         if (errorMessage != null)
+         {
+            System.err.println(" ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ERROR ");
+            System.err.println(errorMessage);
+         }
+         else
+         {
+            System.out.println(" OK ");
+         }
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+      }
+      finally
+      {
+         if (context != null)
+         {
+            try
+            {
+               context.close();
+            }
+            catch (NamingException ex)
+            {
+               log.warn(ex.getMessage(), ex);
+            }
+         }
+      }
+   }
+
+}



More information about the hornetq-commits mailing list