[hornetq-commits] JBoss hornetq SVN: r8312 - trunk/tests/src/org/hornetq/tests/integration/client.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 18 11:58:56 EST 2009


Author: jmesnil
Date: 2009-11-18 11:58:55 -0500 (Wed, 18 Nov 2009)
New Revision: 8312

Modified:
   trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
Log:
test message group with & without direct delivery

Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java	2009-11-18 16:47:53 UTC (rev 8311)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java	2009-11-18 16:58:55 UTC (rev 8312)
@@ -50,121 +50,105 @@
 
    private SimpleString qName = new SimpleString("MessageGroupingTestQueue");
 
-   public void testBasicGrouping() throws Exception
+   public void testBasicGroupingWithDirectDelivery() throws Exception
    {
-      ClientProducer clientProducer = clientSession.createProducer(qName);
-      ClientConsumer consumer = clientSession.createConsumer(qName);
-      ClientConsumer consumer2 = clientSession.createConsumer(qName);
-      clientSession.start();
-      SimpleString groupId = new SimpleString("grp1");
-      int numMessages = 100;
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = createTextMessage("m" + i, clientSession);
-         message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
-         clientProducer.send(message);
-      }
-      CountDownLatch latch = new CountDownLatch(numMessages);
-      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
-      consumer.setMessageHandler(dummyMessageHandler);
-      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
-      consumer2.setMessageHandler(dummyMessageHandler2);
-      assertTrue(latch.await(10, TimeUnit.SECONDS));
-      assertEquals(100, dummyMessageHandler.list.size());
-      assertEquals(0, dummyMessageHandler2.list.size());
-      consumer.close();
-      consumer2.close();
+      doTestBasicGrouping(true);
    }
 
-   public void testMultipleGrouping() throws Exception
+   public void testBasicGroupingWithoutDirectDelivery() throws Exception
    {
-      ClientProducer clientProducer = clientSession.createProducer(qName);
-      ClientConsumer consumer = clientSession.createConsumer(qName);
-      ClientConsumer consumer2 = clientSession.createConsumer(qName);
-      clientSession.start();
-      SimpleString groupId = new SimpleString("grp1");
-      SimpleString groupId2 = new SimpleString("grp2");
-      int numMessages = 100;
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = createTextMessage("m" + i, clientSession);
-         if (i % 2 == 0 || i == 0)
-         {
-            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
-         }
-         else
-         {
-            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
-         }
-         clientProducer.send(message);
-      }
-      CountDownLatch latch = new CountDownLatch(numMessages);
-      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
-      consumer.setMessageHandler(dummyMessageHandler);
-      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
-      consumer2.setMessageHandler(dummyMessageHandler2);
-      assertTrue(latch.await(10, TimeUnit.SECONDS));
-      assertEquals(50, dummyMessageHandler.list.size());
-      int i = 0;
-      for (ClientMessage message : dummyMessageHandler.list)
-      {
-         assertEquals(message.getBody().readString(), "m" + i);
-         i += 2;
-      }
-      assertEquals(50, dummyMessageHandler2.list.size());
-      i = 1;
-      for (ClientMessage message : dummyMessageHandler2.list)
-      {
-         assertEquals(message.getBody().readString(), "m" + i);
-         i += 2;
-      }
-      consumer.close();
-      consumer2.close();
+      doTestBasicGrouping(false);
    }
+   
+   public void testMultipleGroupingWithDirectDelivery() throws Exception
+   {
+      doTestMultipleGrouping(true); // true: clientSession.start() is called before the messages are sent
+   }
+   
+   public void testMultipleGroupingWithoutDirectDelivery() throws Exception
+   {
+      doTestMultipleGrouping(false); // false: clientSession.start() is only called after all messages are sent
+   }
+   
+   public void testMultipleGroupingSingleConsumerWithDirectDelivery() throws Exception
+   {
+      doTestMultipleGroupingSingleConsumer(true); // true: clientSession.start() is called before the messages are sent
+   }
+   
+   public void testMultipleGroupingSingleConsumerWithoutDirectDelivery() throws Exception
+   {
+      doTestMultipleGroupingSingleConsumer(false); // false: clientSession.start() is only called after all messages are sent
+   }
 
