Author: jmesnil
Date: 2009-11-18 11:58:55 -0500 (Wed, 18 Nov 2009)
New Revision: 8312
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
Log:
test message group with & without direct delivery
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-18
16:47:53 UTC (rev 8311)
+++
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-18
16:58:55 UTC (rev 8312)
@@ -50,121 +50,105 @@
private SimpleString qName = new SimpleString("MessageGroupingTestQueue");
- public void testBasicGrouping() throws Exception
+ public void testBasicGroupingWithDirectDelivery() throws Exception
{
- ClientProducer clientProducer = clientSession.createProducer(qName);
- ClientConsumer consumer = clientSession.createConsumer(qName);
- ClientConsumer consumer2 = clientSession.createConsumer(qName);
- clientSession.start();
- SimpleString groupId = new SimpleString("grp1");
- int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, clientSession);
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
- clientProducer.send(message);
- }
- CountDownLatch latch = new CountDownLatch(numMessages);
- DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
- consumer.setMessageHandler(dummyMessageHandler);
- DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
- consumer2.setMessageHandler(dummyMessageHandler2);
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertEquals(100, dummyMessageHandler.list.size());
- assertEquals(0, dummyMessageHandler2.list.size());
- consumer.close();
- consumer2.close();
+ doTestBasicGrouping(true);
}
- public void testMultipleGrouping() throws Exception
+ public void testBasicGroupingWithoutDirectDelivery() throws Exception
{
- ClientProducer clientProducer = clientSession.createProducer(qName);
- ClientConsumer consumer = clientSession.createConsumer(qName);
- ClientConsumer consumer2 = clientSession.createConsumer(qName);
- clientSession.start();
- SimpleString groupId = new SimpleString("grp1");
- SimpleString groupId2 = new SimpleString("grp2");
- int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = createTextMessage("m" + i, clientSession);
- if (i % 2 == 0 || i == 0)
- {
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
- }
- else
- {
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
- }
- clientProducer.send(message);
- }
- CountDownLatch latch = new CountDownLatch(numMessages);
- DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
- consumer.setMessageHandler(dummyMessageHandler);
- DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
- consumer2.setMessageHandler(dummyMessageHandler2);
- assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertEquals(50, dummyMessageHandler.list.size());
- int i = 0;
- for (ClientMessage message : dummyMessageHandler.list)
- {
- assertEquals(message.getBody().readString(), "m" + i);
- i += 2;
- }
- assertEquals(50, dummyMessageHandler2.list.size());
- i = 1;
- for (ClientMessage message : dummyMessageHandler2.list)
- {
- assertEquals(message.getBody().readString(), "m" + i);
- i += 2;
- }
- consumer.close();
- consumer2.close();
+ doTestBasicGrouping(false);
}
+
+ public void testMultipleGroupingWithDirectDelivery() throws Exception
+ {
+ doTestMultipleGrouping(true);
+ }
+
+ public void testMultipleGroupingWithoutDirectDelivery() throws Exception
+ {
+ doTestMultipleGrouping(false);
+ }
+
+ public void testMultipleGroupingSingleConsumerWithDirectDelivery() throws Exception
+ {
+ doTestMultipleGroupingSingleConsumer(true);
+ }
+
+ public void testMultipleGroupingSingleConsumerWithoutDirectDelivery() throws
Exception
+ {
+ doTestMultipleGroupingSingleConsumer(false);
+ }
- public void testMultipleGroupingStartConsumersAfterMessagesSent() throws Exception
+ public void testMultipleGroupingTXCommitWithDirectDelivery() throws Exception
{
+ doTestMultipleGroupingTXCommit(true);
+ }
+
+ public void testMultipleGroupingTXCommitWithoutDirectDelivery() throws Exception
+ {
+ doTestMultipleGroupingTXCommit(false);
+ }
+
+ public void testMultipleGroupingTXRollbackWithDirectDelivery() throws Exception
+ {
+ doTestMultipleGroupingTXRollback(true);
+ }
+
+ public void testMultipleGroupingTXRollbackWithoutDirectDelivery() throws Exception
+ {
+ doTestMultipleGroupingTXRollback(false);
+ }
+
+ public void testMultipleGroupingXACommitWithDirectDelivery() throws Exception
+ {
+ dotestMultipleGroupingXACommit(true);
+ }
+
+ public void testMultipleGroupingXACommitWithoutDirectDelivery() throws Exception
+ {
+ dotestMultipleGroupingXACommit(false);
+ }
+
+ public void testMultipleGroupingXARollbackWithDirectDelivery() throws Exception
+ {
+ doTestMultipleGroupingXARollback(true);
+ }
+
+ public void testMultipleGroupingXARollbackWithoutDirectDelivery() throws Exception
+ {
+ doTestMultipleGroupingXARollback(false);
+ }
+
+ private void doTestBasicGrouping(boolean directDelivery) throws Exception
+ {
ClientProducer clientProducer = clientSession.createProducer(qName);
ClientConsumer consumer = clientSession.createConsumer(qName);
ClientConsumer consumer2 = clientSession.createConsumer(qName);
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
SimpleString groupId = new SimpleString("grp1");
- SimpleString groupId2 = new SimpleString("grp2");
int numMessages = 100;
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if (i % 2 == 0 || i == 0)
- {
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
- }
- else
- {
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
- }
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
clientProducer.send(message);
}
-
- clientSession.start();
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
CountDownLatch latch = new CountDownLatch(numMessages);
DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
consumer.setMessageHandler(dummyMessageHandler);
DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
consumer2.setMessageHandler(dummyMessageHandler2);
assertTrue(latch.await(10, TimeUnit.SECONDS));
- assertEquals(50, dummyMessageHandler.list.size());
- int i = 0;
- for (ClientMessage message : dummyMessageHandler.list)
- {
- assertEquals(message.getBody().readString(), "m" + i);
- i += 2;
- }
- assertEquals(50, dummyMessageHandler2.list.size());
- i = 1;
- for (ClientMessage message : dummyMessageHandler2.list)
- {
- assertEquals(message.getBody().readString(), "m" + i);
- i += 2;
- }
+ assertEquals(100, dummyMessageHandler.list.size());
+ assertEquals(0, dummyMessageHandler2.list.size());
consumer.close();
consumer2.close();
}
@@ -229,11 +213,14 @@
consumer.close();
}
- public void testMultipleGroupingSingleConsumer() throws Exception
+ private void doTestMultipleGroupingSingleConsumer(boolean directDelivery) throws
Exception
{
ClientProducer clientProducer = clientSession.createProducer(qName);
ClientConsumer consumer = clientSession.createConsumer(qName);
- clientSession.start();
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
SimpleString groupId = new SimpleString("grp1");
SimpleString groupId2 = new SimpleString("grp2");
int numMessages = 100;
@@ -250,6 +237,10 @@
}
clientProducer.send(message);
}
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
CountDownLatch latch = new CountDownLatch(numMessages);
DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
consumer.setMessageHandler(dummyMessageHandler);
@@ -264,12 +255,15 @@
consumer.close();
}
- public void testMultipleGroupingTXCommit() throws Exception
+ private void doTestMultipleGroupingTXCommit(boolean directDelivery) throws Exception
{
ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
ClientSession clientSession = sessionFactory.createSession(false, false, false);
ClientProducer clientProducer = this.clientSession.createProducer(qName);
- clientSession.start();
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
ClientConsumer consumer = clientSession.createConsumer(qName);
ClientConsumer consumer2 = clientSession.createConsumer(qName);
@@ -289,6 +283,10 @@
}
clientProducer.send(message);
}
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
CountDownLatch latch = new CountDownLatch(numMessages);
DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
consumer.setMessageHandler(dummyMessageHandler);
@@ -317,7 +315,7 @@
clientSession.close();
}
- public void testMultipleGroupingTXRollback() throws Exception
+ private void doTestMultipleGroupingTXRollback(boolean directDelivery) throws
Exception
{
ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
sessionFactory.setBlockOnAcknowledge(true);
@@ -325,7 +323,10 @@
ClientProducer clientProducer = this.clientSession.createProducer(qName);
ClientConsumer consumer = clientSession.createConsumer(qName);
ClientConsumer consumer2 = clientSession.createConsumer(qName);
- clientSession.start();
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
SimpleString groupId = new SimpleString("grp1");
SimpleString groupId2 = new SimpleString("grp2");
int numMessages = 100;
@@ -342,6 +343,10 @@
}
clientProducer.send(message);
}
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
CountDownLatch latch = new CountDownLatch(numMessages);
DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
consumer.setMessageHandler(dummyMessageHandler);
@@ -386,14 +391,17 @@
clientSession.close();
}
- public void testMultipleGroupingXACommit() throws Exception
+ private void dotestMultipleGroupingXACommit(boolean directDelivery) throws Exception
{
ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
ClientSession clientSession = sessionFactory.createSession(true, false, false);
ClientProducer clientProducer = this.clientSession.createProducer(qName);
ClientConsumer consumer = clientSession.createConsumer(qName);
ClientConsumer consumer2 = clientSession.createConsumer(qName);
- clientSession.start();
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
Xid xid = new XidImpl("bq".getBytes(), 4, "gtid".getBytes());
clientSession.start(xid, XAResource.TMNOFLAGS);
@@ -413,6 +421,10 @@
}
clientProducer.send(message);
}
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
CountDownLatch latch = new CountDownLatch(numMessages);
DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
consumer.setMessageHandler(dummyMessageHandler);
@@ -443,13 +455,16 @@
clientSession.close();
}
- public void testMultipleGroupingXARollback() throws Exception
+ private void doTestMultipleGroupingXARollback(boolean directDelivery) throws
Exception
{
ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new
TransportConfiguration(INVM_CONNECTOR_FACTORY));
sessionFactory.setBlockOnAcknowledge(true);
ClientSession clientSession = sessionFactory.createSession(true, false, false);
ClientProducer clientProducer = this.clientSession.createProducer(qName);
- clientSession.start();
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
ClientConsumer consumer = clientSession.createConsumer(qName);
ClientConsumer consumer2 = clientSession.createConsumer(qName);
Xid xid = new XidImpl("bq".getBytes(), 4, "gtid".getBytes());
@@ -471,6 +486,10 @@
}
clientProducer.send(message);
}
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
CountDownLatch latch = new CountDownLatch(numMessages);
DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
consumer.setMessageHandler(dummyMessageHandler);
@@ -519,6 +538,59 @@
assertNull(consumer.receiveImmediate());
clientSession.close();
}
+
+ private void doTestMultipleGrouping(boolean directDelivery) throws Exception
+ {
+ ClientProducer clientProducer = clientSession.createProducer(qName);
+ ClientConsumer consumer = clientSession.createConsumer(qName);
+ ClientConsumer consumer2 = clientSession.createConsumer(qName);
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
+ SimpleString groupId = new SimpleString("grp1");
+ SimpleString groupId2 = new SimpleString("grp2");
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, clientSession);
+ if (i % 2 == 0 || i == 0)
+ {
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+ }
+ else
+ {
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ }
+ clientProducer.send(message);
+ }
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+ consumer.setMessageHandler(dummyMessageHandler);
+ DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+ consumer2.setMessageHandler(dummyMessageHandler2);
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(50, dummyMessageHandler.list.size());
+ int i = 0;
+ for (ClientMessage message : dummyMessageHandler.list)
+ {
+ assertEquals(message.getBody().readString(), "m" + i);
+ i += 2;
+ }
+ assertEquals(50, dummyMessageHandler2.list.size());
+ i = 1;
+ for (ClientMessage message : dummyMessageHandler2.list)
+ {
+ assertEquals(message.getBody().readString(), "m" + i);
+ i += 2;
+ }
+ consumer.close();
+ consumer2.close();
+ }
protected void tearDown() throws Exception
{
@@ -567,31 +639,7 @@
clientSession = sessionFactory.createSession(false, true, true);
clientSession.createQueue(qName, qName, null, false);
}
-
- // do not swallow exception in DeliverRunner.run() to show the IOOBE on the queue
handlers
- public void testSwallowedIndexOutOfBoundsException() throws Exception
- {
- ClientConsumer consumer = clientSession.createConsumer(qName, null, false);
- ClientConsumer consumer2 = clientSession.createConsumer(qName, null, false);
- ClientProducer producer = clientSession.createProducer(qName);
- ClientMessage message = createTextMessage("m0" , clientSession);
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, new
SimpleString("g1"));
- producer.send(message);
-
- clientSession.start();
-
- ClientMessage msg = consumer.receive();
- assertNotNull(msg);
- msg.acknowledge();
- assertNull(consumer.receive(500));
-
- consumer.close();
- consumer2.close();
- consumer = clientSession.createConsumer(qName, null, false);
- assertNull(consumer.receive(500));
- }
-
private static class DummyMessageHandler implements MessageHandler
{
ArrayList<ClientMessage> list = new ArrayList<ClientMessage>();