[hornetq-commits] JBoss hornetq SVN: r11919 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 20 21:13:14 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-20 21:13:14 -0500 (Tue, 20 Dec 2011)
New Revision: 11919

Modified:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7706 - fixing test

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java	2011-12-21 01:27:21 UTC (rev 11918)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java	2011-12-21 02:13:14 UTC (rev 11919)
@@ -13,6 +13,8 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.util.concurrent.CountDownLatch;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -44,7 +46,7 @@
 
    private Topic topic;
 
-   private static int TOTAL_MESSAGES_COUNT = 50000;
+   private static int TOTAL_MESSAGES_COUNT = 20000;
 
    private static int MSG_SIZE = 150 * 1024;
 
@@ -119,7 +121,7 @@
             connection = cf.createConnection();
             session = connection.createSession(true, Session.SESSION_TRANSACTED);
             prod = session.createProducer(topic);
-            
+
             prod.setDeliveryMode(DeliveryMode.PERSISTENT);
 
             for (int i = 1; i <= messagesCount && !requestForStop; i++)
@@ -195,8 +197,11 @@
       private final int numberOfMessages;
 
       private int receiveTimeout = 0;
+      
+      private final CountDownLatch consumerCreated;
 
-      LoadConsumer(final String name,
+      LoadConsumer(final CountDownLatch consumerCreated,
+                   final String name,
                    final Topic topic,
                    final ConnectionFactory cf,
                    final int receiveTimeout,
@@ -207,6 +212,7 @@
          this.topic = topic;
          this.receiveTimeout = receiveTimeout;
          this.numberOfMessages = numberOfMessages;
+         this.consumerCreated = consumerCreated;
       }
 
       public void sendStopRequest()
@@ -231,12 +237,19 @@
          try
          {
             connection = cf.createConnection();
+
             connection.setClientID(getName());
+
             connection.start();
+
             session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
             TopicSubscriber subscriber = session.createDurableSubscriber(topic, getName());
+            
+            consumerCreated.countDown();
+
             int counter = 0;
-            int invalidOrderCounter = 0;
+
             while (counter < numberOfMessages && !requestForStop && !error)
             {
                if (counter == 0)
@@ -252,21 +265,9 @@
                else
                {
                   counter++;
-                  // msg.readBytes(new byte[MSG_SIZE]);
                   if (msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) != counter)
                   {
-                     if (invalidOrderCounter < 10)
-                     {
-                        error = true;
-                        System.out.println("Invalid messages order! expected: " + counter +
-                                           ", received " +
-                                           msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) +
-                                           "    " +
-                                           topic +
-                                           " - " +
-                                           getName());
-                        invalidOrderCounter++;
-                     }
+                     error = true;
                   }
                }
                if (counter % 10 == 0)
@@ -336,10 +337,13 @@
                                                   TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
 
          LoadConsumer consumers[] = new LoadConsumer[CONSUMERS_COUNT];
+         
+         CountDownLatch latch = new CountDownLatch(CONSUMERS_COUNT);
 
          for (int i = 0; i < consumers.length; i++)
          {
-            consumers[i] = new LoadConsumer("consumer " + i,
+            consumers[i] = new LoadConsumer(latch,
+                                            "consumer " + i,
                                             topic,
                                             cf,
                                             receiveTimeout,
@@ -350,6 +354,9 @@
          {
             consumer.start();
          }
+         
+         latch.await();
+
          producer.start();
          producer.join();
          for (LoadConsumer consumer : consumers)
@@ -383,7 +390,7 @@
          {
             System.out.println(" OK ");
          }
-         
+
          assertFalse(error);
       }
       catch (Exception e)



More information about the hornetq-commits mailing list