[jboss-cvs] JBoss Messaging SVN: r6077 - trunk/tests/src/org/jboss/messaging/tests/integration/client.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 13 10:21:50 EDT 2009


Author: ataylor
Date: 2009-03-13 10:21:50 -0400 (Fri, 13 Mar 2009)
New Revision: 6077

Modified:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
Log:
some end to end tests - more to follow

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java	2009-03-13 13:19:02 UTC (rev 6076)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java	2009-03-13 14:21:50 UTC (rev 6077)
@@ -36,6 +36,7 @@
 import org.jboss.messaging.utils.SimpleString;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -43,14 +44,21 @@
 public class ClientEndToEndTest extends ServiceTestBase
 {
    public final SimpleString addressA = new SimpleString("addressA");
+
    public final SimpleString queueA = new SimpleString("queueA");
-   private final SimpleString groupTestQ = new SimpleString("testGroupQueue");;
 
+   public final SimpleString queueB = new SimpleString("queueB");
+
+   public final SimpleString queueC = new SimpleString("queueC");
+
+   private final SimpleString groupTestQ = new SimpleString("testGroupQueue");
+
    /*ackbatchSize tests*/
 
    /*
    * tests that wed don't acknowledge until the correct ackBatchSize is reached
    * */
+
    public void testAckBatchSize() throws Exception
    {
       MessagingService messagingService = createService(false);
@@ -70,14 +78,14 @@
          ClientSession session = cf.createSession(false, true, true);
          session.createQueue(addressA, queueA, false);
          ClientProducer cp = sendSession.createProducer(addressA);
-         for(int i = 0 ; i < numMessages; i ++)
+         for (int i = 0; i < numMessages; i++)
          {
             cp.send(sendSession.createClientMessage(false));
          }
 
          ClientConsumer consumer = session.createConsumer(queueA);
          session.start();
-         for(int i = 0; i < numMessages - 1; i++)
+         for (int i = 0; i < numMessages - 1; i++)
          {
             ClientMessage m = consumer.receive(5000);
             m.acknowledge();
@@ -93,7 +101,7 @@
       }
       finally
       {
-         if(messagingService.isStarted())
+         if (messagingService.isStarted())
          {
             messagingService.stop();
          }
@@ -120,7 +128,7 @@
          ClientSession session = cf.createSession(false, true, true);
          session.createQueue(addressA, queueA, false);
          ClientProducer cp = sendSession.createProducer(addressA);
-         for(int i = 0 ; i < numMessages; i ++)
+         for (int i = 0; i < numMessages; i++)
          {
             cp.send(sendSession.createClientMessage(false));
          }
@@ -129,12 +137,12 @@
          session.start();
          Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
          ClientMessage[] messages = new ClientMessage[numMessages];
-         for(int i = 0; i < numMessages; i++)
+         for (int i = 0; i < numMessages; i++)
          {
             messages[i] = consumer.receive(5000);
             assertNotNull(messages[i]);
          }
-         for(int i = 0; i < numMessages; i++)
+         for (int i = 0; i < numMessages; i++)
          {
             messages[i].acknowledge();
             assertEquals(numMessages - i - 1, q.getDeliveringCount());
@@ -144,7 +152,7 @@
       }
       finally
       {
-         if(messagingService.isStarted())
+         if (messagingService.isStarted())
          {
             messagingService.stop();
          }
@@ -156,6 +164,7 @@
    /*
    * tests when the autogroupid is set only 1 consumer (out of 2) gets all the messages from a single producer
    * */
+
    public void testGroupIdAutomaticallySet() throws Exception
    {
       MessagingService messagingService = createService(false);
@@ -163,7 +172,7 @@
       {
          AddressSettings qs = new AddressSettings();
          qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-         messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
+         messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
          messagingService.start();
 
          ClientSessionFactory sf = createInVMFactory();
@@ -201,7 +210,7 @@
       }
       finally
       {
-         if(messagingService.isStarted())
+         if (messagingService.isStarted())
          {
             messagingService.stop();
          }
@@ -266,7 +275,7 @@
       }
       finally
       {
-         if(messagingService.isStarted())
+         if (messagingService.isStarted())
          {
             messagingService.stop();
          }
@@ -285,7 +294,7 @@
          AddressSettings qs = new AddressSettings();
          qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
 
-         messagingService.getServer().getAddressSettingsRepository().addMatch("testGroupQueue", qs);
+         messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
          messagingService.start();
 
          ClientSessionFactory sf = createInVMFactory();
@@ -323,7 +332,7 @@
       }
       finally
       {
-         if(messagingService.isStarted())
+         if (messagingService.isStarted())
          {
             messagingService.stop();
          }
@@ -331,6 +340,817 @@
 
    }
 
+   /*
+   * Tests send window size. We do this by having 2 receivers on the queue. Since we round-robin the consumers for
+   * delivery, we know that if consumer 1 has received n messages then consumer 2 must also have received n messages,
+   * or at least as many as fit in its window size.
+   * */
+   public void testSendWindowSize() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      ClientSessionFactory factory = createInVMFactory();
+      try
+      {
+         messagingService.start();
+         factory.setBlockOnNonPersistentSend(true);
+         ClientSession producerSession = factory.createSession(false, true, true);
+         ClientSession drainingSession = factory.createSession(false, true, true);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientConsumer drainingConsumer = drainingSession.createConsumer(queueA);
+         ClientMessage probe = producerSession.createClientMessage(false);
+         probe.setDestination(addressA);
+         int probeSize = probe.getEncodeSize();
+         int windowMessages = 100;
+         factory.setConsumerWindowSize(windowMessages * probeSize);
+         ClientSession windowedSession = factory.createSession(false, true, true);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer windowedConsumer = windowedSession.createConsumer(queueA);
+         windowedSession.start();
+         drainingSession.start();
+         for (int sent = 0; sent < windowMessages * 4; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         // drain half of what was sent through the consumer that was created before the window size was set
+         for (int taken = 0; taken < windowMessages * 2; taken++)
+         {
+            ClientMessage received = drainingConsumer.receive(5000);
+            assertNotNull(received);
+            received.acknowledge();
+         }
+         drainingSession.close();
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(windowMessages, queue.getDeliveringCount());
+         // windowedConsumer never calls receive(); presumably it only has to exist to take part in delivery
+         windowedSession.close();
+         producerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testRouteToMultipleQueues() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      // queueA, queueB and queueC are all bound to addressA; each one is expected to get every message sent
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         sendSession.createQueue(addressA, queueB, false);
+         sendSession.createQueue(addressA, queueC, false);
+         int numMessages = 300;
+         ClientProducer p = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            p.send(sendSession.createClientMessage(false));
+         }
+         ClientSession session = cf.createSession(false, true, true);
+         ClientConsumer c1 = session.createConsumer(queueA);
+         ClientConsumer c2 = session.createConsumer(queueB);
+         ClientConsumer c3 = session.createConsumer(queueC);
+         session.start();
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage m = c1.receive(5000);
+            assertNotNull(m);
+            m.acknowledge();
+            m = c2.receive(5000); // check and acknowledge queueB's own message, not queueA's a second time
+            assertNotNull(m);
+            m.acknowledge();
+            m = c3.receive(5000); // likewise for queueC
+            assertNotNull(m);
+            m.acknowledge();
+         }
+         assertNull(c1.receiveImmediate()); // all three queues must now be drained
+         assertNull(c2.receiveImmediate());
+         assertNull(c3.receiveImmediate());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testRouteToSingleNonDurableQueue() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      // sends a batch to addressA and expects exactly that batch back from queueA
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession producerSession = factory.createSession(false, true, true);
+         producerSession.createQueue(addressA, queueA, false); // presumably false = non-durable, going by the test name
+         int total = 300;
+         ClientProducer producer = producerSession.createProducer(addressA);
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         ClientSession consumerSession = factory.createSession(false, true, true);
+         ClientConsumer consumer = consumerSession.createConsumer(queueA);
+         consumerSession.start();
+         for (int taken = 0; taken < total; taken++)
+         {
+            ClientMessage received = consumer.receive(5000);
+            assertNotNull(received);
+            received.acknowledge();
+         }
+         assertNull(consumer.receiveImmediate()); // nothing beyond the batch may be left
+         producerSession.close();
+         consumerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testRouteToSingleDurableQueue() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      // same round trip as the non-durable variant, but queueA is created with the flag set to true
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory sessionFactory = createInVMFactory();
+         ClientSession sender = sessionFactory.createSession(false, true, true);
+         sender.createQueue(addressA, queueA, true); // presumably true = durable, going by the test name
+         int messageCount = 300;
+         ClientProducer producer = sender.createProducer(addressA);
+         for (int n = 0; n < messageCount; n++)
+         {
+            producer.send(sender.createClientMessage(false));
+         }
+         ClientSession receiver = sessionFactory.createSession(false, true, true);
+         ClientConsumer queueConsumer = receiver.createConsumer(queueA);
+         receiver.start();
+         for (int n = 0; n < messageCount; n++)
+         {
+            ClientMessage delivered = queueConsumer.receive(5000);
+            assertNotNull(delivered);
+            delivered.acknowledge();
+         }
+         assertNull(queueConsumer.receiveImmediate()); // the queue must be empty afterwards
+         sender.close();
+         receiver.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testRouteToSingleQueueWithFilter() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      // every message carries foo = 'bar', so all of them are expected to pass the queue's filter
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession producerSession = factory.createSession(false, true, true);
+         producerSession.createQueue(addressA, queueA, new SimpleString("foo = 'bar'"), false, false);
+         int total = 300;
+         ClientProducer producer = producerSession.createProducer(addressA);
+         for (int sent = 0; sent < total; sent++)
+         {
+            ClientMessage matching = producerSession.createClientMessage(false);
+            matching.putStringProperty(new SimpleString("foo"), new SimpleString("bar"));
+            producer.send(matching);
+         }
+         ClientSession consumerSession = factory.createSession(false, true, true);
+         ClientConsumer consumer = consumerSession.createConsumer(queueA);
+         consumerSession.start();
+         for (int taken = 0; taken < total; taken++)
+         {
+            ClientMessage received = consumer.receive(5000);
+            assertNotNull(received);
+            received.acknowledge();
+         }
+         assertNull(consumer.receiveImmediate()); // nothing beyond the batch may be left
+         producerSession.close();
+         consumerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testRouteToMultipleQueueWithFilters() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      // three filtered queues on addressA; each message sets exactly one of the filtered properties
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession producerSession = factory.createSession(false, true, true);
+         producerSession.createQueue(addressA, queueA, new SimpleString("foo = 'bar'"), false, false);
+         producerSession.createQueue(addressA, queueB, new SimpleString("x = 1"), false, false);
+         producerSession.createQueue(addressA, queueC, new SimpleString("b = false"), false, false);
+         int total = 300;
+         ClientProducer producer = producerSession.createProducer(addressA);
+         for (int sent = 0; sent < total; sent++)
+         {
+            ClientMessage outgoing = producerSession.createClientMessage(false);
+            switch (sent % 3)
+            {
+               case 0:
+                  outgoing.putStringProperty(new SimpleString("foo"), new SimpleString("bar"));
+                  break;
+               case 1:
+                  outgoing.putIntProperty(new SimpleString("x"), 1);
+                  break;
+               default:
+                  outgoing.putBooleanProperty(new SimpleString("b"), false);
+                  break;
+            }
+            producer.send(outgoing);
+         }
+         ClientSession consumerSession = factory.createSession(false, true, true);
+         ClientConsumer consumerA = consumerSession.createConsumer(queueA);
+         ClientConsumer consumerB = consumerSession.createConsumer(queueB);
+         ClientConsumer consumerC = consumerSession.createConsumer(queueC);
+         consumerSession.start();
+         for (int round = 0; round < total / 3; round++)
+         {
+            ClientMessage received = consumerA.receive(5000);
+            assertNotNull(received);
+            received.acknowledge();
+            received = consumerB.receive(5000);
+            assertNotNull(received);
+            received.acknowledge();
+            received = consumerC.receive(5000);
+            assertNotNull(received);
+            received.acknowledge();
+         }
+         assertNull(consumerA.receiveImmediate()); // each queue is expected to hold only its third of the batch
+         assertNull(consumerB.receiveImmediate());
+         assertNull(consumerC.receiveImmediate());
+         producerSession.close();
+         consumerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testRouteToSingleTemporaryQueue() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      // same round trip as the plain single-queue tests, using the four-argument createQueue overload
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory sessionFactory = createInVMFactory();
+         ClientSession sender = sessionFactory.createSession(false, true, true);
+         sender.createQueue(addressA, queueA, false, true); // presumably the last flag marks the queue as temporary
+         int messageCount = 300;
+         ClientProducer producer = sender.createProducer(addressA);
+         for (int n = 0; n < messageCount; n++)
+         {
+            producer.send(sender.createClientMessage(false));
+         }
+         ClientSession receiver = sessionFactory.createSession(false, true, true);
+         ClientConsumer queueConsumer = receiver.createConsumer(queueA);
+         receiver.start();
+         for (int n = 0; n < messageCount; n++)
+         {
+            ClientMessage delivered = queueConsumer.receive(5000);
+            assertNotNull(delivered);
+            delivered.acknowledge();
+         }
+         assertNull(queueConsumer.receiveImmediate()); // the queue must be empty afterwards
+         sender.close();
+         receiver.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testSendWithCommit() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession session = cf.createSession(false, false, false); // presumably: no auto-commit of sends or acks
+         session.createQueue(addressA, queueA, false);
+         ClientProducer cp = session.createProducer(addressA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(session.createClientMessage(false));
+         }
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(0, q.getMessageCount()); // expected value first, as JUnit's assertEquals requires
+         session.commit();
+         assertEquals(numMessages, q.getMessageCount());
+         // now send some more; the count is expected to stay put until the second commit
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(session.createClientMessage(false));
+         }
+         assertEquals(numMessages, q.getMessageCount());
+         session.commit();
+         assertEquals(numMessages * 2, q.getMessageCount());
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testSendWithRollback() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession session = cf.createSession(false, false, false); // presumably: no auto-commit of sends or acks
+         session.createQueue(addressA, queueA, false);
+         ClientProducer cp = session.createProducer(addressA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(session.createClientMessage(false));
+         }
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(0, q.getMessageCount()); // expected value first, as JUnit's assertEquals requires
+         session.rollback();
+         assertEquals(0, q.getMessageCount()); // the rolled-back batch must never show up in the queue
+         // now send some more; only this second batch is expected to arrive after the commit
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(session.createClientMessage(false));
+         }
+         assertEquals(0, q.getMessageCount());
+         session.commit();
+         assertEquals(numMessages, q.getMessageCount());
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testReceiveWithCommit() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession producerSession = factory.createSession(false, true, true);
+         ClientSession txSession = factory.createSession(false, false, false);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer consumer = txSession.createConsumer(queueA);
+         int total = 100;
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         txSession.start();
+         for (int taken = 0; taken < total; taken++)
+         {
+            ClientMessage received = consumer.receive(5000);
+            assertNotNull(received);
+            received.acknowledge();
+         }
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(total, queue.getDeliveringCount()); // acknowledged, but the commit has not happened yet
+         txSession.commit();
+         assertEquals(0, queue.getDeliveringCount());
+         txSession.close(); // NOTE(review): the sending session is never closed in this test
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testReceiveWithRollback() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession producerSession = factory.createSession(false, true, true);
+         ClientSession txSession = factory.createSession(false, false, false);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer consumer = txSession.createConsumer(queueA);
+         int total = 100;
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         txSession.start();
+         for (int firstPass = 0; firstPass < total; firstPass++)
+         {
+            ClientMessage received = consumer.receive(5000);
+            assertNotNull(received);
+            received.acknowledge();
+         }
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(total, queue.getDeliveringCount());
+         txSession.rollback(); // the whole batch is expected to be delivered again after this
+         for (int secondPass = 0; secondPass < total; secondPass++)
+         {
+            ClientMessage redelivered = consumer.receive(5000);
+            assertNotNull(redelivered);
+            redelivered.acknowledge();
+         }
+         assertEquals(total, queue.getDeliveringCount());
+         txSession.close();
+         producerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testReceiveAckLastMessageOnly() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         factory.setAckBatchSize(0);
+         factory.setBlockOnAcknowledge(true);
+         ClientSession producerSession = factory.createSession(false, true, true);
+         ClientSession consumerSession = factory.createSession(false, true, true);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer consumer = consumerSession.createConsumer(queueA);
+         int total = 100;
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         consumerSession.start();
+         ClientMessage last = null;
+         for (int taken = 0; taken < total; taken++)
+         {
+            last = consumer.receive(5000);
+            assertNotNull(last);
+         }
+         last.acknowledge();
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         // acknowledging only the final message is expected to leave nothing in delivery
+         assertEquals(0, queue.getDeliveringCount());
+         consumerSession.close();
+         producerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testAsyncConsumerNoAck() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession producerSession = factory.createSession(false, true, true);
+         ClientSession consumerSession = factory.createSession(false, true, true);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer consumer = consumerSession.createConsumer(queueA);
+         int total = 100;
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         final CountDownLatch delivered = new CountDownLatch(total);
+         consumerSession.start();
+         consumer.setMessageHandler(new MessageHandler()
+         {
+            public void onMessage(ClientMessage incoming)
+            {
+               delivered.countDown(); // count the delivery only; the message is deliberately not acknowledged
+            }
+         });
+         assertTrue(delivered.await(5, TimeUnit.SECONDS));
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(total, queue.getDeliveringCount()); // nothing was acknowledged, so all remain in delivery
+         producerSession.close();
+         consumerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testAsyncConsumerAck() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         factory.setBlockOnAcknowledge(true);
+         factory.setAckBatchSize(0);
+         ClientSession producerSession = factory.createSession(false, true, true);
+         final ClientSession consumerSession = factory.createSession(false, true, true);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer consumer = consumerSession.createConsumer(queueA);
+         int total = 100;
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         final CountDownLatch handled = new CountDownLatch(total);
+         consumerSession.start();
+         consumer.setMessageHandler(new MessageHandler()
+         {
+            public void onMessage(ClientMessage incoming)
+            {
+               try
+               {
+                  incoming.acknowledge();
+               }
+               catch (MessagingException ackFailure)
+               {
+                  try
+                  {
+                     consumerSession.close(); // a failed acknowledge closes the consuming session
+                  }
+                  catch (MessagingException closeFailure)
+                  {
+                     closeFailure.printStackTrace();
+                  }
+               }
+               handled.countDown(); // counted whether or not the acknowledge succeeded
+            }
+         });
+         assertTrue(handled.await(5, TimeUnit.SECONDS));
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(0, queue.getDeliveringCount()); // every message was acknowledged by the handler
+         producerSession.close();
+         consumerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testAsyncConsumerAckLastMessageOnly() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         factory.setBlockOnAcknowledge(true);
+         factory.setAckBatchSize(0);
+         ClientSession producerSession = factory.createSession(false, true, true);
+         final ClientSession consumerSession = factory.createSession(false, true, true);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer consumer = consumerSession.createConsumer(queueA);
+         int total = 100;
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         final CountDownLatch handled = new CountDownLatch(total);
+         consumerSession.start();
+         consumer.setMessageHandler(new MessageHandler()
+         {
+            public void onMessage(ClientMessage incoming)
+            {
+               if (handled.getCount() == 1) // only the final expected message is acknowledged
+               {
+                  try
+                  {
+                     incoming.acknowledge();
+                  }
+                  catch (MessagingException ackFailure)
+                  {
+                     try
+                     {
+                        consumerSession.close(); // a failed acknowledge closes the consuming session
+                     }
+                     catch (MessagingException closeFailure)
+                     {
+                        closeFailure.printStackTrace();
+                     }
+                  }
+               }
+               handled.countDown();
+            }
+         });
+         assertTrue(handled.await(5, TimeUnit.SECONDS));
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(0, queue.getDeliveringCount()); // the single acknowledge is expected to clear all deliveries
+         producerSession.close();
+         consumerSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testAsyncConsumerCommit() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         factory.setBlockOnAcknowledge(true);
+         factory.setAckBatchSize(0);
+         ClientSession producerSession = factory.createSession(false, true, true);
+         final ClientSession txSession = factory.createSession(false, true, false);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer consumer = txSession.createConsumer(queueA);
+         int total = 100;
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         final CountDownLatch handled = new CountDownLatch(total);
+         txSession.start();
+         consumer.setMessageHandler(new MessageHandler()
+         {
+            public void onMessage(ClientMessage incoming)
+            {
+               try
+               {
+                  incoming.acknowledge();
+               }
+               catch (MessagingException ackFailure)
+               {
+                  try
+                  {
+                     txSession.close(); // a failed acknowledge closes the consuming session
+                  }
+                  catch (MessagingException closeFailure)
+                  {
+                     closeFailure.printStackTrace();
+                  }
+               }
+               handled.countDown(); // counted whether or not the acknowledge succeeded
+            }
+         });
+         assertTrue(handled.await(5, TimeUnit.SECONDS));
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(total, queue.getDeliveringCount()); // acknowledged, but the commit has not happened yet
+         assertEquals(total, queue.getMessageCount());
+         txSession.commit();
+         assertEquals(0, queue.getDeliveringCount());
+         assertEquals(0, queue.getMessageCount());
+         producerSession.close();
+         txSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   public void testAsyncConsumerRollback() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         factory.setBlockOnAcknowledge(true);
+         factory.setAckBatchSize(0);
+         ClientSession producerSession = factory.createSession(false, true, true);
+         final ClientSession txSession = factory.createSession(false, true, false);
+         producerSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = producerSession.createProducer(addressA);
+         ClientConsumer consumer = txSession.createConsumer(queueA);
+         int total = 100;
+         for (int sent = 0; sent < total; sent++)
+         {
+            producer.send(producerSession.createClientMessage(false));
+         }
+         CountDownLatch handled = new CountDownLatch(total);
+         txSession.start();
+         consumer.setMessageHandler(new ackHandler(txSession, handled));
+         assertTrue(handled.await(5, TimeUnit.SECONDS));
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(total, queue.getDeliveringCount()); // acknowledged, but neither committed nor rolled back yet
+         assertEquals(total, queue.getMessageCount());
+         txSession.rollback();
+         assertEquals(0, queue.getDeliveringCount());
+         assertEquals(total, queue.getMessageCount()); // the rollback is expected to keep every message in the queue
+         handled = new CountDownLatch(total); // fresh latch and handler to wait for the second delivery of the batch
+         consumer.setMessageHandler(new ackHandler(txSession, handled));
+         assertTrue(handled.await(5, TimeUnit.SECONDS));
+         producerSession.close();
+         txSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+  /* public void testTempQueueGetsDeleted() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession session = cf.createSession(false, true, true);
+         session.createQueue(addressA, queueA, false, true);
+         session.close();
+         assertNull(messagingService.getServer().getPostOffice().getBinding(queueA));
+      }
+      finally
+      {
+         if(messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }*/
+
    private static class MyMessageHandler implements MessageHandler
    {
       volatile int messagesReceived = 0;
@@ -356,4 +1176,36 @@
          latch.countDown();
       }
    }
+   private static class ackHandler implements MessageHandler
+   {
+      private final ClientSession session;
+      // counted down once for every message handed to onMessage
+      private final CountDownLatch latch;
+      // keeps the session to close on an acknowledge failure and the latch to count down
+      public ackHandler(ClientSession owningSession, CountDownLatch handledLatch)
+      {
+         this.session = owningSession;
+         this.latch = handledLatch;
+      }
+      // acknowledges the message; a failed acknowledge closes the session, and the latch is counted down either way
+      public void onMessage(ClientMessage message)
+      {
+         try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException ackFailure)
+         {
+            try
+            {
+               session.close();
+            }
+            catch (MessagingException closeFailure)
+            {
+               closeFailure.printStackTrace();
+            }
+         }
+         latch.countDown();
+      }
+   }
 }




More information about the jboss-cvs-commits mailing list