[jboss-cvs] JBoss Messaging SVN: r6157 - trunk/tests/src/org/jboss/messaging/tests/integration/client.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 25 06:14:36 EDT 2009


Author: ataylor
Date: 2009-03-25 06:14:36 -0400 (Wed, 25 Mar 2009)
New Revision: 6157

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAckBatchSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAcknowledgeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAutogroupIdTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientCommitRollbackTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerRoundRobinTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientDeliveryOrderTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientFileMessageTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientReceiveTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientRoutingTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSendTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
Log:
repackaged tests

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAckBatchSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAckBatchSizeTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAckBatchSizeTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,152 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * Integration tests for the <code>ackBatchSize</code> setting of {@link ClientSessionFactory}.
+ * <p>
+ * Each test creates and starts its own {@link MessagingService}, obtains a factory from
+ * <code>createInVMFactory()</code> and asserts on the delivering count reported by the
+ * server-side {@link Queue} bound to {@link #queueA}.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientAckBatchSizeTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   public final SimpleString queueB = new SimpleString("queueB");
+
+   public final SimpleString queueC = new SimpleString("queueC");
+
+   /*ackbatchSize tests*/
+
+   /*
+    * Tests that we don't acknowledge until the correct ackBatchSize is reached.
+    *
+    * The batch size is set to numMessages * (encode size of one message addressed to addressA).
+    * All but the last message are acknowledged and the queue is asserted to still report
+    * numMessages as delivering; after the last message is acknowledged the delivering count
+    * is asserted to be 0.
+    */
+   public void testAckBatchSize() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientMessage message = sendSession.createClientMessage(false);
+         // we need to set the destination so we can calculate the encodesize correctly
+         message.setDestination(addressA);
+         int encodeSize = message.getEncodeSize();
+         int numMessages = 100;
+         // the consuming session below is created after these settings are applied to the factory
+         cf.setAckBatchSize(numMessages * encodeSize);
+         cf.setBlockOnAcknowledge(true);
+         ClientSession session = cf.createSession(false, true, true);
+         session.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+
+         ClientConsumer consumer = session.createConsumer(queueA);
+         session.start();
+         for (int i = 0; i < numMessages - 1; i++)
+         {
+            ClientMessage m = consumer.receive(5000);
+            // fail with an assertion instead of a NullPointerException if no message was returned
+            assertNotNull(m);
+            m.acknowledge();
+         }
+
+         ClientMessage m = consumer.receive(5000);
+         assertNotNull(m);
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         // numMessages - 1 acknowledgements have been issued, yet every message must still be delivering
+         assertEquals(numMessages, q.getDeliveringCount());
+         m.acknowledge();
+         assertEquals(0, q.getDeliveringCount());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /*
+    * Tests that when the ackBatchSize is 0 we ack every message directly.
+    *
+    * All messages are received first and then acknowledged one by one; after each
+    * acknowledgement the delivering count is asserted to have dropped by exactly one.
+    */
+   public void testAckBatchSizeZero() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         // NOTE(review): this message is never sent or read again in this method; it looks like a
+         // leftover from testAckBatchSize - confirm createClientMessage has no needed side effect
+         // before removing it
+         ClientMessage message = sendSession.createClientMessage(false);
+         message.setDestination(addressA);
+         int numMessages = 100;
+         cf.setAckBatchSize(0);
+         cf.setBlockOnAcknowledge(true);
+         ClientSession session = cf.createSession(false, true, true);
+         session.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+
+         ClientConsumer consumer = session.createConsumer(queueA);
+         session.start();
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         ClientMessage[] messages = new ClientMessage[numMessages];
+         for (int i = 0; i < numMessages; i++)
+         {
+            messages[i] = consumer.receive(5000);
+            assertNotNull(messages[i]);
+         }
+         for (int i = 0; i < numMessages; i++)
+         {
+            messages[i].acknowledge();
+            // i + 1 messages have been acknowledged so far
+            assertEquals(numMessages - i - 1, q.getDeliveringCount());
+         }
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAcknowledgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAcknowledgeTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAcknowledgeTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,255 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration tests for message acknowledgement, using both blocking
+ * <code>receive</code> calls and asynchronous {@link MessageHandler} consumers.
+ * <p>
+ * Each test creates and starts its own {@link MessagingService}, sends 100 messages to
+ * {@link #addressA} and asserts on the delivering count reported by the server-side
+ * {@link Queue} bound to {@link #queueA}.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientAcknowledgeTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   public final SimpleString queueB = new SimpleString("queueB");
+
+   public final SimpleString queueC = new SimpleString("queueC");
+
+
+   /*
+    * Receives every message with a blocking receive but acknowledges only the last one,
+    * then asserts that the queue's delivering count is 0, i.e. the test expects the single
+    * acknowledgement to cover the earlier messages as well.
+    */
+   public void testReceiveAckLastMessageOnly() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setAckBatchSize(0);
+         cf.setBlockOnAcknowledge(true);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientSession session = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = session.createConsumer(queueA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+         session.start();
+         ClientMessage cm = null;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cm = cc.receive(5000);
+            assertNotNull(cm);
+         }
+         // cm now refers to the last message received
+         cm.acknowledge();
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+
+         assertEquals(0, q.getDeliveringCount());
+         session.close();
+         sendSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /*
+    * Consumes every message through a MessageHandler that never acknowledges, then asserts
+    * that the queue still reports all of them as delivering.
+    */
+   public void testAsyncConsumerNoAck() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientSession session = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = session.createConsumer(queueA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+         // counted down once per handled message
+         final CountDownLatch latch = new CountDownLatch(numMessages);
+         session.start();
+         cc.setMessageHandler(new MessageHandler()
+         {
+            public void onMessage(ClientMessage message)
+            {
+               latch.countDown();
+            }
+         });
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(numMessages, q.getDeliveringCount());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /*
+    * Consumes every message through a MessageHandler that acknowledges each one, then asserts
+    * that the queue's delivering count is 0.
+    */
+   public void testAsyncConsumerAck() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setBlockOnAcknowledge(true);
+         cf.setAckBatchSize(0);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         final ClientSession session = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = session.createConsumer(queueA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+         final CountDownLatch latch = new CountDownLatch(numMessages);
+         session.start();
+         cc.setMessageHandler(new MessageHandler()
+         {
+            public void onMessage(ClientMessage message)
+            {
+               try
+               {
+                  message.acknowledge();
+               }
+               catch (MessagingException e)
+               {
+                  // report the acknowledge failure instead of discarding it silently
+                  e.printStackTrace();
+                  try
+                  {
+                     session.close();
+                  }
+                  catch (MessagingException e1)
+                  {
+                     e1.printStackTrace();
+                  }
+               }
+               // counted down whether or not the acknowledge succeeded
+               latch.countDown();
+            }
+         });
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(0, q.getDeliveringCount());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /*
+    * Consumes every message through a MessageHandler that acknowledges only the last expected
+    * message, then asserts that the queue's delivering count is 0.
+    */
+   public void testAsyncConsumerAckLastMessageOnly() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setBlockOnAcknowledge(true);
+         cf.setAckBatchSize(0);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         final ClientSession session = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = session.createConsumer(queueA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+         final CountDownLatch latch = new CountDownLatch(numMessages);
+         session.start();
+         cc.setMessageHandler(new MessageHandler()
+         {
+            public void onMessage(ClientMessage message)
+            {
+               // the latch is decremented once per message, so a remaining count of 1 means
+               // this is the last of the numMessages expected messages
+               if (latch.getCount() == 1)
+               {
+                  try
+                  {
+                     message.acknowledge();
+                  }
+                  catch (MessagingException e)
+                  {
+                     // report the acknowledge failure instead of discarding it silently
+                     e.printStackTrace();
+                     try
+                     {
+                        session.close();
+                     }
+                     catch (MessagingException e1)
+                     {
+                        e1.printStackTrace();
+                     }
+                  }
+               }
+               latch.countDown();
+            }
+         });
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(0, q.getDeliveringCount());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAutogroupIdTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAutogroupIdTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientAutogroupIdTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,260 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Integration tests for the <code>autoGroup</code> setting of {@link ClientSessionFactory}
+ * on a queue whose address settings use {@link GroupingRoundRobinDistributor} as the
+ * distribution policy.
+ * <p>
+ * Each test creates and starts its own {@link MessagingService}, attaches several
+ * {@link MessageHandler} consumers to the same queue and asserts how many messages each
+ * handler received.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientAutogroupIdTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   public final SimpleString queueB = new SimpleString("queueB");
+
+   public final SimpleString queueC = new SimpleString("queueC");
+
+   private final SimpleString groupTestQ = new SimpleString("testGroupQueue");
+
+   /* auto group id tests*/
+
+   /*
+    * Tests that when the autogroupid is set only 1 consumer (out of 2) gets all the messages
+    * from a single producer.
+    */
+   public void testGroupIdAutomaticallySet() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         AddressSettings qs = new AddressSettings();
+         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
+         messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
+         messagingService.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setAutoGroup(true);
+         ClientSession session = sf.createSession(false, true, true);
+
+         session.createQueue(groupTestQ, groupTestQ, null, false, false);
+
+         ClientProducer producer = session.createProducer(groupTestQ);
+
+         final int numMessages = 100;
+
+         // shared by both handlers: counted down once per message, whichever consumer gets it
+         final CountDownLatch latch = new CountDownLatch(numMessages);
+
+         MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+         MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+
+         ClientConsumer consumer = session.createConsumer(groupTestQ);
+         consumer.setMessageHandler(myMessageHandler);
+         ClientConsumer consumer2 = session.createConsumer(groupTestQ);
+         consumer2.setMessageHandler(myMessageHandler2);
+
+         session.start();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            producer.send(session.createClientMessage(false));
+         }
+         awaitAllMessages(latch);
+
+         session.close();
+
+         assertEquals(numMessages, myMessageHandler.messagesReceived);
+         assertEquals(0, myMessageHandler2.messagesReceived);
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+
+   }
+
+   /*
+    * Tests that when the autogroupid is set only 2 consumers (out of 3) get all the messages
+    * from 2 producers.
+    */
+   public void testGroupIdAutomaticallySetMultipleProducers() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         AddressSettings qs = new AddressSettings();
+         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
+         messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
+         messagingService.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+         sf.setAutoGroup(true);
+         ClientSession session = sf.createSession(false, true, true);
+
+         session.createQueue(groupTestQ, groupTestQ, null, false, false);
+
+         ClientProducer producer = session.createProducer(groupTestQ);
+         ClientProducer producer2 = session.createProducer(groupTestQ);
+
+         final int numMessages = 100;
+
+         // each of the two producers sends numMessages messages
+         final CountDownLatch latch = new CountDownLatch(2 * numMessages);
+
+         MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+         MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+         MyMessageHandler myMessageHandler3 = new MyMessageHandler(latch);
+
+         ClientConsumer consumer = session.createConsumer(groupTestQ);
+         consumer.setMessageHandler(myMessageHandler);
+         ClientConsumer consumer2 = session.createConsumer(groupTestQ);
+         consumer2.setMessageHandler(myMessageHandler2);
+         ClientConsumer consumer3 = session.createConsumer(groupTestQ);
+         consumer3.setMessageHandler(myMessageHandler3);
+
+         session.start();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            producer.send(session.createClientMessage(false));
+         }
+         for (int i = 0; i < numMessages; i++)
+         {
+            producer2.send(session.createClientMessage(false));
+         }
+         awaitAllMessages(latch);
+
+         session.close();
+
+         assertEquals(numMessages, myMessageHandler.messagesReceived);
+         assertEquals(numMessages, myMessageHandler2.messagesReceived);
+         assertEquals(0, myMessageHandler3.messagesReceived);
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+
+   }
+
+   /*
+    * Tests that even though we have a grouping round robin distributor we don't pin the
+    * consumer, as autogroup is not set on the factory: the two consumers are expected to
+    * receive half of the messages each.
+    */
+   public void testGroupIdAutomaticallyNotSet() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         AddressSettings qs = new AddressSettings();
+         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
+
+         messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
+         messagingService.start();
+
+         // setAutoGroup is deliberately not called on this factory
+         ClientSessionFactory sf = createInVMFactory();
+
+         ClientSession session = sf.createSession(false, true, true);
+
+         session.createQueue(groupTestQ, groupTestQ, null, false, false);
+
+         ClientProducer producer = session.createProducer(groupTestQ);
+
+         final int numMessages = 100;
+
+         final CountDownLatch latch = new CountDownLatch(numMessages);
+
+         MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
+         MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
+
+         ClientConsumer consumer = session.createConsumer(groupTestQ);
+         consumer.setMessageHandler(myMessageHandler);
+         ClientConsumer consumer2 = session.createConsumer(groupTestQ);
+         consumer2.setMessageHandler(myMessageHandler2);
+
+         session.start();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            producer.send(session.createClientMessage(false));
+         }
+         awaitAllMessages(latch);
+
+         session.close();
+
+         assertEquals(numMessages / 2, myMessageHandler.messagesReceived);
+         assertEquals(numMessages / 2, myMessageHandler2.messagesReceived);
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+
+   }
+
+   /*
+    * Waits up to 5 seconds for the latch to reach zero and fails the test otherwise, so that a
+    * message that is never delivered produces a test failure instead of blocking the run forever.
+    * TimeUnit is fully qualified because this file only imports CountDownLatch.
+    */
+   private void awaitAllMessages(CountDownLatch latch) throws InterruptedException
+   {
+      assertTrue(latch.await(5, java.util.concurrent.TimeUnit.SECONDS));
+   }
+
+   /*
+    * Handler that counts the messages it is given, acknowledges each one and counts down the
+    * latch supplied at construction time.
+    */
+   private static class MyMessageHandler implements MessageHandler
+   {
+      // NOTE(review): ++ on a volatile field is not atomic; this count is only reliable if
+      // onMessage is never invoked concurrently on the same handler instance - confirm
+      volatile int messagesReceived = 0;
+
+      private final CountDownLatch latch;
+
+      public MyMessageHandler(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         messagesReceived++;
+         try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException e)
+         {
+            e.printStackTrace();
+         }
+         // counted down even when the acknowledge failed, so the waiting test is still released
+         latch.countDown();
+      }
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientCommitRollbackTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientCommitRollbackTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientCommitRollbackTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,278 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientCommitRollbackTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   public final SimpleString queueB = new SimpleString("queueB");
+
+   public final SimpleString queueC = new SimpleString("queueC");
+
+
+   /**
+    * Verifies that received and acknowledged messages stay in the queue's delivering count until the consuming
+    * session commits, and that the delivering count drops to zero once {@code session.commit()} has been called.
+    *
+    * @throws Exception if the service, a session or a send/receive operation fails
+    */
+   public void testReceiveWithCommit() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         // NOTE(review): the boolean flags are presumably (xa, autoCommitSends, autoCommitAcks) - confirm
+         // against ClientSessionFactory.createSession
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientSession session = cf.createSession(false, false, false);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = session.createConsumer(queueA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+         session.start();
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage cm = cc.receive(5000);
+            assertNotNull(cm);
+            cm.acknowledge();
+         }
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         // nothing has been committed yet, so every message must still be counted as delivering
+         assertEquals(numMessages, q.getDeliveringCount());
+         session.commit();
+         assertEquals(0, q.getDeliveringCount());
+         session.close();
+         // close the producing session too, as the sibling tests do, so it is not left open
+         sendSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Receives and acknowledges the whole batch, rolls the consuming session back and then receives and
+    * acknowledges the same number of messages again. The queue's delivering count is expected to equal the
+    * batch size after each pass.
+    */
+   public void testReceiveWithRollback() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession sendSession = factory.createSession(false, true, true);
+         ClientSession session = factory.createSession(false, false, false);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = sendSession.createProducer(addressA);
+         ClientConsumer consumer = session.createConsumer(queueA);
+         int numMessages = 100;
+         int sent = 0;
+         while (sent < numMessages)
+         {
+            producer.send(sendSession.createClientMessage(false));
+            sent++;
+         }
+         session.start();
+
+         // first pass: take and acknowledge every message
+         for (int remaining = numMessages; remaining > 0; remaining--)
+         {
+            ClientMessage delivered = consumer.receive(5000);
+            assertNotNull(delivered);
+            delivered.acknowledge();
+         }
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(numMessages, queue.getDeliveringCount());
+
+         session.rollback();
+
+         // second pass: the same number of messages must be available again after the rollback
+         for (int remaining = numMessages; remaining > 0; remaining--)
+         {
+            ClientMessage redelivered = consumer.receive(5000);
+            assertNotNull(redelivered);
+            redelivered.acknowledge();
+         }
+         assertEquals(numMessages, queue.getDeliveringCount());
+         session.close();
+         sendSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Consumes messages through a {@link MessageHandler} and verifies that the queue still reports all of them
+    * (as delivering and as messages) until {@code session.commit()} is called, after which both counts are zero.
+    *
+    * @throws Exception if the service, a session or a send operation fails
+    */
+   public void testAsyncConsumerCommit() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setBlockOnAcknowledge(true);
+         cf.setAckBatchSize(0);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         final ClientSession session = cf.createSession(false, true, false);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = session.createConsumer(queueA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(sendSession.createClientMessage(false));
+         }
+         final CountDownLatch latch = new CountDownLatch(numMessages);
+         session.start();
+         // use the ackHandler helper of this class rather than an anonymous copy of the same
+         // acknowledge / close-on-failure / count-down logic
+         cc.setMessageHandler(new ackHandler(session, latch));
+         assertTrue(latch.await(5, TimeUnit.SECONDS));
+         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         // every message has been handled but nothing is committed yet
+         assertEquals(numMessages, q.getDeliveringCount());
+         assertEquals(numMessages, q.getMessageCount());
+         session.commit();
+         assertEquals(0, q.getDeliveringCount());
+         assertEquals(0, q.getMessageCount());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Handles every message through an {@code ackHandler}, then stops and rolls back the session. After the
+    * rollback the queue must report no delivering messages but still the full message count, and a second
+    * handler must be handed the full batch again once the session is restarted.
+    */
+   public void testAsyncConsumerRollback() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         factory.setBlockOnAcknowledge(true);
+         factory.setAckBatchSize(0);
+         ClientSession sendSession = factory.createSession(false, true, true);
+         final ClientSession session = factory.createSession(false, true, false);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer producer = sendSession.createProducer(addressA);
+         ClientConsumer consumer = session.createConsumer(queueA);
+         int numMessages = 100;
+         for (int sent = 0; sent < numMessages; sent++)
+         {
+            producer.send(sendSession.createClientMessage(false));
+         }
+
+         // first delivery of the whole batch
+         CountDownLatch firstDelivery = new CountDownLatch(numMessages);
+         session.start();
+         consumer.setMessageHandler(new ackHandler(session, firstDelivery));
+         assertTrue(firstDelivery.await(5, TimeUnit.SECONDS));
+         Queue queue = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(numMessages, queue.getDeliveringCount());
+         assertEquals(numMessages, queue.getMessageCount());
+
+         // roll back while the session is stopped, then check the queue state
+         session.stop();
+         session.rollback();
+         assertEquals(0, queue.getDeliveringCount());
+         assertEquals(numMessages, queue.getMessageCount());
+
+         // second delivery: a fresh latch and handler are installed before the session is restarted
+         CountDownLatch redelivery = new CountDownLatch(numMessages);
+         consumer.setMessageHandler(new ackHandler(session, redelivery));
+         session.start();
+         assertTrue(redelivery.await(5, TimeUnit.SECONDS));
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Message handler that acknowledges each message and counts down a latch. If the acknowledge fails the
+    * failure is reported and the session is closed; the latch is counted down in either case.
+    */
+   private static class ackHandler implements MessageHandler
+   {
+      private final ClientSession session;
+
+      private final CountDownLatch latch;
+
+      /**
+       * @param session the session to close when an acknowledge fails
+       * @param latch the latch counted down once per handled message
+       */
+      public ackHandler(ClientSession session, CountDownLatch latch)
+      {
+         this.session = session;
+         this.latch = latch;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException e)
+         {
+            // report the acknowledge failure instead of dropping it, otherwise nothing explains why the
+            // session was closed
+            e.printStackTrace();
+            try
+            {
+               session.close();
+            }
+            catch (MessagingException e1)
+            {
+               e1.printStackTrace();
+            }
+         }
+         latch.countDown();
+      }
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerRoundRobinTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerRoundRobinTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerRoundRobinTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientConsumerRoundRobinTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   /**
+    * Attaches five consumers to one queue, sends 100 messages whose body is their sequence number and expects
+    * message n to arrive on consumer (n mod 5), i.e. a strict rotation in consumer-creation order.
+    */
+   public void testConsumersRoundRobinCorrectly() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory factory = createInVMFactory();
+         ClientSession session = factory.createSession(false, true, true);
+         session.createQueue(addressA, queueA, false);
+
+         // The session is started before any consumer is created: start is non blocking, and we have to
+         // guarantee that all consumers have been started before messages are sent.
+         session.start();
+         ClientConsumer[] consumers = new ClientConsumer[5];
+         for (int slot = 0; slot < consumers.length; slot++)
+         {
+            consumers[slot] = session.createConsumer(queueA);
+         }
+
+         ClientProducer producer = session.createProducer(addressA);
+         int numMessage = 100;
+         for (int body = 0; body < numMessage; body++)
+         {
+            ClientMessage outgoing = session.createClientMessage(false);
+            outgoing.getBody().writeInt(body);
+            producer.send(outgoing);
+         }
+
+         // walk the expected sequence, picking the consumer whose turn it is
+         int expectedTotal = (numMessage / consumers.length) * consumers.length;
+         for (int expected = 0; expected < expectedTotal; expected++)
+         {
+            ClientMessage incoming = consumers[expected % consumers.length].receive(5000);
+            assertNotNull(incoming);
+            assertEquals(expected, incoming.getBody().readInt());
+         }
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerWindowSizeTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientConsumerWindowSizeTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   public final SimpleString queueB = new SimpleString("queueB");
+
+   public final SimpleString queueC = new SimpleString("queueC");
+
+   private final SimpleString groupTestQ = new SimpleString("testGroupQueue");
+
+   /**
+    * Tests the consumer window size using two consumers on the same queue. Because delivery round-robins
+    * between the consumers, once the first consumer has received n messages the second one must also have been
+    * sent n messages, or at least as many as fit in its window.
+    * <p>
+    * The second consumer's window is set to 100 times the encoded size of one message. 400 messages are sent,
+    * the first consumer receives 200 of them and is closed, and the queue is then expected to report 100
+    * messages as delivering.
+    */
+   public void testSendWindowSize() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      ClientSessionFactory factory = createInVMFactory();
+      try
+      {
+         messagingService.start();
+         factory.setBlockOnNonPersistentSend(true);
+         ClientSession sendSession = factory.createSession(false, true, true);
+         ClientSession receiveSession = factory.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
+
+         // measure one addressed message so the window can be expressed as a number of messages
+         ClientMessage sample = sendSession.createClientMessage(false);
+         sample.setDestination(addressA);
+         int encodeSize = sample.getEncodeSize();
+         int numMessage = 100;
+         factory.setConsumerWindowSize(numMessage * encodeSize);
+
+         // this session is created after the window size was changed on the factory; its consumer never
+         // calls receive()
+         ClientSession session = factory.createSession(false, true, true);
+         ClientProducer producer = sendSession.createProducer(addressA);
+         session.createConsumer(queueA);
+         session.start();
+         receiveSession.start();
+
+         int toSend = numMessage * 4;
+         for (int sent = 0; sent < toSend; sent++)
+         {
+            producer.send(sendSession.createClientMessage(false));
+         }
+
+         int toReceive = numMessage * 2;
+         for (int received = 0; received < toReceive; received++)
+         {
+            ClientMessage incoming = receivingConsumer.receive(5000);
+            assertNotNull(incoming);
+            incoming.acknowledge();
+         }
+         receiveSession.close();
+
+         Queue queue = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         assertEquals(numMessage, queue.getDeliveringCount());
+
+         session.close();
+         sendSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientDeliveryOrderTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientDeliveryOrderTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientDeliveryOrderTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,224 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientDeliveryOrderTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+      public final SimpleString queueA = new SimpleString("queueA");
+
+      public final SimpleString queueB = new SimpleString("queueB");
+
+      public final SimpleString queueC = new SimpleString("queueC");
+
+
+
+      /**
+       * Sends messages whose body is their sequence number, committing the sending session in batches, and
+       * verifies that a consumer created afterwards receives them in the order they were sent.
+       *
+       * @throws Exception if the service, the session or a send/receive operation fails
+       */
+      public void testSendDeliveryOrderOnCommit() throws Exception
+      {
+         MessagingService messagingService = createService(false);
+         try
+         {
+            messagingService.start();
+            ClientSessionFactory cf = createInVMFactory();
+            ClientSession sendSession = cf.createSession(false, false, true);
+            ClientProducer cp = sendSession.createProducer(addressA);
+            int numMessages = 1000;
+            sendSession.createQueue(addressA, queueA, false);
+            for (int i = 0; i < numMessages; i++)
+            {
+               ClientMessage cm = sendSession.createClientMessage(false);
+               cm.getBody().writeInt(i);
+               cp.send(cm);
+               // commit in batches; an additional unconditional commit inside this loop used to commit
+               // after every single send, which made this batching a no-op
+               if (i % 10 == 0)
+               {
+                  sendSession.commit();
+               }
+            }
+            // commit whatever was sent after the last batch boundary
+            sendSession.commit();
+            ClientConsumer c = sendSession.createConsumer(queueA);
+            sendSession.start();
+            for (int i = 0; i < numMessages; i++)
+            {
+               ClientMessage cm = c.receive(5000);
+               assertNotNull(cm);
+               assertEquals(i, cm.getBody().readInt());
+            }
+            // close the session, as testReceiveDeliveryOrderOnRollback does
+            sendSession.close();
+         }
+         finally
+         {
+            if (messagingService.isStarted())
+            {
+               messagingService.stop();
+            }
+         }
+      }
+
+      /**
+       * Sends messages whose body is their sequence number, receives and acknowledges them all in order, rolls
+       * the session back and expects to receive the same sequence, in the same order, a second time.
+       */
+      public void testReceiveDeliveryOrderOnRollback() throws Exception
+      {
+         MessagingService messagingService = createService(false);
+         try
+         {
+            messagingService.start();
+            ClientSessionFactory factory = createInVMFactory();
+            ClientSession sendSession = factory.createSession(false, true, false);
+            ClientProducer producer = sendSession.createProducer(addressA);
+            int numMessages = 1000;
+            sendSession.createQueue(addressA, queueA, false);
+            for (int sequence = 0; sequence < numMessages; sequence++)
+            {
+               ClientMessage outgoing = sendSession.createClientMessage(false);
+               outgoing.getBody().writeInt(sequence);
+               producer.send(outgoing);
+            }
+            ClientConsumer consumer = sendSession.createConsumer(queueA);
+            sendSession.start();
+
+            // first pass over the queue
+            int expected = 0;
+            while (expected < numMessages)
+            {
+               ClientMessage incoming = consumer.receive(5000);
+               assertNotNull(incoming);
+               incoming.acknowledge();
+               assertEquals(expected, incoming.getBody().readInt());
+               expected++;
+            }
+
+            sendSession.rollback();
+
+            // second pass: the sequence must start again from zero after the rollback
+            expected = 0;
+            while (expected < numMessages)
+            {
+               ClientMessage incoming = consumer.receive(5000);
+               assertNotNull(incoming);
+               incoming.acknowledge();
+               assertEquals(expected, incoming.getBody().readInt());
+               expected++;
+            }
+            sendSession.close();
+         }
+         finally
+         {
+            if (messagingService.isStarted())
+            {
+               messagingService.stop();
+            }
+         }
+      }
+
+      /**
+       * Registers 100 {@code Receiver} handlers on the same queue, sends 10000 messages whose body is an
+       * increasing counter, waits up to ten seconds for all of them to be handled and then checks that no
+       * receiver flagged a failure.
+       */
+      public void testMultipleConsumersMessageOrder() throws Exception
+      {
+         MessagingService messagingService = createService(false);
+         try
+         {
+            messagingService.start();
+            ClientSessionFactory factory = createInVMFactory();
+            ClientSession sendSession = factory.createSession(false, true, true);
+            ClientSession recSession = factory.createSession(false, true, true);
+            sendSession.createQueue(addressA, queueA, false);
+            int numReceivers = 100;
+            AtomicInteger nextBody = new AtomicInteger(0);
+            int numMessage = 10000;
+            ClientConsumer[] clientConsumers = new ClientConsumer[numReceivers];
+            Receiver[] receivers = new Receiver[numReceivers];
+            // a single latch is shared by all receivers; it opens once every message has been handled
+            CountDownLatch allHandled = new CountDownLatch(numMessage);
+            for (int slot = 0; slot < numReceivers; slot++)
+            {
+               ClientConsumer consumer = recSession.createConsumer(queueA);
+               Receiver receiver = new Receiver(allHandled);
+               clientConsumers[slot] = consumer;
+               receivers[slot] = receiver;
+               consumer.setMessageHandler(receiver);
+            }
+            recSession.start();
+            ClientProducer clientProducer = sendSession.createProducer(addressA);
+            for (int sent = 0; sent < numMessage; sent++)
+            {
+               ClientMessage outgoing = sendSession.createClientMessage(false);
+               outgoing.getBody().writeInt(nextBody.getAndIncrement());
+               clientProducer.send(outgoing);
+            }
+            assertTrue(allHandled.await(10, TimeUnit.SECONDS));
+            for (int slot = 0; slot < receivers.length; slot++)
+            {
+               // the failure message is the last body value that receiver saw
+               assertFalse("" + receivers[slot].lastMessage, receivers[slot].failed);
+            }
+            sendSession.close();
+            recSession.close();
+         }
+         finally
+         {
+            if (messagingService.isStarted())
+            {
+               messagingService.stop();
+            }
+         }
+      }
+
+
+
+
+
+      /**
+       * Message handler that checks the int bodies it is handed are strictly increasing. A body that is less
+       * than or equal to the previous one sets {@code failed}; every handled message counts down the latch.
+       * <p>
+       * Declared static because it uses no state of the enclosing test instance.
+       */
+      static class Receiver implements MessageHandler
+      {
+         final CountDownLatch latch;
+
+         // body of the most recently handled message, -1 until the first one arrives
+         int lastMessage = -1;
+
+         // set once an out-of-order (or repeated) body has been seen; never reset
+         boolean failed = false;
+
+         /**
+          * @param latch the latch counted down once per handled message
+          */
+         public Receiver(CountDownLatch latch)
+         {
+            this.latch = latch;
+         }
+
+         public void onMessage(ClientMessage message)
+         {
+            int i = message.getBody().readInt();
+            try
+            {
+               message.acknowledge();
+            }
+            catch (MessagingException e)
+            {
+               e.printStackTrace();
+            }
+            if (i <= lastMessage)
+            {
+               failed = true;
+            }
+            lastMessage = i;
+            // the fields are updated before the count down, so a thread returning from await() sees them
+            latch.countDown();
+         }
+
+      }
+     
+}

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java	2009-03-25 10:03:03 UTC (rev 6156)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -1,1536 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.client;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientFileMessage;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.client.impl.ClientFileMessageImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.SimpleString;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class ClientEndToEndTest extends ServiceTestBase
-{
-   public final SimpleString addressA = new SimpleString("addressA");
-
-   public final SimpleString queueA = new SimpleString("queueA");
-
-   public final SimpleString queueB = new SimpleString("queueB");
-
-   public final SimpleString queueC = new SimpleString("queueC");
-
-   private final SimpleString groupTestQ = new SimpleString("testGroupQueue");
-
-   /*ackbatchSize tests*/
-
-   /*
-   * tests that wed don't acknowledge until the correct ackBatchSize is reached
-   * */
-
-   public void testAckBatchSize() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientMessage message = sendSession.createClientMessage(false);
-         // we need to set the destination so we can calculate the encodesize correctly
-         message.setDestination(addressA);
-         int encodeSize = message.getEncodeSize();
-         int numMessages = 100;
-         cf.setAckBatchSize(numMessages * encodeSize);
-         cf.setBlockOnAcknowledge(true);
-         ClientSession session = cf.createSession(false, true, true);
-         session.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-
-         ClientConsumer consumer = session.createConsumer(queueA);
-         session.start();
-         for (int i = 0; i < numMessages - 1; i++)
-         {
-            ClientMessage m = consumer.receive(5000);
-            m.acknowledge();
-         }
-
-         ClientMessage m = consumer.receive(5000);
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(numMessages, q.getDeliveringCount());
-         m.acknowledge();
-         assertEquals(0, q.getDeliveringCount());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   /*
-   * tests that when the ackBatchSize is 0 we ack every message directly
-   * */
-   public void testAckBatchSizeZero() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientMessage message = sendSession.createClientMessage(false);
-         message.setDestination(addressA);
-         int numMessages = 100;
-         cf.setAckBatchSize(0);
-         cf.setBlockOnAcknowledge(true);
-         ClientSession session = cf.createSession(false, true, true);
-         session.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-
-         ClientConsumer consumer = session.createConsumer(queueA);
-         session.start();
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         ClientMessage[] messages = new ClientMessage[numMessages];
-         for (int i = 0; i < numMessages; i++)
-         {
-            messages[i] = consumer.receive(5000);
-            assertNotNull(messages[i]);
-         }
-         for (int i = 0; i < numMessages; i++)
-         {
-            messages[i].acknowledge();
-            assertEquals(numMessages - i - 1, q.getDeliveringCount());
-         }
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   /* auto group id tests*/
-
-   /*
-   * tests when the autogroupid is set only 1 consumer (out of 2) gets all the messages from a single producer
-   * */
-
-   public void testGroupIdAutomaticallySet() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         AddressSettings qs = new AddressSettings();
-         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-         messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
-         messagingService.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setAutoGroup(true);
-         ClientSession session = sf.createSession(false, true, true);
-
-         session.createQueue(groupTestQ, groupTestQ, null, false, false);
-
-         ClientProducer producer = session.createProducer(groupTestQ);
-
-         final CountDownLatch latch = new CountDownLatch(100);
-
-         MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
-         MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
-
-         ClientConsumer consumer = session.createConsumer(groupTestQ);
-         consumer.setMessageHandler(myMessageHandler);
-         ClientConsumer consumer2 = session.createConsumer(groupTestQ);
-         consumer2.setMessageHandler(myMessageHandler2);
-
-         session.start();
-
-         final int numMessages = 100;
-
-         for (int i = 0; i < numMessages; i++)
-         {
-            producer.send(session.createClientMessage(false));
-         }
-         latch.await();
-
-         session.close();
-
-         assertEquals(myMessageHandler.messagesReceived, 100);
-         assertEquals(myMessageHandler2.messagesReceived, 0);
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-
-   }
-
-   /*
-   * tests when the autogroupid is set only 2 consumers (out of 3) gets all the messages from 2 producers
-   * */
-   public void testGroupIdAutomaticallySetMultipleProducers() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         AddressSettings qs = new AddressSettings();
-         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-         messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
-         messagingService.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-         sf.setAutoGroup(true);
-         ClientSession session = sf.createSession(false, true, true);
-
-         session.createQueue(groupTestQ, groupTestQ, null, false, false);
-
-         ClientProducer producer = session.createProducer(groupTestQ);
-         ClientProducer producer2 = session.createProducer(groupTestQ);
-
-         final CountDownLatch latch = new CountDownLatch(200);
-
-         MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
-         MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
-         MyMessageHandler myMessageHandler3 = new MyMessageHandler(latch);
-
-         ClientConsumer consumer = session.createConsumer(groupTestQ);
-         consumer.setMessageHandler(myMessageHandler);
-         ClientConsumer consumer2 = session.createConsumer(groupTestQ);
-         consumer2.setMessageHandler(myMessageHandler2);
-         ClientConsumer consumer3 = session.createConsumer(groupTestQ);
-         consumer3.setMessageHandler(myMessageHandler3);
-
-         session.start();
-
-         final int numMessages = 100;
-
-         for (int i = 0; i < numMessages; i++)
-         {
-            producer.send(session.createClientMessage(false));
-         }
-         for (int i = 0; i < numMessages; i++)
-         {
-            producer2.send(session.createClientMessage(false));
-         }
-         latch.await();
-
-         session.close();
-
-         assertEquals(myMessageHandler.messagesReceived, 100);
-         assertEquals(myMessageHandler2.messagesReceived, 100);
-         assertEquals(myMessageHandler3.messagesReceived, 0);
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-
-   }
-
-   /*
-   * tests that even tho we have an grouping round robin distributor we don't pin the consumer as autogroup is false
-   * */
-   public void testGroupIdAutomaticallyNotSet() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         AddressSettings qs = new AddressSettings();
-         qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
-
-         messagingService.getServer().getAddressSettingsRepository().addMatch(groupTestQ.toString(), qs);
-         messagingService.start();
-
-         ClientSessionFactory sf = createInVMFactory();
-
-         ClientSession session = sf.createSession(false, true, true);
-
-         session.createQueue(groupTestQ, groupTestQ, null, false, false);
-
-         ClientProducer producer = session.createProducer(groupTestQ);
-
-         final CountDownLatch latch = new CountDownLatch(100);
-
-         MyMessageHandler myMessageHandler = new MyMessageHandler(latch);
-         MyMessageHandler myMessageHandler2 = new MyMessageHandler(latch);
-
-         ClientConsumer consumer = session.createConsumer(groupTestQ);
-         consumer.setMessageHandler(myMessageHandler);
-         ClientConsumer consumer2 = session.createConsumer(groupTestQ);
-         consumer2.setMessageHandler(myMessageHandler2);
-
-         session.start();
-
-         final int numMessages = 100;
-
-         for (int i = 0; i < numMessages; i++)
-         {
-            producer.send(session.createClientMessage(false));
-         }
-         latch.await();
-
-         session.close();
-
-         assertEquals(myMessageHandler.messagesReceived, 50);
-         assertEquals(myMessageHandler2.messagesReceived, 50);
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-
-   }
-
-   /*
-   * tests send window size. we do this by having 2 receivers on the q. since we roundrobin the consumer for delivery we
-   * know if consumer 1 has received n messages then consumer 2 must have also have received n messages or at least up
-   * to its window size
-   * */
-   public void testSendWindowSize() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      ClientSessionFactory cf = createInVMFactory();
-      try
-      {
-         messagingService.start();
-         cf.setBlockOnNonPersistentSend(true);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession receiveSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientConsumer receivingConsumer = receiveSession.createConsumer(queueA);
-         ClientMessage cm = sendSession.createClientMessage(false);
-         cm.setDestination(addressA);
-         int encodeSize = cm.getEncodeSize();
-         int numMessage = 100;
-         cf.setConsumerWindowSize(numMessage * encodeSize);
-         ClientSession session = cf.createSession(false, true, true);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         session.start();
-         receiveSession.start();
-         for (int i = 0; i < numMessage * 4; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-
-         for (int i = 0; i < numMessage * 2; i++)
-         {
-            ClientMessage m = receivingConsumer.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         receiveSession.close();
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(numMessage, q.getDeliveringCount());
-
-         session.close();
-         sendSession.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testRouteToMultipleQueues() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         sendSession.createQueue(addressA, queueB, false);
-         sendSession.createQueue(addressA, queueC, false);
-         int numMessages = 300;
-         ClientProducer p = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessages; i++)
-         {
-            p.send(sendSession.createClientMessage(false));
-         }
-         ClientSession session = cf.createSession(false, true, true);
-         ClientConsumer c1 = session.createConsumer(queueA);
-         ClientConsumer c2 = session.createConsumer(queueB);
-         ClientConsumer c3 = session.createConsumer(queueC);
-         session.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage m = c1.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-            c2.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-            c3.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         assertNull(c1.receiveImmediate());
-         assertNull(c2.receiveImmediate());
-         assertNull(c3.receiveImmediate());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testRouteToSingleNonDurableQueue() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         int numMessages = 300;
-         ClientProducer p = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessages; i++)
-         {
-            p.send(sendSession.createClientMessage(false));
-         }
-         ClientSession session = cf.createSession(false, true, true);
-         ClientConsumer c1 = session.createConsumer(queueA);
-         session.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage m = c1.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         assertNull(c1.receiveImmediate());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testRouteToSingleDurableQueue() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, true);
-         int numMessages = 300;
-         ClientProducer p = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessages; i++)
-         {
-            p.send(sendSession.createClientMessage(false));
-         }
-         ClientSession session = cf.createSession(false, true, true);
-         ClientConsumer c1 = session.createConsumer(queueA);
-         session.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage m = c1.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         assertNull(c1.receiveImmediate());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testRouteToSingleQueueWithFilter() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, new SimpleString("foo = 'bar'"), false, false);
-         int numMessages = 300;
-         ClientProducer p = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage clientMessage = sendSession.createClientMessage(false);
-            clientMessage.putStringProperty(new SimpleString("foo"), new SimpleString("bar"));
-            p.send(clientMessage);
-         }
-         ClientSession session = cf.createSession(false, true, true);
-         ClientConsumer c1 = session.createConsumer(queueA);
-         session.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage m = c1.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         assertNull(c1.receiveImmediate());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testRouteToMultipleQueueWithFilters() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, new SimpleString("foo = 'bar'"), false, false);
-         sendSession.createQueue(addressA, queueB, new SimpleString("x = 1"), false, false);
-         sendSession.createQueue(addressA, queueC, new SimpleString("b = false"), false, false);
-         int numMessages = 300;
-         ClientProducer p = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage clientMessage = sendSession.createClientMessage(false);
-            if (i % 3 == 0)
-            {
-               clientMessage.putStringProperty(new SimpleString("foo"), new SimpleString("bar"));
-            }
-            else if (i % 3 == 1)
-            {
-               clientMessage.putIntProperty(new SimpleString("x"), 1);
-            }
-            else
-            {
-               clientMessage.putBooleanProperty(new SimpleString("b"), false);
-            }
-            p.send(clientMessage);
-         }
-         ClientSession session = cf.createSession(false, true, true);
-         ClientConsumer c1 = session.createConsumer(queueA);
-         ClientConsumer c2 = session.createConsumer(queueB);
-         ClientConsumer c3 = session.createConsumer(queueC);
-         session.start();
-         for (int i = 0; i < numMessages / 3; i++)
-         {
-            ClientMessage m = c1.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-            m = c2.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-            m = c3.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         assertNull(c1.receiveImmediate());
-         assertNull(c2.receiveImmediate());
-         assertNull(c3.receiveImmediate());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testRouteToSingleTemporaryQueue() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false, true);
-         int numMessages = 300;
-         ClientProducer p = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessages; i++)
-         {
-            p.send(sendSession.createClientMessage(false));
-         }
-         ClientSession session = cf.createSession(false, true, true);
-         ClientConsumer c1 = session.createConsumer(queueA);
-         session.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage m = c1.receive(5000);
-            assertNotNull(m);
-            m.acknowledge();
-         }
-         assertNull(c1.receiveImmediate());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testSendWithCommit() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession session = cf.createSession(false, false, false);
-         session.createQueue(addressA, queueA, false);
-         ClientProducer cp = session.createProducer(addressA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(session.createClientMessage(false));
-         }
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(q.getMessageCount(), 0);
-         session.commit();
-         assertEquals(q.getMessageCount(), numMessages);
-         // now send some more
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(session.createClientMessage(false));
-         }
-         assertEquals(q.getMessageCount(), numMessages);
-         session.commit();
-         assertEquals(q.getMessageCount(), numMessages * 2);
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testSendWithRollback() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession session = cf.createSession(false, false, false);
-         session.createQueue(addressA, queueA, false);
-         ClientProducer cp = session.createProducer(addressA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(session.createClientMessage(false));
-         }
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(q.getMessageCount(), 0);
-         session.rollback();
-         assertEquals(q.getMessageCount(), 0);
-         // now send some more
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(session.createClientMessage(false));
-         }
-         assertEquals(q.getMessageCount(), 0);
-         session.commit();
-         assertEquals(q.getMessageCount(), numMessages);
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testReceiveWithCommit() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession session = cf.createSession(false, false, false);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-         session.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage cm = cc.receive(5000);
-            assertNotNull(cm);
-            cm.acknowledge();
-         }
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(numMessages, q.getDeliveringCount());
-         session.commit();
-         assertEquals(0, q.getDeliveringCount());
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testReceiveWithRollback() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession session = cf.createSession(false, false, false);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-         session.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage cm = cc.receive(5000);
-            assertNotNull(cm);
-            cm.acknowledge();
-         }
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(numMessages, q.getDeliveringCount());
-         session.rollback();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage cm = cc.receive(5000);
-            assertNotNull(cm);
-            cm.acknowledge();
-         }
-         assertEquals(numMessages, q.getDeliveringCount());
-         session.close();
-         sendSession.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testReceiveAckLastMessageOnly() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setAckBatchSize(0);
-         cf.setBlockOnAcknowledge(true);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession session = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-         session.start();
-         ClientMessage cm = null;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cm = cc.receive(5000);
-            assertNotNull(cm);
-         }
-         cm.acknowledge();
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-
-         assertEquals(0, q.getDeliveringCount());
-         session.close();
-         sendSession.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testAsyncConsumerNoAck() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession session = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-         final CountDownLatch latch = new CountDownLatch(numMessages);
-         session.start();
-         cc.setMessageHandler(new MessageHandler()
-         {
-            public void onMessage(ClientMessage message)
-            {
-               latch.countDown();
-            }
-         });
-         assertTrue(latch.await(5, TimeUnit.SECONDS));
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(numMessages, q.getDeliveringCount());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testAsyncConsumerAck() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setBlockOnAcknowledge(true);
-         cf.setAckBatchSize(0);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         final ClientSession session = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-         final CountDownLatch latch = new CountDownLatch(numMessages);
-         session.start();
-         cc.setMessageHandler(new MessageHandler()
-         {
-            public void onMessage(ClientMessage message)
-            {
-               try
-               {
-                  message.acknowledge();
-               }
-               catch (MessagingException e)
-               {
-                  try
-                  {
-                     session.close();
-                  }
-                  catch (MessagingException e1)
-                  {
-                     e1.printStackTrace();
-                  }
-               }
-               latch.countDown();
-            }
-         });
-         assertTrue(latch.await(5, TimeUnit.SECONDS));
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(0, q.getDeliveringCount());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testAsyncConsumerAckLastMessageOnly() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setBlockOnAcknowledge(true);
-         cf.setAckBatchSize(0);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         final ClientSession session = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-         final CountDownLatch latch = new CountDownLatch(numMessages);
-         session.start();
-         cc.setMessageHandler(new MessageHandler()
-         {
-            public void onMessage(ClientMessage message)
-            {
-               if (latch.getCount() == 1)
-               {
-                  try
-                  {
-                     message.acknowledge();
-                  }
-                  catch (MessagingException e)
-                  {
-                     try
-                     {
-                        session.close();
-                     }
-                     catch (MessagingException e1)
-                     {
-                        e1.printStackTrace();
-                     }
-                  }
-               }
-               latch.countDown();
-            }
-         });
-         assertTrue(latch.await(5, TimeUnit.SECONDS));
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(0, q.getDeliveringCount());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testAsyncConsumerCommit() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setBlockOnAcknowledge(true);
-         cf.setAckBatchSize(0);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         final ClientSession session = cf.createSession(false, true, false);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-         final CountDownLatch latch = new CountDownLatch(numMessages);
-         session.start();
-         cc.setMessageHandler(new MessageHandler()
-         {
-            public void onMessage(ClientMessage message)
-            {
-               try
-               {
-                  message.acknowledge();
-               }
-               catch (MessagingException e)
-               {
-                  try
-                  {
-                     session.close();
-                  }
-                  catch (MessagingException e1)
-                  {
-                     e1.printStackTrace();
-                  }
-               }
-               latch.countDown();
-            }
-         });
-         assertTrue(latch.await(5, TimeUnit.SECONDS));
-         Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(numMessages, q.getDeliveringCount());
-         assertEquals(numMessages, q.getMessageCount());
-         session.commit();
-         assertEquals(0, q.getDeliveringCount());
-         assertEquals(0, q.getMessageCount());
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testAsyncConsumerRollback() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setBlockOnAcknowledge(true);
-         cf.setAckBatchSize(0);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         final ClientSession session = cf.createSession(false, true, false);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = session.createConsumer(queueA);
-         int numMessages = 100;
-         for (int i = 0; i < numMessages; i++)
-         {
-            cp.send(sendSession.createClientMessage(false));
-         }
-         CountDownLatch latch = new CountDownLatch(numMessages);
-         session.start();
-         cc.setMessageHandler(new ackHandler(session, latch));
-         assertTrue(latch.await(5, TimeUnit.SECONDS));
-         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-         assertEquals(numMessages, q.getDeliveringCount());
-         assertEquals(numMessages, q.getMessageCount());
-         session.stop();
-         session.rollback();
-         assertEquals(0, q.getDeliveringCount());
-         assertEquals(numMessages, q.getMessageCount());
-         latch = new CountDownLatch(numMessages);
-         cc.setMessageHandler(new ackHandler(session, latch));
-         session.start();
-         assertTrue(latch.await(5, TimeUnit.SECONDS));
-         sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testSendDeliveryOrderOnCommit() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, false, true);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         int numMessages = 1000;
-         sendSession.createQueue(addressA, queueA, false);
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage cm = sendSession.createClientMessage(false);
-            cm.getBody().writeInt(i);
-            cp.send(cm);
-            if (i % 10 == 0)
-            {
-               sendSession.commit();
-            }
-            sendSession.commit();
-         }
-         ClientConsumer c = sendSession.createConsumer(queueA);
-         sendSession.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage cm = c.receive(5000);
-            assertNotNull(cm);
-            assertEquals(i, cm.getBody().readInt());
-         }
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testReceiveDeliveryOrderOnRollback() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         int numMessages = 1000;
-         sendSession.createQueue(addressA, queueA, false);
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage cm = sendSession.createClientMessage(false);
-            cm.getBody().writeInt(i);
-            cp.send(cm);
-         }
-         ClientConsumer c = sendSession.createConsumer(queueA);
-         sendSession.start();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage cm = c.receive(5000);
-            assertNotNull(cm);
-            cm.acknowledge();
-            assertEquals(i, cm.getBody().readInt());
-         }
-         sendSession.rollback();
-         for (int i = 0; i < numMessages; i++)
-         {
-            ClientMessage cm = c.receive(5000);
-            assertNotNull(cm);
-            cm.acknowledge();
-            assertEquals(i, cm.getBody().readInt());
-         }
-         sendSession.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testMultipleConsumersMessageOrder() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession recSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         int numReceivers = 100;
-         AtomicInteger count = new AtomicInteger(0);
-         int numMessage = 10000;
-         ClientConsumer[] clientConsumers = new ClientConsumer[numReceivers];
-         Receiver[] receivers = new Receiver[numReceivers];
-         CountDownLatch latch = new CountDownLatch(numMessage);
-         for (int i = 0; i < numReceivers; i++)
-         {
-            clientConsumers[i] = recSession.createConsumer(queueA);
-            receivers[i] = new Receiver(latch);
-            clientConsumers[i].setMessageHandler(receivers[i]);
-         }
-         recSession.start();
-         ClientProducer clientProducer = sendSession.createProducer(addressA);
-         for (int i = 0; i < numMessage; i++)
-         {
-            ClientMessage cm = sendSession.createClientMessage(false);
-            cm.getBody().writeInt(count.getAndIncrement());
-            clientProducer.send(cm);
-         }
-         assertTrue(latch.await(10, TimeUnit.SECONDS));
-         for (Receiver receiver : receivers)
-         {
-            assertFalse("" + receiver.lastMessage, receiver.failed);
-         }
-         sendSession.close();
-         recSession.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testSendConsumeLargeMessage() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setMinLargeMessageSize(1000);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession recSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         ClientConsumer cc = recSession.createConsumer(queueA);
-         recSession.start();
-         ClientMessage message = recSession.createClientMessage(false);
-         byte[] bytes = new byte[3000];
-         message.getBody().writeBytes(bytes);
-         cp.send(message);
-         ClientMessage m = cc.receive(5000);
-         assertNotNull(m);
-         byte[] recBytes = new byte[3000];
-         m.getBody().readBytes(recBytes);
-         assertEqualsByteArrays(bytes, recBytes);
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testConsumeFileMessage() throws Exception
-   {
-      String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
-
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setMinLargeMessageSize(1000);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession recSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         File directory = new File(testDir);
-         directory.mkdirs();
-         ClientConsumer cc = recSession.createFileConsumer(directory, queueA);
-         recSession.start();
-         ClientMessage message = recSession.createClientMessage(false);
-         byte[] bytes = new byte[3000];
-         message.getBody().writeBytes(bytes);
-         cp.send(message);
-         ClientFileMessageImpl m = (ClientFileMessageImpl)cc.receive(5000);
-         assertNotNull(m);
-         FileChannel channel = m.getChannel();
-         ByteBuffer dst = ByteBuffer.allocate(3000);
-         channel.read(dst);
-         assertEqualsByteArrays(bytes, dst.array());
-         sendSession.close();
-         recSession.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testProduceFileMessage() throws Exception
-   {
-      String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
-
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         cf.setMinLargeMessageSize(1000);
-         ClientSession sendSession = cf.createSession(false, true, true);
-         ClientSession recSession = cf.createSession(false, true, true);
-         sendSession.createQueue(addressA, queueA, false);
-         ClientProducer cp = sendSession.createProducer(addressA);
-         File directory = new File(testDir);
-         directory.delete();
-         directory.mkdirs();
-         ClientConsumer cc = recSession.createConsumer(queueA);
-         recSession.start();
-         ClientFileMessage message = sendSession.createFileMessage(false);
-         byte[] bytes = new byte[3000];
-         File src = new File(directory, "test.jbm");
-         src.createNewFile();
-         FileOutputStream fos = new FileOutputStream(src);
-         fos.write(bytes);
-         fos.close();
-         message.setFile(src);
-         cp.send(message);
-         ClientMessage m = cc.receive(5000);
-         assertNotNull(m);
-         byte[] recBytes = new byte[3000];
-         m.getBody().readBytes(recBytes);
-         assertEqualsByteArrays(bytes, recBytes);
-         sendSession.close();
-         recSession.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   public void testConsumersRoundRobinCorrectly() throws Exception
-   {
-      MessagingService messagingService = createService(false);
-      try
-      {
-         messagingService.start();
-         ClientSessionFactory cf = createInVMFactory();
-         ClientSession session = cf.createSession(false, true, true);
-         session.createQueue(addressA, queueA, false);
-
-         ClientConsumer[] consumers = new ClientConsumer[5];
-         // start the session before we create the consumers, this is because start is non blocking and we have to
-         // gaurantee
-         // all consumers have been started before sending messages
-         session.start();
-         consumers[0] = session.createConsumer(queueA);
-         consumers[1] = session.createConsumer(queueA);
-         consumers[2] = session.createConsumer(queueA);
-         consumers[3] = session.createConsumer(queueA);
-         consumers[4] = session.createConsumer(queueA);
-
-         //ClientSession sendSession = cf.createSession(false, true, true);
-         ClientProducer cp = session.createProducer(addressA);
-         int numMessage = 100;
-         for (int i = 0; i < numMessage; i++)
-         {
-            ClientMessage cm = session.createClientMessage(false);
-            cm.getBody().writeInt(i);
-            cp.send(cm);
-         }
-         int currMessage = 0;
-         for (int i = 0; i < numMessage / 5; i++)
-         {
-            for (int j = 0; j < 5; j++)
-            {
-               ClientMessage cm = consumers[j].receive(5000);
-               assertNotNull(cm);
-               assertEquals(currMessage++, cm.getBody().readInt());
-            }
-         }
-         //sendSession.close();
-         session.close();
-      }
-      finally
-      {
-         if (messagingService.isStarted())
-         {
-            messagingService.stop();
-         }
-      }
-   }
-
-   class Receiver implements MessageHandler
-   {
-      final CountDownLatch latch;
-
-      int lastMessage = -1;
-
-      boolean failed = false;
-
-      public Receiver(CountDownLatch latch)
-      {
-         this.latch = latch;
-      }
-
-      public void onMessage(ClientMessage message)
-      {
-         int i = message.getBody().readInt();
-         try
-         {
-            message.acknowledge();
-         }
-         catch (MessagingException e)
-         {
-            e.printStackTrace();
-         }
-         if (i <= lastMessage)
-         {
-            failed = true;
-         }
-         lastMessage = i;
-         latch.countDown();
-      }
-
-   }
-
-   private static class MyMessageHandler implements MessageHandler
-   {
-      volatile int messagesReceived = 0;
-
-      private final CountDownLatch latch;
-
-      public MyMessageHandler(CountDownLatch latch)
-      {
-         this.latch = latch;
-      }
-
-      public void onMessage(ClientMessage message)
-      {
-         messagesReceived++;
-         try
-         {
-            message.acknowledge();
-         }
-         catch (MessagingException e)
-         {
-            e.printStackTrace();
-         }
-         latch.countDown();
-      }
-   }
-
-   private static class ackHandler implements MessageHandler
-   {
-      private final ClientSession session;
-
-      private final CountDownLatch latch;
-
-      public ackHandler(ClientSession session, CountDownLatch latch)
-      {
-         this.session = session;
-         this.latch = latch;
-      }
-
-      public void onMessage(ClientMessage message)
-      {
-         try
-         {
-            message.acknowledge();
-         }
-         catch (MessagingException e)
-         {
-            try
-            {
-               session.close();
-            }
-            catch (MessagingException e1)
-            {
-               e1.printStackTrace();
-            }
-         }
-         latch.countDown();
-      }
-   }
-}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientFileMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientFileMessageTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientFileMessageTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,160 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientFileMessage;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientFileMessageImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * Integration tests for file-based messaging: receiving through a file consumer and
+ * sending a file message, using sessions created from an in-VM session factory.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientFileMessageTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   /**
+    * Sends a 3000-byte message from a regular producer, receives it through a file
+    * consumer and checks that the bytes read from the received message's channel
+    * match the bytes that were sent.
+    */
+   public void testConsumeFileMessage() throws Exception
+   {
+      String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
+
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setMinLargeMessageSize(1000);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientSession recSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         File directory = new File(testDir);
+         directory.mkdirs();
+         ClientConsumer cc = recSession.createFileConsumer(directory, queueA);
+         recSession.start();
+         ClientMessage message = recSession.createClientMessage(false);
+         byte[] bytes = new byte[3000];
+         message.getBody().writeBytes(bytes);
+         cp.send(message);
+         ClientFileMessageImpl m = (ClientFileMessageImpl) cc.receive(5000);
+         assertNotNull(m);
+         FileChannel channel = m.getChannel();
+         ByteBuffer dst = ByteBuffer.allocate(3000);
+         // a single read() may return fewer bytes than requested, so keep reading until
+         // the buffer is full or the channel reports end-of-stream
+         while (dst.hasRemaining() && channel.read(dst) != -1)
+         {
+            // keep filling dst
+         }
+         // NOTE(review): the payload is all zeros, so this comparison cannot detect a
+         // short or empty read - consider sending non-zero data
+         assertEqualsByteArrays(bytes, dst.array());
+         sendSession.close();
+         recSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Writes 3000 bytes to a file, sends that file as a file message, receives it
+    * through a regular consumer and checks that the received body matches the file
+    * contents.
+    */
+   public void testProduceFileMessage() throws Exception
+   {
+      String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
+
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setMinLargeMessageSize(1000);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientSession recSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         File directory = new File(testDir);
+         directory.delete();
+         directory.mkdirs();
+         ClientConsumer cc = recSession.createConsumer(queueA);
+         recSession.start();
+         ClientFileMessage message = sendSession.createFileMessage(false);
+         byte[] bytes = new byte[3000];
+         File src = new File(directory, "test.jbm");
+         src.createNewFile();
+         FileOutputStream fos = new FileOutputStream(src);
+         try
+         {
+            fos.write(bytes);
+         }
+         finally
+         {
+            // release the file handle even if the write fails
+            fos.close();
+         }
+         message.setFile(src);
+         cp.send(message);
+         ClientMessage m = cc.receive(5000);
+         assertNotNull(m);
+         byte[] recBytes = new byte[3000];
+         m.getBody().readBytes(recBytes);
+         assertEqualsByteArrays(bytes, recBytes);
+         sendSession.close();
+         recSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientLargeMessageTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * Integration test for sending and consuming a message whose body is larger than the
+ * minimum large-message size configured on the session factory.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientLargeMessageTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   /**
+    * Sends a 3000-byte message with the minimum large-message size set to 1000 and
+    * checks that the consumer receives a body equal to the one that was sent.
+    */
+   public void testSendConsumeLargeMessage() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         cf.setMinLargeMessageSize(1000);
+         ClientSession sendSession = cf.createSession(false, true, true);
+         ClientSession recSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         ClientProducer cp = sendSession.createProducer(addressA);
+         ClientConsumer cc = recSession.createConsumer(queueA);
+         recSession.start();
+         ClientMessage message = recSession.createClientMessage(false);
+         byte[] bytes = new byte[3000];
+         message.getBody().writeBytes(bytes);
+         cp.send(message);
+         ClientMessage m = cc.receive(5000);
+         assertNotNull(m);
+         byte[] recBytes = new byte[3000];
+         m.getBody().readBytes(recBytes);
+         assertEqualsByteArrays(bytes, recBytes);
+         // release both sessions before the service is stopped
+         sendSession.close();
+         recSession.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientReceiveTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientReceiveTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientReceiveTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.tests.util.ServiceTestBase;
+
+/**
+ * Placeholder for client receive tests; it currently holds a single empty test method.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientReceiveTest extends ServiceTestBase
+{
+   /**
+    * Empty test body: nothing is exercised or asserted here yet.
+    */
+   public void testReceive()
+   {
+      // TODO: no receive scenario has been written yet.
+
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientRoutingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientRoutingTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientRoutingTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,314 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * Checks that messages sent to an address reach the queue or queues created on that
+ * address, with and without filter expressions.
+ * <p>
+ * Each test starts its own service, sends a batch of messages, consumes and acknowledges
+ * the expected number from every queue, and then asserts that the queues are empty.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientRoutingTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   public final SimpleString queueB = new SimpleString("queueB");
+
+   public final SimpleString queueC = new SimpleString("queueC");
+
+
+   /**
+    * Sends 300 messages to an address with three unfiltered queues and expects every
+    * queue to deliver all 300.
+    *
+    * @throws Exception if the service, a session or a message operation fails
+    */
+   public void testRouteToMultipleQueues() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         sendSession.createQueue(addressA, queueB, false);
+         sendSession.createQueue(addressA, queueC, false);
+         int numMessages = 300;
+         ClientProducer p = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            p.send(sendSession.createClientMessage(false));
+         }
+         ClientSession session = cf.createSession(false, true, true);
+         ClientConsumer c1 = session.createConsumer(queueA);
+         ClientConsumer c2 = session.createConsumer(queueB);
+         ClientConsumer c3 = session.createConsumer(queueC);
+         session.start();
+         for (int i = 0; i < numMessages; i++)
+         {
+            // Each consumer's own message is checked and acknowledged; previously the
+            // results from c2 and c3 were discarded and the c1 message was re-checked.
+            receiveAndAcknowledge(c1);
+            receiveAndAcknowledge(c2);
+            receiveAndAcknowledge(c3);
+         }
+         assertNull(c1.receiveImmediate());
+         assertNull(c2.receiveImmediate());
+         assertNull(c3.receiveImmediate());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Sends 300 messages to an address with one queue created with a {@code false} flag
+    * (presumably "durable", judging by the test name) and expects all 300 to be delivered.
+    *
+    * @throws Exception if the service, a session or a message operation fails
+    */
+   public void testRouteToSingleNonDurableQueue() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false);
+         int numMessages = 300;
+         ClientProducer p = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            p.send(sendSession.createClientMessage(false));
+         }
+         ClientSession session = cf.createSession(false, true, true);
+         ClientConsumer c1 = session.createConsumer(queueA);
+         session.start();
+         for (int i = 0; i < numMessages; i++)
+         {
+            receiveAndAcknowledge(c1);
+         }
+         assertNull(c1.receiveImmediate());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Same scenario as the non-durable case, but the queue is created with a {@code true}
+    * flag (presumably "durable", judging by the test name).
+    *
+    * @throws Exception if the service, a session or a message operation fails
+    */
+   public void testRouteToSingleDurableQueue() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, true);
+         int numMessages = 300;
+         ClientProducer p = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            p.send(sendSession.createClientMessage(false));
+         }
+         ClientSession session = cf.createSession(false, true, true);
+         ClientConsumer c1 = session.createConsumer(queueA);
+         session.start();
+         for (int i = 0; i < numMessages; i++)
+         {
+            receiveAndAcknowledge(c1);
+         }
+         assertNull(c1.receiveImmediate());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Creates one queue with the filter {@code foo = 'bar'}, sends 300 messages that all
+    * carry the string property {@code foo=bar}, and expects all 300 to be delivered.
+    *
+    * @throws Exception if the service, a session or a message operation fails
+    */
+   public void testRouteToSingleQueueWithFilter() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, new SimpleString("foo = 'bar'"), false, false);
+         int numMessages = 300;
+         ClientProducer p = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage clientMessage = sendSession.createClientMessage(false);
+            clientMessage.putStringProperty(new SimpleString("foo"), new SimpleString("bar"));
+            p.send(clientMessage);
+         }
+         ClientSession session = cf.createSession(false, true, true);
+         ClientConsumer c1 = session.createConsumer(queueA);
+         session.start();
+         for (int i = 0; i < numMessages; i++)
+         {
+            receiveAndAcknowledge(c1);
+         }
+         assertNull(c1.receiveImmediate());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Creates three queues with different filters and sends 300 messages, cycling through
+    * three property sets so that each message matches exactly one filter. Every queue is
+    * therefore expected to deliver a third of the messages.
+    *
+    * @throws Exception if the service, a session or a message operation fails
+    */
+   public void testRouteToMultipleQueueWithFilters() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, new SimpleString("foo = 'bar'"), false, false);
+         sendSession.createQueue(addressA, queueB, new SimpleString("x = 1"), false, false);
+         sendSession.createQueue(addressA, queueC, new SimpleString("b = false"), false, false);
+         int numMessages = 300;
+         ClientProducer p = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            ClientMessage clientMessage = sendSession.createClientMessage(false);
+            // Rotate the property so consecutive messages match queueA, queueB, queueC in turn.
+            if (i % 3 == 0)
+            {
+               clientMessage.putStringProperty(new SimpleString("foo"), new SimpleString("bar"));
+            }
+            else if (i % 3 == 1)
+            {
+               clientMessage.putIntProperty(new SimpleString("x"), 1);
+            }
+            else
+            {
+               clientMessage.putBooleanProperty(new SimpleString("b"), false);
+            }
+            p.send(clientMessage);
+         }
+         ClientSession session = cf.createSession(false, true, true);
+         ClientConsumer c1 = session.createConsumer(queueA);
+         ClientConsumer c2 = session.createConsumer(queueB);
+         ClientConsumer c3 = session.createConsumer(queueC);
+         session.start();
+         for (int i = 0; i < numMessages / 3; i++)
+         {
+            receiveAndAcknowledge(c1);
+            receiveAndAcknowledge(c2);
+            receiveAndAcknowledge(c3);
+         }
+         assertNull(c1.receiveImmediate());
+         assertNull(c2.receiveImmediate());
+         assertNull(c3.receiveImmediate());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Sends 300 messages to an address with one queue created via the four-argument
+    * {@code createQueue(address, queue, false, true)} overload (the last flag is presumably
+    * "temporary", judging by the test name) and expects all 300 to be delivered.
+    *
+    * @throws Exception if the service, a session or a message operation fails
+    */
+   public void testRouteToSingleTemporaryQueue() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession sendSession = cf.createSession(false, true, true);
+         sendSession.createQueue(addressA, queueA, false, true);
+         int numMessages = 300;
+         ClientProducer p = sendSession.createProducer(addressA);
+         for (int i = 0; i < numMessages; i++)
+         {
+            p.send(sendSession.createClientMessage(false));
+         }
+         ClientSession session = cf.createSession(false, true, true);
+         ClientConsumer c1 = session.createConsumer(queueA);
+         session.start();
+         for (int i = 0; i < numMessages; i++)
+         {
+            receiveAndAcknowledge(c1);
+         }
+         assertNull(c1.receiveImmediate());
+         sendSession.close();
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Receives one message from the given consumer with a timeout argument of 5000
+    * (presumably milliseconds), fails the test if none arrives, and acknowledges it.
+    *
+    * @param consumer the consumer to read from
+    * @throws Exception if receiving or acknowledging fails
+    */
+   private void receiveAndAcknowledge(final ClientConsumer consumer) throws Exception
+   {
+      ClientMessage m = consumer.receive(5000);
+      assertNotNull(m);
+      m.acknowledge();
+   }
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSendTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSendTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientSendTest.java	2009-03-25 10:14:36 UTC (rev 6157)
@@ -0,0 +1,122 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * Checks how sends behave on a session created with {@code createSession(false, false, false)}:
+ * sent messages only show up in the queue's message count after {@code commit()}, and
+ * are dropped by {@code rollback()}.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ClientSendTest extends ServiceTestBase
+{
+   public final SimpleString addressA = new SimpleString("addressA");
+
+   public final SimpleString queueA = new SimpleString("queueA");
+
+   public final SimpleString queueB = new SimpleString("queueB");
+
+   public final SimpleString queueC = new SimpleString("queueC");
+
+
+   /**
+    * Sends two batches of 100 messages and asserts that the queue's message count only
+    * grows when the session is committed (0, 100, 100, 200).
+    *
+    * @throws Exception if the service, the session or a message operation fails
+    */
+   public void testSendWithCommit() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession session = cf.createSession(false, false, false);
+         session.createQueue(addressA, queueA, false);
+         ClientProducer cp = session.createProducer(addressA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(session.createClientMessage(false));
+         }
+         // Look at the server-side queue directly so uncommitted sends can be observed.
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         // JUnit's argument order is (expected, actual); keeping it makes failure
+         // messages report the right values.
+         assertEquals(0, q.getMessageCount());
+         session.commit();
+         assertEquals(numMessages, q.getMessageCount());
+         // now send some more
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(session.createClientMessage(false));
+         }
+         assertEquals(numMessages, q.getMessageCount());
+         session.commit();
+         assertEquals(numMessages * 2, q.getMessageCount());
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+   /**
+    * Sends 100 messages and rolls back, then sends another 100 and commits; asserts that
+    * only the committed batch is counted on the queue (0, 0, 0, 100).
+    *
+    * @throws Exception if the service, the session or a message operation fails
+    */
+   public void testSendWithRollback() throws Exception
+   {
+      MessagingService messagingService = createService(false);
+      try
+      {
+         messagingService.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSession session = cf.createSession(false, false, false);
+         session.createQueue(addressA, queueA, false);
+         ClientProducer cp = session.createProducer(addressA);
+         int numMessages = 100;
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(session.createClientMessage(false));
+         }
+         // Look at the server-side queue directly so uncommitted sends can be observed.
+         Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+         // JUnit's argument order is (expected, actual); keeping it makes failure
+         // messages report the right values.
+         assertEquals(0, q.getMessageCount());
+         session.rollback();
+         assertEquals(0, q.getMessageCount());
+         // now send some more
+         for (int i = 0; i < numMessages; i++)
+         {
+            cp.send(session.createClientMessage(false));
+         }
+         assertEquals(0, q.getMessageCount());
+         session.commit();
+         assertEquals(numMessages, q.getMessageCount());
+         session.close();
+      }
+      finally
+      {
+         if (messagingService.isStarted())
+         {
+            messagingService.stop();
+         }
+      }
+   }
+
+}




More information about the jboss-cvs-commits mailing list