-   public void testMultipleGroupingStartConsumersAfterMessagesSent() throws Exception
+   public void testMultipleGroupingTXCommitWithDirectDelivery() throws Exception
    {
+      doTestMultipleGroupingTXCommit(true); // true: clientSession.start() is called before the messages are sent
+   }
+   
+   public void testMultipleGroupingTXCommitWithoutDirectDelivery() throws Exception
+   {
+      doTestMultipleGroupingTXCommit(false); // false: clientSession.start() is only called after all messages are sent
+   }
+   
+   public void testMultipleGroupingTXRollbackWithDirectDelivery() throws Exception
+   {
+      doTestMultipleGroupingTXRollback(true); // true: clientSession.start() is called before the messages are sent
+   }  
+
+   public void testMultipleGroupingTXRollbackWithoutDirectDelivery() throws Exception
+   {
+      doTestMultipleGroupingTXRollback(false); // false: clientSession.start() is only called after all messages are sent
+   }  
+
+   public void testMultipleGroupingXACommitWithDirectDelivery() throws Exception
+   {
+      dotestMultipleGroupingXACommit(true); // true: clientSession.start() is called before the messages are sent. NOTE(review): helper is spelled "dotest" (lowercase t), unlike the other doTest... helpers
+   }
+   
+   public void testMultipleGroupingXACommitWithoutDirectDelivery() throws Exception
+   {
+      dotestMultipleGroupingXACommit(false); // false: clientSession.start() is only called after all messages are sent. NOTE(review): helper is spelled "dotest" (lowercase t), unlike the other doTest... helpers
+   }
+   
+   public void testMultipleGroupingXARollbackWithDirectDelivery() throws Exception
+   {
+      doTestMultipleGroupingXARollback(true); // true: clientSession.start() is called before the messages are sent
+   }
+
+   public void testMultipleGroupingXARollbackWithoutDirectDelivery() throws Exception
+   {
+      doTestMultipleGroupingXARollback(false); // false: clientSession.start() is only called after all messages are sent
+   }
+
+   private void doTestBasicGrouping(boolean directDelivery) throws Exception
+   {
       ClientProducer clientProducer = clientSession.createProducer(qName);
       ClientConsumer consumer = clientSession.createConsumer(qName);
       ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
       SimpleString groupId = new SimpleString("grp1");
-      SimpleString groupId2 = new SimpleString("grp2");
       int numMessages = 100;
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if (i % 2 == 0 || i == 0)
-         {
-            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
-         }
-         else
-         {
-            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
-         }
+         message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          clientProducer.send(message);
       }
-
-      clientSession.start();
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
       CountDownLatch latch = new CountDownLatch(numMessages);
       DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
       consumer.setMessageHandler(dummyMessageHandler);
       DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
       consumer2.setMessageHandler(dummyMessageHandler2);
       assertTrue(latch.await(10, TimeUnit.SECONDS));
-      assertEquals(50, dummyMessageHandler.list.size());
-      int i = 0;
-      for (ClientMessage message : dummyMessageHandler.list)
-      {
-         assertEquals(message.getBody().readString(), "m" + i);
-         i += 2;
-      }
-      assertEquals(50, dummyMessageHandler2.list.size());
-      i = 1;
-      for (ClientMessage message : dummyMessageHandler2.list)
-      {
-         assertEquals(message.getBody().readString(), "m" + i);
-         i += 2;
-      }
+      assertEquals(100, dummyMessageHandler.list.size());
+      assertEquals(0, dummyMessageHandler2.list.size());
       consumer.close();
       consumer2.close();
    }
@@ -229,11 +213,14 @@
       consumer.close();
    }
 
-   public void testMultipleGroupingSingleConsumer() throws Exception
+   private void doTestMultipleGroupingSingleConsumer(boolean directDelivery) throws Exception
    {
       ClientProducer clientProducer = clientSession.createProducer(qName);
       ClientConsumer consumer = clientSession.createConsumer(qName);
-      clientSession.start();
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
       SimpleString groupId = new SimpleString("grp1");
       SimpleString groupId2 = new SimpleString("grp2");
       int numMessages = 100;
@@ -250,6 +237,10 @@
          }
          clientProducer.send(message);
       }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
       CountDownLatch latch = new CountDownLatch(numMessages);
       DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
       consumer.setMessageHandler(dummyMessageHandler);
@@ -264,12 +255,15 @@
       consumer.close();
    }
 
-   public void testMultipleGroupingTXCommit() throws Exception
+   private void doTestMultipleGroupingTXCommit(boolean directDelivery) throws Exception
    {
       ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
       ClientSession clientSession = sessionFactory.createSession(false, false, false);
       ClientProducer clientProducer = this.clientSession.createProducer(qName);
-      clientSession.start();
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
       ClientConsumer consumer = clientSession.createConsumer(qName);
       ClientConsumer consumer2 = clientSession.createConsumer(qName);
 
@@ -289,6 +283,10 @@
          }
          clientProducer.send(message);
       }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
       CountDownLatch latch = new CountDownLatch(numMessages);
       DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
       consumer.setMessageHandler(dummyMessageHandler);
@@ -317,7 +315,7 @@
       clientSession.close();
    }
 
