[hornetq-commits] JBoss hornetq SVN: r9399 - trunk/tests/src/org/hornetq/tests/integration/client.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Jul 13 12:22:26 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-07-13 12:22:26 -0400 (Tue, 13 Jul 2010)
New Revision: 9399
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Adding a new test
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-07-11 11:45:37 UTC (rev 9398)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-07-13 16:22:26 UTC (rev 9399)
@@ -360,7 +360,155 @@
}
}
+
+
+ /**
+  * Checks that messages sent inside a single transaction are delivered in send order
+  * when the destination is drained part-way through that transaction:
+  * <ul>
+  * <li>send message id 0 on a transacted session;</li>
+  * <li>fill the address from a second session until its page store reports paging;</li>
+  * <li>send ids 1 to 9 on the transacted session, consuming everything the second
+  * session sent just before id 5 (so the destination should no longer be in page mode);</li>
+  * <li>commit, then expect ids 0 to 9 in that order and nothing else.</li>
+  * </ul>
+  *
+  * Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
+  *
+  * NOTE(review): the disabled_ prefix presumably keeps the test runner from picking this
+  * method up while the scenario is still under discussion - confirm before renaming.
+  *
+  * @throws Exception if server setup, sending or receiving fails
+  */
+ public void disabled_testDepageDuringTransaction2() throws Exception
+ {
+    boolean IS_DURABLE_MESSAGE = true;
+    clearData();
+    Configuration config = createDefaultConfig();
+
+    HornetQServer server = createServer(true,
+                                        config,
+                                        PagingTest.PAGE_SIZE,
+                                        PagingTest.PAGE_MAX,
+                                        new HashMap<String, AddressSettings>());
+
+    server.start();
+
+    final int messageSize = 1024; // 1k
+
+    try
+    {
+       ClientSessionFactory sf = createInVMFactory();
+
+       sf.setBlockOnNonDurableSend(true);
+       sf.setBlockOnDurableSend(true);
+       sf.setBlockOnAcknowledge(true);
+
+       byte[] body = new byte[messageSize];
+
+       // Nothing sent through this session is expected to be visible until commit() below.
+       // NOTE(review): the boolean flags are presumably xa / autoCommitSends / autoCommitAcks /
+       // preAcknowledge - confirm against ClientSessionFactory.createSession.
+       ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
+
+       ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS);
+
+       // Message id 0 joins the transaction before the queue exists and before paging starts.
+       ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE);
+       firstMessage.getBodyBuffer().writeBytes(body);
+       firstMessage.putIntProperty(new SimpleString("id"), 0);
+
+       producerTransacted.send(firstMessage);
+
+       // Second session: used to fill the address, to drain it, and for the final consumer.
+       ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+       session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+       ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+
+       ClientMessage message = null;
+
+       // Counts only the filler messages actually sent, so the drain loop below knows
+       // how many to expect.
+       int numberOfMessages = 0;
+       while (true)
+       {
+          message = session.createMessage(IS_DURABLE_MESSAGE);
+          message.getBodyBuffer().writeBytes(body);
+
+          // Stop sending message as soon as we start paging
+          if (server.getPostOffice().getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging())
+          {
+             break;
+          }
+          numberOfMessages++;
+
+          producer.send(message);
+       }
+
+       Assert.assertTrue(server.getPostOffice().getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
+
+       session.start();
+
+       for (int i = 1; i < 10; i++)
+       {
+          // Same durability flag as every other message in this test.
+          message = session.createMessage(IS_DURABLE_MESSAGE);
+          message.getBodyBuffer().writeBytes(body);
+          message.putIntProperty(new SimpleString("id"), i);
+
+          // Consume messages to force an eventual out of order delivery
+          if (i == 5)
+          {
+             ClientConsumer drainingConsumer = session.createConsumer(PagingTest.ADDRESS);
+             for (int j = 0; j < numberOfMessages; j++)
+             {
+                ClientMessage msg = drainingConsumer.receive(PagingTest.RECEIVE_TIMEOUT);
+                // Check before dereferencing: a receive that yields nothing must fail the
+                // assertion rather than throw NullPointerException from acknowledge().
+                Assert.assertNotNull(msg);
+                msg.acknowledge();
+             }
+
+             // The uncommitted transactional messages must not be delivered yet.
+             Assert.assertNull(drainingConsumer.receiveImmediate());
+             drainingConsumer.close();
+          }
+
+          // Sanity check that the id property round-trips before the message is sent.
+          Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
+          Assert.assertNotNull(messageID);
+          Assert.assertEquals(i, messageID.intValue());
+
+          producerTransacted.send(message);
+       }
+
+       ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+       // Still nothing to receive until the transaction commits.
+       Assert.assertNull(consumer.receiveImmediate());
+
+       sessionTransacted.commit();
+
+       sessionTransacted.close();
+
+       // After the commit, ids 0..9 must arrive exactly in the order they were sent.
+       for (int i = 0; i < 10; i++)
+       {
+          message = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+          Assert.assertNotNull(message);
+
+          Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
+
+          Assert.assertNotNull(messageID);
+          Assert.assertEquals("message received out of order", i, messageID.intValue());
+
+          message.acknowledge();
+       }
+
+       Assert.assertNull(consumer.receiveImmediate());
+
+       consumer.close();
+
+       session.close();
+    }
+    finally
+    {
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+          // Best-effort shutdown: a failure while stopping must not mask an assertion
+          // failure or exception raised by the test body above.
+       }
+    }
+
+ }
+
+
+
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
More information about the hornetq-commits
mailing list