[jboss-cvs] JBoss Messaging SVN: r6088 - trunk/tests/src/org/jboss/messaging/tests/integration/client.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 16 06:04:15 EDT 2009


Author: ataylor
Date: 2009-03-16 06:04:15 -0400 (Mon, 16 Mar 2009)
New Revision: 6088

Modified:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
Log:
test

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java	2009-03-16 10:04:08 UTC (rev 6087)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java	2009-03-16 10:04:15 UTC (rev 6088)
@@ -37,6 +37,7 @@
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -1215,27 +1216,86 @@
       }
    }
 
-   /* public void testTempQueueGetsDeleted() throws Exception
+   public void testMultipleConsumersMessageOrder() throws Exception
    {
       MessagingService messagingService = createService(false);
       try
       {
          messagingService.start();
          ClientSessionFactory cf = createInVMFactory();
-         ClientSession session = cf.createSession(false, true, true);
-         session.createQueue(addressA, queueA, false, true);
-         session.close();
-         assertNull(messagingService.getServer().getPostOffice().getBinding(queueA));
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientSession recSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         int numReceivers = 100;
+         AtomicInteger count = new AtomicInteger(0);
+         int numMessage = 10000;
+         ClientConsumer[] clientConsumers = new ClientConsumer[numReceivers];
+         Receiver[] receivers = new Receiver[numReceivers];
+         CountDownLatch latch = new CountDownLatch(numMessage);
+         for(int i = 0; i < numReceivers; i++)
+         {
+            clientConsumers[i] = recSession.createConsumer(queueA);
+            receivers[i] = new Receiver(latch);
+            clientConsumers[i].setMessageHandler(receivers[i]);
+         }
+         recSession.start();
+         ClientProducer clientProducer = sendSession.createProducer(addressA);
+         for(int i = 0; i < numMessage; i++)
+         {
+            ClientMessage cm = sendSession.createClientMessage(false);
+            cm.getBody().writeInt(count.getAndIncrement());
+            clientProducer.send(cm);   
+         }
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+         for (Receiver receiver : receivers)
+         {
+            assertFalse("" + receiver.lastMessage, receiver.failed);
+         }
+         sendSession.close();
+         recSession.close();
       }
       finally
       {
-         if(messagingService.isStarted())
+         if (messagingService.isStarted())
          {
             messagingService.stop();
          }
       }
-   }*/
+   }
 
+
+   class Receiver implements MessageHandler
+   {
+      final CountDownLatch latch;
+      int lastMessage = -1;
+      boolean failed = false;
+      public Receiver(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         int i = message.getBody().readInt();
+         try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException e)
+         {
+            e.printStackTrace();
+         }
+         if( i <= lastMessage)
+         {
+            failed = true;
+         }
+         lastMessage = i;
+         latch.countDown();
+      }
+
+   }
+
+
    private static class MyMessageHandler implements MessageHandler
    {
       volatile int messagesReceived = 0;




More information about the jboss-cvs-commits mailing list