-   public void testMultipleGroupingTXRollback() throws Exception
+   private void doTestMultipleGroupingTXRollback(boolean directDelivery) throws Exception
    {
       ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
       sessionFactory.setBlockOnAcknowledge(true);
@@ -325,7 +323,10 @@
       ClientProducer clientProducer = this.clientSession.createProducer(qName);
       ClientConsumer consumer = clientSession.createConsumer(qName);
       ClientConsumer consumer2 = clientSession.createConsumer(qName);
-      clientSession.start();
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
       SimpleString groupId = new SimpleString("grp1");
       SimpleString groupId2 = new SimpleString("grp2");
       int numMessages = 100;
@@ -342,6 +343,10 @@
          }
          clientProducer.send(message);
       }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
       CountDownLatch latch = new CountDownLatch(numMessages);
       DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
       consumer.setMessageHandler(dummyMessageHandler);
@@ -386,14 +391,17 @@
       clientSession.close();
    }
 
-   public void testMultipleGroupingXACommit() throws Exception
+   private void dotestMultipleGroupingXACommit(boolean directDelivery) throws Exception
    {
       ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
       ClientSession clientSession = sessionFactory.createSession(true, false, false);
       ClientProducer clientProducer = this.clientSession.createProducer(qName);
       ClientConsumer consumer = clientSession.createConsumer(qName);
       ClientConsumer consumer2 = clientSession.createConsumer(qName);
-      clientSession.start();
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
       Xid xid = new XidImpl("bq".getBytes(), 4, "gtid".getBytes());
       clientSession.start(xid, XAResource.TMNOFLAGS);
 
@@ -413,6 +421,10 @@
          }
          clientProducer.send(message);
       }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
       CountDownLatch latch = new CountDownLatch(numMessages);
       DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
       consumer.setMessageHandler(dummyMessageHandler);
@@ -443,13 +455,16 @@
       clientSession.close();
    }
 
-   public void testMultipleGroupingXARollback() throws Exception
+   private void doTestMultipleGroupingXARollback(boolean directDelivery) throws Exception
    {
       ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
       sessionFactory.setBlockOnAcknowledge(true);
       ClientSession clientSession = sessionFactory.createSession(true, false, false);
       ClientProducer clientProducer = this.clientSession.createProducer(qName);
-      clientSession.start();
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
       ClientConsumer consumer = clientSession.createConsumer(qName);
       ClientConsumer consumer2 = clientSession.createConsumer(qName);
       Xid xid = new XidImpl("bq".getBytes(), 4, "gtid".getBytes());
@@ -471,6 +486,10 @@
          }
          clientProducer.send(message);
       }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
       CountDownLatch latch = new CountDownLatch(numMessages);
       DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
       consumer.setMessageHandler(dummyMessageHandler);
@@ -519,6 +538,59 @@
       assertNull(consumer.receiveImmediate());
       clientSession.close();
    }
+   
+   /**
+    * Sends 100 messages alternating between two group ids and checks that the first consumer
+    * receives the 50 even-numbered messages in order and the second the 50 odd-numbered ones.
+    *
+    * @param directDelivery whether the session is started before (true) or after (false) the sends
+    */
+   private void doTestMultipleGrouping(boolean directDelivery) throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      if (directDelivery)
+      {
+         clientSession.start();
+      }
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         // even-numbered messages go to grp1, odd-numbered ones to grp2
+         message.putStringProperty(MessageImpl.HDR_GROUP_ID, i % 2 == 0 ? groupId : groupId2);
+         clientProducer.send(message);
+      }
+      if (!directDelivery)
+      {
+         clientSession.start();
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(50, dummyMessageHandler.list.size());
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals("m" + i, message.getBody().readString());
+         i += 2;
+      }
+      assertEquals(50, dummyMessageHandler2.list.size());
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals("m" + i, message.getBody().readString());
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+   }
 
    protected void tearDown() throws Exception
    {
@@ -567,31 +639,7 @@
       clientSession = sessionFactory.createSession(false, true, true);
       clientSession.createQueue(qName, qName, null, false);
    }
-   
-   // do not swallow exception in DeliverRunner.run() to show the IOOBE on the queue handlers
-   public void testSwallowedIndexOutOfBoundsException() throws Exception
-   {
-      ClientConsumer consumer = clientSession.createConsumer(qName, null, false);
-      ClientConsumer consumer2 = clientSession.createConsumer(qName, null, false);
 
-      ClientProducer producer = clientSession.createProducer(qName);
-      ClientMessage message = createTextMessage("m0" , clientSession);
-      message.putStringProperty(MessageImpl.HDR_GROUP_ID, new SimpleString("g1"));
-      producer.send(message);
-      
-      clientSession.start();
-      
-      ClientMessage msg = consumer.receive();
-      assertNotNull(msg);
-      msg.acknowledge();
-      assertNull(consumer.receive(500));
-      
-      consumer.close();
-      consumer2.close();
-      consumer = clientSession.createConsumer(qName, null, false);
-      assertNull(consumer.receive(500));
-   }
-
    private static class DummyMessageHandler implements MessageHandler
    {
       ArrayList<ClientMessage> list = new ArrayList<ClientMessage>();



More information about the hornetq-commits mailing list