[jboss-cvs] JBoss Messaging SVN: r6088 - trunk/tests/src/org/jboss/messaging/tests/integration/client.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 16 06:04:15 EDT 2009
Author: ataylor
Date: 2009-03-16 06:04:15 -0400 (Mon, 16 Mar 2009)
New Revision: 6088
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
Log:
test
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java 2009-03-16 10:04:08 UTC (rev 6087)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java 2009-03-16 10:04:15 UTC (rev 6088)
@@ -37,6 +37,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -1215,27 +1216,86 @@
}
}
- /* public void testTempQueueGetsDeleted() throws Exception
+ public void testMultipleConsumersMessageOrder() throws Exception
{
MessagingService messagingService = createService(false);
try
{
messagingService.start();
ClientSessionFactory cf = createInVMFactory();
- ClientSession session = cf.createSession(false, true, true);
- session.createQueue(addressA, queueA, false, true);
- session.close();
- assertNull(messagingService.getServer().getPostOffice().getBinding(queueA));
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientSession recSession = cf.createSession(false, true, true);
+ sendSession.createQueue(addressA, queueA, false);
+ int numReceivers = 100;
+ AtomicInteger count = new AtomicInteger(0);
+ int numMessage = 10000;
+ ClientConsumer[] clientConsumers = new ClientConsumer[numReceivers];
+ Receiver[] receivers = new Receiver[numReceivers];
+ CountDownLatch latch = new CountDownLatch(numMessage);
+ for(int i = 0; i < numReceivers; i++)
+ {
+ clientConsumers[i] = recSession.createConsumer(queueA);
+ receivers[i] = new Receiver(latch);
+ clientConsumers[i].setMessageHandler(receivers[i]);
+ }
+ recSession.start();
+ ClientProducer clientProducer = sendSession.createProducer(addressA);
+ for(int i = 0; i < numMessage; i++)
+ {
+ ClientMessage cm = sendSession.createClientMessage(false);
+ cm.getBody().writeInt(count.getAndIncrement());
+ clientProducer.send(cm);
+ }
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ for (Receiver receiver : receivers)
+ {
+ assertFalse("" + receiver.lastMessage, receiver.failed);
+ }
+ sendSession.close();
+ recSession.close();
}
finally
{
- if(messagingService.isStarted())
+ if (messagingService.isStarted())
{
messagingService.stop();
}
}
- }*/
+ }
+
+ class Receiver implements MessageHandler
+ {
+ final CountDownLatch latch;
+ int lastMessage = -1;
+ boolean failed = false;
+ public Receiver(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void onMessage(ClientMessage message)
+ {
+ int i = message.getBody().readInt();
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException e)
+ {
+ e.printStackTrace();
+ }
+ if( i <= lastMessage)
+ {
+ failed = true;
+ }
+ lastMessage = i;
+ latch.countDown();
+ }
+
+ }
+
+
private static class MyMessageHandler implements MessageHandler
{
volatile int messagesReceived = 0;
More information about the jboss-cvs-commits mailing list