[hornetq-commits] JBoss hornetq SVN: r9655 - branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Sep 8 12:21:01 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-09-08 12:21:01 -0400 (Wed, 08 Sep 2010)
New Revision: 9655
Modified:
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
Log:
Adding new test on paging and ordering
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2010-09-08 11:30:24 UTC (rev 9654)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java 2010-09-08 16:21:01 UTC (rev 9655)
@@ -13,6 +13,10 @@
package org.hornetq.tests.integration.paging;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.Assert;
import org.hornetq.api.core.SimpleString;
@@ -58,7 +62,7 @@
AddressSettings defaultSetting = new AddressSettings();
defaultSetting.setPageSizeBytes(10 * 1024);
- defaultSetting.setMaxSizeBytes(100 * 1024);
+ defaultSetting.setMaxSizeBytes(20 * 1024);
server.getAddressSettingsRepository().addMatch("#", defaultSetting);
@@ -155,6 +159,115 @@
}
}
+ public void testOrderOverTX() throws Exception
+ {
+ HornetQServer server = newHornetQServer();
+
+ server.start();
+
+ try
+ {
+ ClientSessionFactory sf;
+
+ if (isNetty())
+ {
+ sf = createNettyFactory();
+ }
+ else
+ {
+ sf = createInVMFactory();
+ }
+
+ ClientSession sessionConsumer = sf.createSession(true, true, 0);
+
+ sessionConsumer.createQueue(PagingSendTest.ADDRESS, PagingSendTest.ADDRESS, null, true);
+
+ final ClientSession sessionProducer = sf.createSession(false, false);
+ final ClientProducer producer = sessionProducer.createProducer(PagingSendTest.ADDRESS);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int TOTAL_MESSAGES = 10000;
+
+ // Consumer will be ready after we have commits
+ final CountDownLatch ready = new CountDownLatch(1);
+
+ Thread tProducer = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ int commit = 0;
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 100 == 0 && i > 0)
+ {
+ sessionProducer.commit();
+ if (commit++ > 2)
+ {
+ ready.countDown();
+ }
+ }
+ }
+
+ sessionProducer.commit();
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+
+ ClientConsumer consumer = sessionConsumer.createConsumer(PagingSendTest.ADDRESS);
+
+ sessionConsumer.start();
+
+ tProducer.start();
+
+ assertTrue(ready.await(10, TimeUnit.SECONDS));
+
+ for (int i = 0; i < TOTAL_MESSAGES; i++)
+ {
+ ClientMessage msg = consumer.receive(10000);
+
+ Assert.assertNotNull(msg);
+
+ System.out.println("i = " + i);
+
+ assertEquals(i, msg.getIntProperty("count").intValue());
+
+ msg.acknowledge();
+ }
+
+ tProducer.join();
+
+ sessionConsumer.close();
+
+ sessionProducer.close();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the hornetq-commits
mailing list