[jboss-cvs] JBoss Messaging SVN: r6043 - in trunk/tests/src/org/jboss/messaging/tests/integration: server and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 9 09:20:10 EDT 2009


Author: ataylor
Date: 2009-03-09 09:20:09 -0400 (Mon, 09 Mar 2009)
New Revision: 6043

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/server/
   trunk/tests/src/org/jboss/messaging/tests/integration/server/DeadLetterAddressTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryAddressTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryRunnerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/FakeStorageManager.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/MessageGroupingTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/PredefinedQueueTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java
Log:
moved tests

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/DeadLetterAddressTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/queue/DeadLetterAddressTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/DeadLetterAddressTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/DeadLetterAddressTest.java	2009-03-09 13:20:09 UTC (rev 6043)
@@ -0,0 +1,284 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class DeadLetterAddressTest extends UnitTestCase
+{
+   private MessagingService messagingService;
+
+   private ClientSession clientSession;
+
+   public void testBasicSend() throws Exception
+   {
+      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
+      SimpleString dla = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setMaxDeliveryAttempts(1);
+      addressSettings.setDeadLetterAddress(dla);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      SimpleString dlq = new SimpleString("DLQ1");
+      clientSession.createQueue(dla, dlq, null, false, false);
+      clientSession.createQueue(qName, qName, null, false, false);
+      ClientProducer producer = clientSession.createProducer(qName);
+      producer.send(createTextMessage("heyho!", clientSession));
+      clientSession.start();
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      ClientMessage m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "heyho!");
+      //force a cancel
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.rollback(xid);
+      m = clientConsumer.receive(500);
+      assertNull(m);
+      clientConsumer.close();
+      clientConsumer = clientSession.createConsumer(dlq);
+      m = clientConsumer.receive(500);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "heyho!");
+   }
+
+   public void testBasicSendToMultipleQueues() throws Exception
+   {
+      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
+      SimpleString dla = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setMaxDeliveryAttempts(1);
+      addressSettings.setDeadLetterAddress(dla);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      SimpleString dlq = new SimpleString("DLQ1");
+      SimpleString dlq2 = new SimpleString("DLQ2");
+      clientSession.createQueue(dla, dlq, null, false, false);
+      clientSession.createQueue(dla, dlq2, null, false, false);
+      clientSession.createQueue(qName, qName, null, false, false);
+      ClientProducer producer = clientSession.createProducer(qName);
+      producer.send(createTextMessage("heyho!", clientSession));
+      clientSession.start();
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      ClientMessage m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "heyho!");
+      //force a cancel
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.rollback(xid);
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      m = clientConsumer.receive(500);
+      assertNull(m);
+      clientConsumer.close();
+      clientConsumer = clientSession.createConsumer(dlq);
+      m = clientConsumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "heyho!");
+      clientConsumer.close();
+      clientConsumer = clientSession.createConsumer(dlq2);
+      m = clientConsumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "heyho!");
+      clientConsumer.close();
+   }
+
+   public void testBasicSendToNoQueue() throws Exception
+   {
+      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
+      SimpleString qName = new SimpleString("q1");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setMaxDeliveryAttempts(1);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      clientSession.createQueue(qName, qName, null, false, false);
+      ClientProducer producer = clientSession.createProducer(qName);
+      producer.send(createTextMessage("heyho!", clientSession));
+      clientSession.start();
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      ClientMessage m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "heyho!");
+      //force a cancel
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.rollback(xid);
+      m = clientConsumer.receive(500);
+      assertNull(m);
+      clientConsumer.close();
+   }
+
+   public void testHeadersSet() throws Exception
+   {
+      final int MAX_DELIVERIES = 16;
+      final int NUM_MESSAGES = 5;
+      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
+      SimpleString dla = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setMaxDeliveryAttempts(MAX_DELIVERIES);
+      addressSettings.setDeadLetterAddress(dla);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      SimpleString dlq = new SimpleString("DLQ1");
+      clientSession.createQueue(dla, dlq, null, false, false);
+      clientSession.createQueue(qName, qName, null, false, false);
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession sendSession = sessionFactory.createSession(false, true, true);
+      ClientProducer producer = sendSession.createProducer(qName);
+      Map<String, Long> origIds = new HashMap<String, Long>();
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = createTextMessage("Message:" + i, clientSession);
+         producer.send(tm);
+      }
+
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      clientSession.start();
+
+      for (int i = 0; i < MAX_DELIVERIES; i++)
+      {
+         clientSession.start(xid, XAResource.TMNOFLAGS);
+         for (int j = 0; j < NUM_MESSAGES; j++)
+         {
+            ClientMessage tm = clientConsumer.receive(1000);
+
+            assertNotNull(tm);
+            tm.acknowledge();
+            if(i == 0)
+            {
+               origIds.put("Message:" + j, tm.getMessageID());
+            }
+            assertEquals("Message:" + j, tm.getBody().readString());
+         }
+         clientSession.end(xid, XAResource.TMSUCCESS);
+         clientSession.rollback(xid);
+      }
+      
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getMessageCount());
+      ClientMessage m = clientConsumer.receive(1000);
+      assertNull(m);
+      //All the messages should now be in the DLQ
+
+      ClientConsumer cc3 = clientSession.createConsumer(dlq);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = cc3.receive(1000);
+
+         assertNotNull(tm);
+
+         String text = tm.getBody().readString();
+         assertEquals("Message:" + i, text);
+
+         // Check the headers
+         SimpleString origDest =
+               (SimpleString) tm.getProperty(MessageImpl.HDR_ORIGINAL_DESTINATION);
+
+         Long origMessageId =
+               (Long) tm.getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+
+         assertEquals(qName, origDest);
+
+         Long origId = origIds.get(text);
+
+         assertEquals(origId, origMessageId);
+      }
+
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = Messaging.newNullStorageMessagingService(configuration);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      clientSession = sessionFactory.createSession(true, true, false);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            //
+         }
+      }
+      if (messagingService != null && messagingService.isStarted())
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e1)
+         {
+            //
+         }
+      }
+      messagingService = null;
+      clientSession = null;
+      super.tearDown();
+   }
+
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryAddressTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryAddressTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryAddressTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryAddressTest.java	2009-03-09 13:20:09 UTC (rev 6043)
@@ -0,0 +1,266 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ExpiryAddressTest extends UnitTestCase
+{
+   private static final Logger log = Logger.getLogger(ExpiryAddressTest.class);
+
+   private MessagingService messagingService;
+
+   private ClientSession clientSession;
+
+   public void testBasicSend() throws Exception
+   {
+      SimpleString ea = new SimpleString("EA");
+      SimpleString qName = new SimpleString("q1");
+      SimpleString eq = new SimpleString("EA1");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setExpiryAddress(ea);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      clientSession.createQueue(ea, eq, null, false, false);
+      clientSession.createQueue(qName, qName, null, false, false);
+      
+      ClientProducer producer = clientSession.createProducer(qName);
+      ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
+      clientMessage.setExpiration(System.currentTimeMillis());
+      producer.send(clientMessage);
+      
+      clientSession.start();
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      ClientMessage m = clientConsumer.receive(500);
+      assertNull(m);
+      System.out.println("size3 = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      m = clientConsumer.receive(500);
+      assertNull(m);
+      clientConsumer.close();
+      clientConsumer = clientSession.createConsumer(eq);
+      m = clientConsumer.receive(500);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "heyho!");
+      m.acknowledge();
+      
+      // PageSize should be the same as when it started
+      assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+   }
+
+   public void testBasicSendToMultipleQueues() throws Exception
+   {
+      SimpleString ea = new SimpleString("EA");
+      SimpleString qName = new SimpleString("q1");
+      SimpleString eq = new SimpleString("EQ1");
+      SimpleString eq2 = new SimpleString("EQ2");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setExpiryAddress(ea);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      clientSession.createQueue(ea, eq, null, false, false);
+      clientSession.createQueue(ea, eq2, null, false, false);
+      clientSession.createQueue(qName, qName, null, false, false);
+      ClientProducer producer = clientSession.createProducer(qName);
+      ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
+      clientMessage.setExpiration(System.currentTimeMillis());
+      
+      System.out.println("initialPageSize = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      
+      producer.send(clientMessage);
+      
+      System.out.println("pageSize after message sent = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      
+      clientSession.start();
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      ClientMessage m = clientConsumer.receive(500);
+      
+      System.out.println("pageSize after message received = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+      
+      assertNull(m);
+      
+      clientConsumer.close();
+      
+      clientConsumer = clientSession.createConsumer(eq);
+      
+      m = clientConsumer.receive(500);
+      
+      assertNotNull(m);
+      
+      log.info("acking");
+      m.acknowledge();
+      
+      assertEquals(m.getBody().readString(), "heyho!");
+      
+      clientConsumer.close();
+      
+      clientConsumer = clientSession.createConsumer(eq2);
+      
+      m = clientConsumer.receive(500);
+      
+      assertNotNull(m);
+      
+      log.info("acking");
+      m.acknowledge();
+      
+      assertEquals(m.getBody().readString(), "heyho!");
+      
+      clientConsumer.close();
+      
+      clientSession.commit();
+
+      // PageGlobalSize should be untouched as the message expired
+      assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+   }
+
+   public void testBasicSendToNoQueue() throws Exception
+   {
+      SimpleString ea = new SimpleString("EA");
+      SimpleString qName = new SimpleString("q1");
+      SimpleString eq = new SimpleString("EQ1");
+      SimpleString eq2 = new SimpleString("EQ2");
+      clientSession.createQueue(ea, eq, null, false, false);
+      clientSession.createQueue(ea, eq2, null, false, false);
+      clientSession.createQueue(qName, qName, null, false, false);
+      ClientProducer producer = clientSession.createProducer(qName);
+      ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
+      clientMessage.setExpiration(System.currentTimeMillis());
+      producer.send(clientMessage);
+      clientSession.start();
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      ClientMessage m = clientConsumer.receive(500);
+      assertNull(m);
+      clientConsumer.close();
+   }
+
+   public void testHeadersSet() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+      SimpleString ea = new SimpleString("DLA");
+      SimpleString qName = new SimpleString("q1");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setExpiryAddress(ea);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      SimpleString eq = new SimpleString("EA1");
+      clientSession.createQueue(ea, eq, null, false, false);
+      clientSession.createQueue(qName, qName, null, false, false);
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession sendSession = sessionFactory.createSession(false, true, true);
+      ClientProducer producer = sendSession.createProducer(qName);
+
+      long expiration = System.currentTimeMillis();
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = createTextMessage("Message:" + i, clientSession);
+         tm.setExpiration(expiration);
+         producer.send(tm);
+      }
+
+      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
+      clientSession.start();
+      ClientMessage m = clientConsumer.receive(1000);
+      assertNull(m);
+      // All the messages should now be in the EQ
+
+      ClientConsumer cc3 = clientSession.createConsumer(eq);
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage tm = cc3.receive(1000);
+
+         assertNotNull(tm);
+
+         String text = tm.getBody().readString();
+         assertEquals("Message:" + i, text);
+
+         // Check the headers
+         Long actualExpiryTime = (Long)tm.getProperty(HDR_ACTUAL_EXPIRY_TIME);
+         assertTrue(actualExpiryTime >= expiration);
+      }
+
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = Messaging.newNullStorageMessagingService(configuration);
+      // start the server
+      messagingService.start();
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true); // There are assertions over sizes that needs to be done after the ACK was received on server
+      clientSession = sessionFactory.createSession(null, null, false, true, true, false, 0);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            //
+         }
+      }
+      if (messagingService != null && messagingService.isStarted())
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e1)
+         {
+            //
+         }
+      }
+      messagingService = null;
+      clientSession = null;
+      
+      super.tearDown();
+   }
+
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryRunnerTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/queue/ExpiryRunnerTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryRunnerTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryRunnerTest.java	2009-03-09 13:20:09 UTC (rev 6043)
@@ -0,0 +1,384 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import junit.framework.TestResult;
+import junit.framework.TestSuite;
+import junit.textui.TestRunner;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class ExpiryRunnerTest extends UnitTestCase
+{
+   private MessagingService messagingService;
+
+   private ClientSession clientSession;
+
+   private SimpleString qName = new SimpleString("ExpiryRunnerTestQ");
+
+   private SimpleString qName2 = new SimpleString("ExpiryRunnerTestQ2");
+
+   private SimpleString expiryQueue;
+
+   private SimpleString expiryAddress;
+
+   public void testBasicExpire() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(qName);
+      int numMessages = 100;
+      long expiration = System.currentTimeMillis();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage m = createTextMessage("m" + i, clientSession);
+         m.setExpiration(expiration);
+         producer.send(m);
+      }
+      Thread.sleep(1600);
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
+
+      ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
+      clientSession.start();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         //assertEquals("m" + i, cm.getBody().getString());
+      }
+      consumer.close();
+   }
+
+   public void testExpireFromMultipleQueues() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(qName);
+      clientSession.createQueue(qName2, qName2, null, false, false);
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setExpiryAddress(expiryAddress);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName2.toString(), addressSettings);
+      ClientProducer producer2 = clientSession.createProducer(qName2);
+      int numMessages = 100;
+      long expiration = System.currentTimeMillis();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage m = createTextMessage("m" + i, clientSession);
+         m.setExpiration(expiration);
+         producer.send(m);
+         m = createTextMessage("m" + i, clientSession);
+         m.setExpiration(expiration);
+         producer2.send(m);
+      }
+      Thread.sleep(1600);
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
+
+      ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
+      clientSession.start();
+      for (int i = 0; i < numMessages * 2; i++)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         //assertEquals("m" + i, cm.getBody().getString());
+      }
+      consumer.close();
+   }
+
+   public void testExpireHalf() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(qName);
+      int numMessages = 100;
+      long expiration = System.currentTimeMillis();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage m = createTextMessage("m" + i, clientSession);
+         if (i % 2 == 0)
+         {
+            m.setExpiration(expiration);
+         }
+         producer.send(m);
+      }
+      Thread.sleep(1600);
+      assertEquals(numMessages / 2, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
+
+      ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
+      clientSession.start();
+      for (int i = 0; i < numMessages; i += 2)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         //assertEquals("m" + i, cm.getBody().getString());
+      }
+      consumer.close();
+   }
+
+   public void testExpireConsumeHalf() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(qName);
+      int numMessages = 100;
+      long expiration = System.currentTimeMillis() + 1000;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage m = createTextMessage("m" + i, clientSession);
+         m.setExpiration(expiration);
+         producer.send(m);
+      }
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      clientSession.start();
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull("message not received " + i, cm);
+         cm.acknowledge();
+         assertEquals("m" + i, cm.getBody().readString());
+      }
+      consumer.close();
+      Thread.sleep(2100);
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
+
+      consumer = clientSession.createConsumer(expiryQueue);
+      clientSession.start();
+      for (int i = 50; i < numMessages; i++)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         //assertEquals("m" + i, cm.getBody().getString());
+      }
+      consumer.close();
+   }
+
+   public void testExpireToMultipleQueues() throws Exception
+   {
+      clientSession.createQueue(qName, qName2, null, false, false);
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setExpiryAddress(expiryAddress);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName2.toString(), addressSettings);
+      ClientProducer producer = clientSession.createProducer(qName);
+      int numMessages = 100;
+      long expiration = System.currentTimeMillis();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage m = createTextMessage("m" + i, clientSession);
+         m.setExpiration(expiration);
+         producer.send(m);
+      }
+      Thread.sleep(1600);
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getMessageCount());
+      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
+
+      ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
+      clientSession.start();
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         //assertEquals("m" + i, cm.getBody().getString());
+      }
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         //assertEquals("m" + i, cm.getBody().getString());
+      }
+      consumer.close();
+   }
+
+   public void testExpireWhilstConsuming() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      CountDownLatch latch = new CountDownLatch(1);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(consumer, latch);
+      clientSession.start();
+      new Thread(dummyMessageHandler).start();
+      long expiration = System.currentTimeMillis() + 1000;
+      int numMessages = 0;
+      long sendMessagesUntil = System.currentTimeMillis() + 2000;
+      do
+      {
+         ClientMessage m = createTextMessage("m" + (numMessages++), clientSession);
+         m.setExpiration(expiration);
+         producer.send(m);
+         Thread.sleep(100);
+      }
+      while (System.currentTimeMillis() < sendMessagesUntil);
+      assertTrue(latch.await(10000, TimeUnit.MILLISECONDS));
+      consumer.close();
+
+      consumer = clientSession.createConsumer(expiryQueue);
+      do
+      {
+         ClientMessage cm = consumer.receive(2000);
+         if(cm == null)
+         {
+            break;
+         }
+         String text = cm.getBody().readString();
+         cm.acknowledge();
+         assertFalse(dummyMessageHandler.payloads.contains(text));
+         dummyMessageHandler.payloads.add(text);
+      } while(true);
+
+      for(int i = 0; i < numMessages; i++)
+      {
+         assertTrue(dummyMessageHandler.payloads.remove("m" + i));
+      }
+      assertTrue(dummyMessageHandler.payloads.isEmpty());
+      consumer.close();
+   }
+
+   public static void main(String[] args) throws Exception
+   {
+      for (int i = 0; i < 1000; i++)
+      {
+         TestSuite suite = new TestSuite();
+         ExpiryRunnerTest expiryRunnerTest = new ExpiryRunnerTest();
+         expiryRunnerTest.setName("testExpireWhilstConsuming");
+         suite.addTest(expiryRunnerTest);
+
+         TestResult result = TestRunner.run(suite);
+         if(result.errorCount() > 0 || result.failureCount() > 0)
+         {
+            System.exit(1);
+         }
+      }
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      configuration.setMessageExpiryScanPeriod(1000);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = Messaging.newNullStorageMessagingService(configuration);
+      // start the server
+      messagingService.start();
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSession.createQueue(qName, qName, null, false, false);
+      expiryAddress = new SimpleString("EA");
+      expiryQueue = new SimpleString("expiryQ");
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setExpiryAddress(expiryAddress);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName2.toString(), addressSettings);
+      clientSession.createQueue(expiryAddress, expiryQueue, null, false, false);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            //
+         }
+      }
+      if (messagingService != null && messagingService.isStarted())
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e1)
+         {
+            //
+         }
+      }
+      messagingService = null;
+      clientSession = null;
+      
+      super.tearDown();
+   }
+
+   private static class DummyMessageHandler implements Runnable
+   {
+      List<String> payloads = new ArrayList<String>();
+
+      private final ClientConsumer consumer;
+
+      private final CountDownLatch latch;
+
+      public DummyMessageHandler(ClientConsumer consumer, CountDownLatch latch)
+      {
+         this.consumer = consumer;
+         this.latch = latch;
+      }
+
+      public void run()
+      {
+         while (true)
+         {
+            try
+            {
+               ClientMessage message = consumer.receive(5000);
+               if (message == null)
+               {
+                  break;
+               }
+               message.acknowledge();
+               payloads.add(message.getBody().readString());
+
+               Thread.sleep(110);
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+         latch.countDown();
+
+      }
+   }
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/FakeStorageManager.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/queue/FakeStorageManager.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/FakeStorageManager.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/FakeStorageManager.java	2009-03-09 13:20:09 UTC (rev 6043)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.persistence.impl.nullpm.NullStorageManager;
+import org.jboss.messaging.core.server.ServerMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class FakeStorageManager extends NullStorageManager
+{
+   List<Long> messageIds = new ArrayList<Long>();
+
+   List<Long> ackIds = new ArrayList<Long>();
+
+   public void storeMessage(ServerMessage message) throws Exception
+   {
+      messageIds.add(message.getMessageID());
+   }
+
+   public void storeMessageTransactional(long txID, ServerMessage message) throws Exception
+   {
+      messageIds.add(message.getMessageID());
+   }
+
+   public void deleteMessageTransactional(long txID, long queueID, long messageID) throws Exception
+   {
+      messageIds.remove(messageID);
+   }
+
+   public void deleteMessage(long messageID) throws Exception
+   {
+      messageIds.remove(messageID);
+   }
+
+   public void storeAcknowledge(long queueID, long messageID) throws Exception
+   {
+      ackIds.add(messageID);
+   }
+
+   public void storeAcknowledgeTransactional(long txID, long queueID, long messageiD) throws Exception
+   {
+      ackIds.add(messageiD);
+   }
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/MessageGroupingTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/queue/MessageGroupingTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/MessageGroupingTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/MessageGroupingTest.java	2009-03-09 13:20:09 UTC (rev 6043)
@@ -0,0 +1,607 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MessageGroupingTest extends UnitTestCase
+{
+   private MessagingService messagingService;
+
+   private ClientSession clientSession;
+
+   private SimpleString qName = new SimpleString("MessageGroupingTestQueue");
+
+   public void testBasicGrouping() throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertTrue(dummyMessageHandler.list.size() == 100);
+      assertTrue(dummyMessageHandler2.list.size() == 0);
+      consumer.close();
+      consumer2.close();
+   }
+
+   public void testMultipleGrouping() throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0)
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+   }
+
+   public void testMultipleGroupingStartConsumersAfterMessagesSent() throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0)
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+
+      clientSession.start();
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+   }
+
+   public void testMultipleGroupingConsumeHalf() throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0)
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+
+      for(int i = 0; i < numMessages/2; i++)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         assertEquals(cm.getBody().readString(), "m" + i);
+         i++;
+         cm = consumer2.receive(500);
+         assertNotNull(cm);
+         assertEquals(cm.getBody().readString(), "m" + i);
+      }
+
+      consumer2.close();
+      consumer.close();
+      //check that within their groups the messages are still in the correct order
+      consumer = clientSession.createConsumer(qName);
+      for(int i = 0; i < numMessages; i+=2)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         assertEquals(cm.getBody().readString(), "m" + i);
+      }
+      for(int i = 1; i < numMessages; i+=2)
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         assertEquals(cm.getBody().readString(), "m" + i);
+      }
+      consumer.close();
+   }
+
+   public void testMultipleGroupingSingleConsumer() throws Exception
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0)
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 100);
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 1;
+      }
+      consumer.close();
+   }
+
+   public void testMultipleGroupingTXCommit() throws Exception
+   {
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession clientSession = sessionFactory.createSession(false, false, false);
+      ClientProducer clientProducer = this.clientSession.createProducer(qName);
+      clientSession.start();
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0)
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      clientSession.commit();
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+      consumer = this.clientSession.createConsumer(qName);
+      assertNull(consumer.receive(500));
+   }
+
+   public void testMultipleGroupingTXRollback() throws Exception
+   {
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      ClientSession clientSession = sessionFactory.createSession(false, false, false);
+      ClientProducer clientProducer = this.clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0)
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      latch = new CountDownLatch(numMessages);
+      dummyMessageHandler.reset(latch);
+      dummyMessageHandler2.reset(latch);
+      clientSession.rollback();
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer = this.clientSession.createConsumer(qName);
+      assertNull(consumer.receive(500));
+   }
+
+   public void testMultipleGroupingXACommit() throws Exception
+   {
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession clientSession = sessionFactory.createSession(true, false, false);
+      ClientProducer clientProducer = this.clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      Xid xid = new XidImpl("bq".getBytes(), 4, "gtid".getBytes());
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0)
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      clientSession.commit(xid, true);
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+      consumer = this.clientSession.createConsumer(qName);
+      assertNull(consumer.receive(500));
+   }
+
+   public void testMultipleGroupingXARollback() throws Exception
+   {
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      ClientSession clientSession = sessionFactory.createSession(true, false, false);
+      ClientProducer clientProducer = this.clientSession.createProducer(qName);
+      clientSession.start();
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      Xid xid = new XidImpl("bq".getBytes(), 4, "gtid".getBytes());
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0)
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages);
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      latch = new CountDownLatch(numMessages);
+      dummyMessageHandler.reset(latch);
+      dummyMessageHandler2.reset(latch);
+      clientSession.rollback(xid);
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      clientSession.commit(xid, false);
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      i = 0;
+      for (ClientMessage message : dummyMessageHandler.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list)
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer = this.clientSession.createConsumer(qName);
+      assertNull(consumer.receive(500));
+   }
+
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            //
+         }
+      }
+      if (messagingService != null && messagingService.isStarted())
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e1)
+         {
+            //
+         }
+      }
+      messagingService = null;
+      clientSession = null;
+      
+      super.tearDown();
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = Messaging.newNullStorageMessagingService(configuration);
+      // start the server
+      messagingService.start();
+
+      AddressSettings qs = new AddressSettings();
+      qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName());
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), qs);
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSession.createQueue(qName, qName, null, false, false);
+   }
+
+   private static class DummyMessageHandler implements MessageHandler
+   {
+      ArrayList<ClientMessage> list = new ArrayList<ClientMessage>();
+
+      private CountDownLatch latch;
+
+      private final boolean acknowledge;
+
+      public DummyMessageHandler(CountDownLatch latch, boolean acknowledge)
+      {
+         this.latch = latch;
+         this.acknowledge = acknowledge;
+      }
+
+      public void onMessage(ClientMessage message)
+      {
+         list.add(message);
+         if (acknowledge)
+         {
+            try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException e)
+            {
+               //ignore
+            }
+         }
+         latch.countDown();
+      }
+
+      public void reset(CountDownLatch latch)
+      {
+         list.clear();
+         this.latch = latch;
+      }
+   }
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/PredefinedQueueTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/queue/PredefinedQueueTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/PredefinedQueueTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/PredefinedQueueTest.java	2009-03-09 13:20:09 UTC (rev 6043)
@@ -0,0 +1,485 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.cluster.QueueConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 
+ * A PredefinedQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 19 Jan 2009 15:44:52
+ *
+ *
+ */
+public class PredefinedQueueTest extends ServiceTestBase
+{
+   private static final Logger log = Logger.getLogger(PredefinedQueueTest.class);
+
+   public void testFailOnCreatePredefinedQueues() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String queueName2 = "queue2";
+      
+      final String queueName3 = "queue3";
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+      
+      QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+      
+      QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName3, null, true);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+      queueConfs.add(queue2);
+      queueConfs.add(queue3);
+      
+      conf.setQueueConfigurations(queueConfs);
+      
+      MessagingService messagingService = Messaging.newNullStorageMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      try
+      {
+         session.createQueue(testAddress, queueName1, null, false, false);
+         
+         fail("Should throw exception");
+      }
+      catch (MessagingException me)
+      {
+         assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+      }
+      try
+      {
+         session.createQueue(testAddress, queueName2, null, false, false);
+         
+         fail("Should throw exception");
+      }
+      catch (MessagingException me)
+      {
+         assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+      }
+      try
+      {
+         session.createQueue(testAddress, queueName3, null, false, false);
+         
+         fail("Should throw exception");
+      }
+      catch (MessagingException me)
+      {
+         assertEquals(MessagingException.QUEUE_EXISTS, me.getCode());
+      }
+            
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+   
+   public void testDeploySameNames() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String queueName2 = "queue2";
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+      
+      QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName1, null, true);
+      
+      QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName2, null, true);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+      queueConfs.add(queue2);
+      queueConfs.add(queue3);
+      
+      conf.setQueueConfigurations(queueConfs);
+      
+      MessagingService messagingService = Messaging.newNullStorageMessagingService(conf);
+           
+      messagingService.start();
+      
+      Bindings bindings = messagingService.getServer().getPostOffice().getBindingsForAddress(new SimpleString(testAddress));
+      
+      assertEquals(2, bindings.getBindings().size());
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+            
+      session.start();
+      
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+      
+      final int numMessages = 10;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(false);
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+         
+         message = consumer2.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+   
+   public void testDeployPreexistingQueues() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String queueName2 = "queue2";
+      
+      final String queueName3 = "queue3";
+                 
+      MessagingService messagingService = Messaging.newMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      session.createQueue(testAddress, queueName1, null, true, false);
+        
+      session.createQueue(testAddress, queueName2, null, true, false);
+         
+      session.createQueue(testAddress, queueName3, null, true, false);
+      
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, true);
+      
+      QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+      
+      QueueConfiguration queue3 = new QueueConfiguration(testAddress, queueName3, null, true);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+      queueConfs.add(queue2);
+      queueConfs.add(queue3);
+      
+      conf.setQueueConfigurations(queueConfs);
+      
+      messagingService.start();
+      
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      session = sf.createSession(false, true, true);
+      
+      session.start();
+      
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+      
+      ClientConsumer consumer3 = session.createConsumer(queueName3);
+      
+      final int numMessages = 10;
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(false);
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+         
+         message = consumer2.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+         
+         message = consumer3.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+      assertNull(consumer3.receive(200));
+      
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+   
+   public void testDurableNonDurable() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String queueName2 = "queue2";
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, null, false);
+      
+      QueueConfiguration queue2 = new QueueConfiguration(testAddress, queueName2, null, true);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+      queueConfs.add(queue2);
+      
+      conf.setQueueConfigurations(queueConfs);
+      
+      MessagingService messagingService = Messaging.newMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      final int numMessages = 1;
+            
+      log.info("sending messages");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      session.close();
+      
+      log.info("stopping");
+      
+      sf.close();
+      
+      messagingService.stop();
+      
+      messagingService.start();
+      
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      session = sf.createSession(false, true, true);
+      
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      ClientConsumer consumer2 = session.createConsumer(queueName2);
+      
+      ClientMessage message = consumer1.receive(200);  
+      
+      assertNull(message);
+            
+      for (int i = 0; i < numMessages; i++)
+      {
+         message = consumer2.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      assertNull(consumer2.receive(200));
+
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+   
+   
+   public void testDeployWithFilter() throws Exception
+   {
+      Configuration conf = createDefaultConfig();
+      
+      final String testAddress = "testAddress";
+      
+      final String queueName1 = "queue1";
+      
+      final String filter = "cheese='camembert'";
+      
+      QueueConfiguration queue1 = new QueueConfiguration(testAddress, queueName1, filter, false);
+      
+      List<QueueConfiguration> queueConfs = new ArrayList<QueueConfiguration>();
+      
+      queueConfs.add(queue1);
+
+      conf.setQueueConfigurations(queueConfs);
+      
+      MessagingService messagingService = Messaging.newNullStorageMessagingService(conf);
+           
+      messagingService.start();
+      
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+
+      ClientSession session = sf.createSession(false, true, true);
+      
+      ClientProducer producer = session.createProducer(new SimpleString(testAddress));
+      
+      final SimpleString propKey = new SimpleString("testkey");
+      
+      final int numMessages = 1;
+            
+      log.info("sending messages");
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+         
+         message.putStringProperty(new SimpleString("cheese"), new SimpleString("camembert"));
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+            
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(queueName1);
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(200);         
+         assertNotNull(message);         
+         assertEquals((Integer)i, (Integer)message.getProperty(propKey));         
+         message.acknowledge();
+      }
+      
+      assertNull(consumer1.receive(200));
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+         
+         message.putStringProperty(new SimpleString("cheese"), new SimpleString("roquefort"));
+         
+         message.putIntProperty(propKey, i);
+         
+         producer.send(message);
+      }
+      
+      assertNull(consumer1.receive(200));
+            
+      session.close();
+      
+      sf.close();
+      
+      messagingService.stop();
+   }
+  
+   
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/queue/SoloQueueRecoveryTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueRecoveryTest.java	2009-03-09 13:20:09 UTC (rev 6043)
@@ -0,0 +1,236 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.SimpleString;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class SoloQueueRecoveryTest extends ServiceTestBase
+{
+   private MessagingService messagingService;
+
+   private ClientSession clientSession;
+
+   private SimpleString address = new SimpleString("SoloQueueTestAddress");
+
+   private SimpleString qName1 = new SimpleString("SoloQueueTestQ1");
+
+   private ClientSession clientSessionXa;
+
+   private ConfigurationImpl configuration;
+
+   private AddressSettings qs;
+
+   public void testMultipleMessagesAfterRecovery() throws Exception
+   {
+      Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
+      ClientProducer producer = clientSessionXa.createProducer(address, -1, true, true);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      clientSessionXa.start(xid, XAResource.TMNOFLAGS);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      clientSessionXa.end(xid, XAResource.TMSUCCESS);
+      clientSessionXa.prepare(xid);
+      restartServer();
+      clientSessionXa.commit(xid, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m3");
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m4");
+   }
+
+   public void testManyMessagesReceivedWithRollback() throws Exception
+   {
+      Xid xid = new XidImpl("bq1".getBytes(), 4, "gtid1".getBytes());
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionXa.createConsumer(qName1);
+
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m6.setDurable(true);
+      clientSessionXa.start(xid, XAResource.TMNOFLAGS);
+      clientSessionXa.start();
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m1");
+      producer.send(m2);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m2");
+      producer.send(m3);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m3");
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m4");
+      producer.send(m5);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m5");
+      producer.send(m6);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m6");
+      clientSessionXa.end(xid, XAResource.TMSUCCESS);
+      clientSessionXa.prepare(xid);
+
+      restartServer();
+      clientSessionXa.rollback(xid);
+      consumer = clientSession.createConsumer(qName1);
+      clientSession.start();
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m6");
+      m = consumer.receive(1000);
+      assertNull(m);
+   }
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            //
+         }
+      }
+      if (messagingService != null && messagingService.isStarted())
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e1)
+         {
+            //
+         }
+      }
+      messagingService = null;
+      clientSession = null;
+      
+      super.tearDown();
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      clearData();
+      configuration = createFileConfig();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = createService(true, configuration);
+      // start the server
+      messagingService.start();
+
+      qs = new AddressSettings();
+      qs.setSoloQueue(true);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      sessionFactory.setAckBatchSize(0);
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSessionXa = sessionFactory.createSession(true, false, false);
+      clientSession.createQueue(address, qName1, null, true, false);
+   }
+
+   private void restartServer() throws Exception
+   {
+      messagingService.stop();
+      messagingService = null;
+      messagingService = createService(true, configuration);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // start the server
+      messagingService.start();
+
+      AddressSettings qs = new AddressSettings();
+      qs.setSoloQueue(true);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      sessionFactory.setAckBatchSize(0);
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSessionXa = sessionFactory.createSession(true, false, false);
+   }
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java (from rev 6040, trunk/tests/src/org/jboss/messaging/tests/integration/queue/SoloQueueTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/SoloQueueTest.java	2009-03-09 13:20:09 UTC (rev 6043)
@@ -0,0 +1,584 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.server;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class SoloQueueTest extends UnitTestCase
+{
+   private MessagingService messagingService;
+
+   private ClientSession clientSession;
+
+   private ClientSession clientSessionTxReceives;
+
+   private ClientSession clientSessionTxSends;
+
+   private SimpleString address = new SimpleString("SoloQueueTestAddress");
+
+   private SimpleString qName1 = new SimpleString("SoloQueueTestQ1");
+
+   private FakeStorageManager storageManager;
+
+
+   public void testSimple() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      producer.send(m1);
+      producer.send(m2);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m2");
+   }
+
+   public void testMultipleMessages() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m3");
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m4");
+   }
+
+   public void testFirstMessageReceivedButAckedAfter() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      producer.send(m1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      producer.send(m2);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m1");
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m2");
+   }
+
+   public void testFirstMessageReceivedAndCancelled() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      producer.send(m1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      producer.send(m2);
+      consumer.close();
+      consumer = clientSession.createConsumer(qName1);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m2");
+      m = consumer.receive(1000);
+      assertNull(m);
+   }
+
+   public void testManyMessagesReceivedAndCancelled() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      clientSession.start();
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m1");
+      producer.send(m2);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m2");
+      producer.send(m3);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m3");
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m4");
+      producer.send(m5);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m5");
+      producer.send(m6);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      assertEquals(m.getBody().readString(), "m6");
+      consumer.close();
+      consumer = clientSession.createConsumer(qName1);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m6");
+      m = consumer.receive(1000);
+      assertNull(m);
+   }
+
+   public void testSimpleInTx() throws Exception
+   {
+
+      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      producer.send(m1);
+      producer.send(m2);
+      clientSessionTxReceives.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m2");
+   }
+
+   public void testMultipleMessagesInTx() throws Exception
+   {
+      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      clientSessionTxReceives.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m3");
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m4");
+      clientSessionTxReceives.commit();
+      m = consumer.receive(1000);
+      assertNull(m);
+   }
+
+   public void testMultipleMessagesInTxRollback() throws Exception
+   {
+      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      SimpleString messageId2 = new SimpleString("SMID2");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId1);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, messageId2);
+      producer.send(m1);
+      producer.send(m2);
+      clientSessionTxReceives.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m1");
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m2");
+      producer.send(m3);
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m3");
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m4");
+      clientSessionTxReceives.rollback();
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m3");
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m4");
+   }
+
+   public void testMultipleMessagesInTxSend() throws Exception
+   {
+      ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      producer.send(m5);
+      producer.send(m6);
+      clientSessionTxSends.commit();
+      clientSessionTxSends.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m6");
+   }
+
+   public void testMultipleMessagesPersistedCorrectly() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m6.setDurable(true);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      producer.send(m5);
+      producer.send(m6);
+      assertEquals(1, storageManager.messageIds.size());
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m6");
+      assertEquals(0, storageManager.messageIds.size());
+   }
+
+   public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception
+   {
+      ClientProducer producer = clientSessionTxSends.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxSends.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m6.setDurable(true);
+      producer.send(m1);
+      producer.send(m2);
+      producer.send(m3);
+      producer.send(m4);
+      producer.send(m5);
+      producer.send(m6);
+      clientSessionTxSends.commit();
+      assertEquals(1, storageManager.messageIds.size());
+      clientSessionTxSends.start();
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m6");
+      assertEquals(0, storageManager.messageIds.size());
+   }
+
+   public void testMultipleAcksPersistedCorrectly() throws Exception
+   {
+      ClientProducer producer = clientSession.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m6.setDurable(true);
+      clientSession.start();
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m1");
+      producer.send(m2);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m2");
+      producer.send(m3);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m3");
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m4");
+      producer.send(m5);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m5");
+      producer.send(m6);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m6");
+      assertEquals(6, storageManager.ackIds.size());
+   }
+
+   public void testMultipleAcksPersistedCorrectlyInTx() throws Exception
+   {
+      ClientProducer producer = clientSessionTxReceives.createProducer(address, -1, true, true);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      SimpleString rh = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage("m1", clientSession);
+      m1.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m1.setDurable(true);
+      ClientMessage m2 = createTextMessage("m2", clientSession);
+      m2.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m2.setDurable(true);
+      ClientMessage m3 = createTextMessage("m3", clientSession);
+      m3.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m3.setDurable(true);
+      ClientMessage m4 = createTextMessage("m4", clientSession);
+      m4.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m4.setDurable(true);
+      ClientMessage m5 = createTextMessage("m5", clientSession);
+      m5.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m5.setDurable(true);
+      ClientMessage m6 = createTextMessage("m6", clientSession);
+      m6.putStringProperty(MessageImpl.HDR_SOLE_MESSAGE, rh);
+      m6.setDurable(true);
+      clientSessionTxReceives.start();
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m1");
+      producer.send(m2);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m2");
+      producer.send(m3);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m3");
+      producer.send(m4);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m4");
+      producer.send(m5);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m5");
+      producer.send(m6);
+      m = consumer.receive(1000);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().readString(), "m6");
+      clientSessionTxReceives.commit();
+      assertEquals(6, storageManager.ackIds.size());
+   }
+
+   
+
+
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            //
+         }
+      }
+      if (messagingService != null && messagingService.isStarted())
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e1)
+         {
+            //
+         }
+      }
+      messagingService = null;
+      clientSession = null;
+      
+      super.tearDown();
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      storageManager = new FakeStorageManager();
+      messagingService = Messaging.newMessagingService(configuration, storageManager);
+      // start the server
+      messagingService.start();
+
+      AddressSettings qs = new AddressSettings();
+      qs.setSoloQueue(true);
+      messagingService.getServer().getAddressSettingsRepository().addMatch(address.toString(), qs);
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      sessionFactory.setAckBatchSize(0);
+      clientSession = sessionFactory.createSession(false, true, true);
+      clientSessionTxReceives = sessionFactory.createSession(false, true, false);
+      clientSessionTxSends = sessionFactory.createSession(false, false, true);
+      clientSession.createQueue(address, qName1, null, true, false);
+   }
+}




More information about the jboss-cvs-commits mailing list