Author: clebert.suconic(a)jboss.com
Date: 2010-09-08 12:21:01 -0400 (Wed, 08 Sep 2010)
New Revision: 9655
Modified:
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
Log:
Adding new test on paging and ordering
Modified:
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
===================================================================
---
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2010-09-08
11:30:24 UTC (rev 9654)
+++
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2010-09-08
16:21:01 UTC (rev 9655)
@@ -13,6 +13,10 @@
package org.hornetq.tests.integration.paging;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
@@ -58,7 +62,7 @@
AddressSettings defaultSetting = new AddressSettings();
defaultSetting.setPageSizeBytes(10 * 1024);
- defaultSetting.setMaxSizeBytes(100 * 1024);
+ defaultSetting.setMaxSizeBytes(20 * 1024);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
@@ -155,6 +159,115 @@
}
}
+ public void testOrderOverTX() throws Exception
+ {
+ HornetQServer server = newHornetQServer();
+
+ server.start();
+
+ try
+ {
+ ClientSessionFactory sf;
+
+ if (isNetty())
+ {
+ sf = createNettyFactory();
+ }
+ else
+ {
+ sf = createInVMFactory();
+ }
+
+ ClientSession sessionConsumer = sf.createSession(true, true, 0);
+
+ sessionConsumer.createQueue(PagingSendTest.ADDRESS, PagingSendTest.ADDRESS,
null, true);
+
+ final ClientSession sessionProducer = sf.createSession(false, false);
+ final ClientProducer producer =
sessionProducer.createProducer(PagingSendTest.ADDRESS);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int TOTAL_MESSAGES = 10000;
+
+ // Consumer will be ready after we have commits
+ final CountDownLatch ready = new CountDownLatch(1);
+
+ Thread tProducer = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ int commit = 0;
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 100 == 0 && i > 0)
+ {
+ sessionProducer.commit();
+ if (commit++ > 2)
+ {
+ ready.countDown();
+ }
+ }
+ }
+
+ sessionProducer.commit();
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+
+ ClientConsumer consumer =
sessionConsumer.createConsumer(PagingSendTest.ADDRESS);
+
+ sessionConsumer.start();
+
+ tProducer.start();
+
+ assertTrue(ready.await(10, TimeUnit.SECONDS));
+
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = consumer.receive(10000);
+
+ Assert.assertNotNull(msg);
+
+ System.out.println("i = " + i);
+
+ assertEquals(i, msg.getIntProperty("count").intValue());
+
+ msg.acknowledge();
+ }
+
+ tProducer.join();
+
+ sessionConsumer.close();
+
+ sessionProducer.close();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------