Author: clebert.suconic(a)jboss.com
Date: 2011-12-20 21:13:14 -0500 (Tue, 20 Dec 2011)
New Revision: 11919
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
Log:
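https://issues.jboss.org/browse/JBPAPP-7706 - fixing test: the producer is now started only after every LoadConsumer has created its durable subscriber (CountDownLatch), TOTAL_MESSAGES_COUNT is reduced from 50000 to 20000, and the per-message "Invalid messages order" console output is removed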
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-21 01:27:21 UTC (rev 11918)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TestFlowControlOnIgnoreLargeMessageBodyTest.java 2011-12-21 02:13:14 UTC (rev 11919)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.client;
+import java.util.concurrent.CountDownLatch;
+
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -44,7 +46,7 @@
private Topic topic;
- private static int TOTAL_MESSAGES_COUNT = 50000;
+ private static int TOTAL_MESSAGES_COUNT = 20000;
private static int MSG_SIZE = 150 * 1024;
@@ -119,7 +121,7 @@
connection = cf.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
prod = session.createProducer(topic);
-
+
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i <= messagesCount && !requestForStop; i++)
@@ -195,8 +197,11 @@
private final int numberOfMessages;
private int receiveTimeout = 0;
+
+ private final CountDownLatch consumerCreated;
- LoadConsumer(final String name,
+ LoadConsumer(final CountDownLatch consumerCreated,
+ final String name,
final Topic topic,
final ConnectionFactory cf,
final int receiveTimeout,
@@ -207,6 +212,7 @@
this.topic = topic;
this.receiveTimeout = receiveTimeout;
this.numberOfMessages = numberOfMessages;
+ this.consumerCreated = consumerCreated;
}
public void sendStopRequest()
@@ -231,12 +237,19 @@
try
{
connection = cf.createConnection();
+
connection.setClientID(getName());
+
connection.start();
+
session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
TopicSubscriber subscriber = session.createDurableSubscriber(topic, getName());
+
+ consumerCreated.countDown();
+
int counter = 0;
- int invalidOrderCounter = 0;
+
while (counter < numberOfMessages && !requestForStop && !error)
{
if (counter == 0)
@@ -252,21 +265,9 @@
else
{
counter++;
- // msg.readBytes(new byte[MSG_SIZE]);
if (msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) != counter)
{
- if (invalidOrderCounter < 10)
- {
- error = true;
- System.out.println("Invalid messages order! expected: " + counter +
- ", received " +
- msg.getIntProperty(TestFlowControlOnIgnoreLargeMessageBodyTest.ATTR_MSG_COUNTER) +
- " " +
- topic +
- " - " +
- getName());
- invalidOrderCounter++;
- }
+ error = true;
}
}
if (counter % 10 == 0)
@@ -336,10 +337,13 @@
TestFlowControlOnIgnoreLargeMessageBodyTest.TOTAL_MESSAGES_COUNT);
LoadConsumer consumers[] = new LoadConsumer[CONSUMERS_COUNT];
+
+ CountDownLatch latch = new CountDownLatch(CONSUMERS_COUNT);
for (int i = 0; i < consumers.length; i++)
{
- consumers[i] = new LoadConsumer("consumer " + i,
+ consumers[i] = new LoadConsumer(latch,
+ "consumer " + i,
topic,
cf,
receiveTimeout,
@@ -350,6 +354,9 @@
{
consumer.start();
}
+
+ latch.await();
+
producer.start();
producer.join();
for (LoadConsumer consumer : consumers)
@@ -383,7 +390,7 @@
{
System.out.println(" OK ");
}
-
+
assertFalse(error);
}
catch (Exception e)