[jboss-cvs] JBoss Messaging SVN: r6159 - in trunk/tests/src/org/jboss/messaging/tests/integration: server and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 25 06:21:52 EDT 2009


Author: ataylor
Date: 2009-03-25 06:21:52 -0400 (Wed, 25 Mar 2009)
New Revision: 6159

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/integration/server/DeadLetterAddressTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryAddressTest.java
Log:
repackaged tests

Copied: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java (from rev 6151, trunk/tests/src/org/jboss/messaging/tests/integration/server/MessageGroupingTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageGroupingTest.java	2009-03-25 10:21:52 UTC (rev 6159)
@@ -0,0 +1,607 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.client;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.GroupingRoundRobinDistributor;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.utils.SimpleString;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class MessageGroupingTest extends UnitTestCase
+{
+   private MessagingService messagingService;
+
+   private ClientSession clientSession;
+
+   private SimpleString qName = new SimpleString("MessageGroupingTestQueue");
+
+   public void testBasicGrouping() throws Exception // 100 messages with one group id and two consumers: all must reach the first consumer
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId); // every message carries the same group id
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages); // shared by both handlers: one count per handled message
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS)); // fails if fewer than numMessages were handled within 10 seconds
+      assertTrue(dummyMessageHandler.list.size() == 100); // NOTE(review): literal duplicates numMessages; an assertEquals would show the actual size on failure
+      assertTrue(dummyMessageHandler2.list.size() == 0); // the second handler must have collected nothing
+      consumer.close();
+      consumer2.close();
+   }
+
+   public void testMultipleGrouping() throws Exception // two alternating group ids, two consumers: each handler must collect exactly one group, in send order
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0) // even i -> grp1, odd i -> grp2; NOTE(review): "|| i == 0" is redundant because 0 % 2 == 0
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages); // shared by both handlers: one count per handled message
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 50); // NOTE(review): if this is JUnit's assertEquals(expected, actual) the arguments are swapped - confirm
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // first handler: bodies m0, m2, m4, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list) // second handler: bodies m1, m3, m5, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+   }
+
+   public void testMultipleGroupingStartConsumersAfterMessagesSent() throws Exception // same checks as testMultipleGrouping, but the session is started after sending
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName); // consumers are created before the send loop; only start() is deferred
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0) // even i -> grp1, odd i -> grp2; NOTE(review): "|| i == 0" is redundant because 0 % 2 == 0
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+
+      clientSession.start(); // unlike testMultipleGrouping, called only after all messages have been sent
+      CountDownLatch latch = new CountDownLatch(numMessages); // shared by both handlers: one count per handled message
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 50); // NOTE(review): if this is JUnit's assertEquals(expected, actual) the arguments are swapped - confirm
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // first handler: bodies m0, m2, m4, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list) // second handler: bodies m1, m3, m5, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+   }
+
+   public void testMultipleGroupingConsumeHalf() throws Exception // receives the first half with two consumers, closes them, then re-reads the queue with one consumer
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0) // even i -> grp1, odd i -> grp2; NOTE(review): "|| i == 0" is redundant because 0 % 2 == 0
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+
+      for(int i = 0; i < numMessages/2; i++) // i is also incremented in the body, so this runs 25 times and receives m0..m49
+      {
+         ClientMessage cm = consumer.receive(500); // first consumer is expected to return the even-numbered body
+         assertNotNull(cm);
+         assertEquals(cm.getBody().readString(), "m" + i); // NOTE(review): if this is JUnit's assertEquals(expected, actual) the arguments are swapped - confirm
+         i++;
+         cm = consumer2.receive(500); // second consumer is expected to return the following odd-numbered body
+         assertNotNull(cm);
+         assertEquals(cm.getBody().readString(), "m" + i);
+      }
+
+      consumer2.close(); // nothing received above was acknowledged; presumably that is why m0 onward are expected again below - confirm
+      consumer.close();
+      //check that within their groups the messages are still in the correct order
+      consumer = clientSession.createConsumer(qName);
+      for(int i = 0; i < numMessages; i+=2) // the single new consumer must first return m0, m2, ..., m98
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         assertEquals(cm.getBody().readString(), "m" + i);
+      }
+      for(int i = 1; i < numMessages; i+=2) // ...followed by m1, m3, ..., m99
+      {
+         ClientMessage cm = consumer.receive(500);
+         assertNotNull(cm);
+         assertEquals(cm.getBody().readString(), "m" + i);
+      }
+      consumer.close();
+   }
+
+   public void testMultipleGroupingSingleConsumer() throws Exception // two alternating group ids, one consumer: it must collect all messages in send order
+   {
+      ClientProducer clientProducer = clientSession.createProducer(qName);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0) // even i -> grp1, odd i -> grp2; NOTE(review): "|| i == 0" is redundant because 0 % 2 == 0
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages); // one count per handled message
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 100); // NOTE(review): if this is JUnit's assertEquals(expected, actual) the arguments are swapped - confirm
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // bodies must be m0, m1, m2, ... i.e. both groups interleaved in send order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 1;
+      }
+      consumer.close();
+   }
+
+   public void testMultipleGroupingTXCommit() throws Exception // consumes two groups on a separate session, commits, then expects the queue to be empty
+   {
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession clientSession = sessionFactory.createSession(false, false, false); // local shadows the field; NOTE(review): never closed here, tearDown only closes the field
+      ClientProducer clientProducer = this.clientSession.createProducer(qName); // the producer uses the field session created in setUp, not the local one
+      clientSession.start();
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0) // even i -> grp1, odd i -> grp2; NOTE(review): "|| i == 0" is redundant because 0 % 2 == 0
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages); // shared by both handlers: one count per handled message
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      clientSession.commit(); // commits on the local (shadowing) session after all 100 messages were handled
+      assertEquals(dummyMessageHandler.list.size(), 50); // NOTE(review): if this is JUnit's assertEquals(expected, actual) the arguments are swapped - confirm
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // first handler: bodies m0, m2, m4, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list) // second handler: bodies m1, m3, m5, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+      consumer = this.clientSession.createConsumer(qName); // fresh consumer on the field session
+      assertNull(consumer.receive(500)); // nothing may be left to receive after the commit
+   }
+
+   public void testMultipleGroupingTXRollback() throws Exception // consumes two groups, rolls back, and expects the same per-group split on the second delivery
+   {
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      ClientSession clientSession = sessionFactory.createSession(false, false, false); // local shadows the field; NOTE(review): never closed here, tearDown only closes the field
+      ClientProducer clientProducer = this.clientSession.createProducer(qName); // the producer uses the field session created in setUp, not the local one
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0) // even i -> grp1, odd i -> grp2; NOTE(review): "|| i == 0" is redundant because 0 % 2 == 0
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages); // shared by both handlers: one count per handled message
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      assertEquals(dummyMessageHandler.list.size(), 50); // NOTE(review): if this is JUnit's assertEquals(expected, actual) the arguments are swapped - confirm
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // first handler: bodies m0, m2, m4, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list) // second handler: bodies m1, m3, m5, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      latch = new CountDownLatch(numMessages); // fresh latch for the second round
+      dummyMessageHandler.reset(latch); // reset() clears the collected list and installs the new latch
+      dummyMessageHandler2.reset(latch);
+      clientSession.rollback(); // rolls back on the local (shadowing) session
+      assertTrue(latch.await(10, TimeUnit.SECONDS)); // both handlers together must handle 100 messages again
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // after the rollback the first handler must again hold m0, m2, m4, ...
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list) // ...and the second handler m1, m3, m5, ...
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer = this.clientSession.createConsumer(qName); // NOTE(review): the two original consumers are overwritten/left open and never closed in this method
+      assertNull(consumer.receive(500)); // the new consumer on the field session must receive nothing within 500 ms
+   }
+
+   public void testMultipleGroupingXACommit() throws Exception // consumes two groups inside an Xid-scoped transaction, commits it, then expects the queue to be empty
+   {
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      ClientSession clientSession = sessionFactory.createSession(true, false, false); // local shadows the field; NOTE(review): never closed here, tearDown only closes the field
+      ClientProducer clientProducer = this.clientSession.createProducer(qName); // the producer uses the field session created in setUp, not the local one
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      clientSession.start();
+      Xid xid = new XidImpl("bq".getBytes(), 4, "gtid".getBytes());
+      clientSession.start(xid, XAResource.TMNOFLAGS); // begins work under xid on the local session
+
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0) // even i -> grp1, odd i -> grp2; NOTE(review): "|| i == 0" is redundant because 0 % 2 == 0
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages); // shared by both handlers: one count per handled message
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      clientSession.commit(xid, true); // NOTE(review): if the flag is XAResource's onePhase, true after prepare() looks inconsistent (testMultipleGroupingXARollback passes false) - confirm
+      assertEquals(dummyMessageHandler.list.size(), 50); // NOTE(review): if this is JUnit's assertEquals(expected, actual) the arguments are swapped - confirm
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // first handler: bodies m0, m2, m4, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list) // second handler: bodies m1, m3, m5, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer.close();
+      consumer2.close();
+      consumer = this.clientSession.createConsumer(qName); // fresh consumer on the field session
+      assertNull(consumer.receive(500)); // nothing may be left to receive after the commit
+   }
+
+   public void testMultipleGroupingXARollback() throws Exception // consumes two groups under an Xid, rolls back, then consumes and commits them again under the same Xid
+   {
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      sessionFactory.setBlockOnAcknowledge(true);
+      ClientSession clientSession = sessionFactory.createSession(true, false, false); // local shadows the field; NOTE(review): never closed here, tearDown only closes the field
+      ClientProducer clientProducer = this.clientSession.createProducer(qName); // the producer uses the field session created in setUp, not the local one
+      clientSession.start();
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientConsumer consumer2 = clientSession.createConsumer(qName);
+      Xid xid = new XidImpl("bq".getBytes(), 4, "gtid".getBytes());
+      clientSession.start(xid, XAResource.TMNOFLAGS); // begins work under xid on the local session
+
+      SimpleString groupId = new SimpleString("grp1");
+      SimpleString groupId2 = new SimpleString("grp2");
+      int numMessages = 100;
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, clientSession);
+         if( i % 2 == 0 || i == 0) // even i -> grp1, odd i -> grp2; NOTE(review): "|| i == 0" is redundant because 0 % 2 == 0
+         {
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+         }
+         else
+         {
+             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+         }
+         clientProducer.send(message);
+      }
+      CountDownLatch latch = new CountDownLatch(numMessages); // shared by both handlers: one count per handled message
+      DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+      consumer.setMessageHandler(dummyMessageHandler);
+      DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+      consumer2.setMessageHandler(dummyMessageHandler2);
+      assertTrue(latch.await(10, TimeUnit.SECONDS));
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      assertEquals(dummyMessageHandler.list.size(), 50); // NOTE(review): if this is JUnit's assertEquals(expected, actual) the arguments are swapped - confirm
+      int i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // first handler: bodies m0, m2, m4, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list) // second handler: bodies m1, m3, m5, ... in that order
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      latch = new CountDownLatch(numMessages); // fresh latch for the second round
+      dummyMessageHandler.reset(latch); // reset() clears the collected list and installs the new latch
+      dummyMessageHandler2.reset(latch);
+      clientSession.rollback(xid); // rolled back without a prior prepare(xid)
+      clientSession.start(xid, XAResource.TMNOFLAGS); // the same xid is started again for the second round
+      assertTrue(latch.await(10, TimeUnit.SECONDS)); // both handlers together must handle 100 messages again
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      clientSession.commit(xid, false);
+      assertEquals(dummyMessageHandler.list.size(), 50);
+      i = 0;
+      for (ClientMessage message : dummyMessageHandler.list) // after the rollback the first handler must again hold m0, m2, m4, ...
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      assertEquals(dummyMessageHandler2.list.size(), 50);
+      i = 1;
+      for (ClientMessage message : dummyMessageHandler2.list) // ...and the second handler m1, m3, m5, ...
+      {
+         assertEquals(message.getBody().readString(), "m" + i);
+         i += 2;
+      }
+      consumer = this.clientSession.createConsumer(qName); // NOTE(review): the two original consumers are overwritten/left open and never closed in this method
+      assertNull(consumer.receive(500)); // the new consumer on the field session must receive nothing within 500 ms
+   }
+
+   protected void tearDown() throws Exception // closes the shared session, stops the service if started, clears both fields, then delegates to super
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            // exception is swallowed: nothing is logged or rethrown
+         }
+      }
+      if (messagingService != null && messagingService.isStarted())
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e1)
+         {
+            // exception is swallowed: nothing is logged or rethrown
+         }
+      }
+      messagingService = null; // fields are cleared even when close()/stop() threw above
+      clientSession = null;
+      
+      super.tearDown();
+   }
+
+   protected void setUp() throws Exception // starts a service, installs the grouping distributor for qName, and opens the shared session and queue
+   {
+      super.setUp();
+      
+      ConfigurationImpl configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = Messaging.newNullStorageMessagingService(configuration);
+      // start the server
+      messagingService.start();
+
+      AddressSettings qs = new AddressSettings();
+      qs.setDistributionPolicyClass(GroupingRoundRobinDistributor.class.getName()); // the distribution policy under test, set by class name
+      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), qs); // settings are registered under the test queue's name only
+      // then we create a client as normal
+      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+      clientSession = sessionFactory.createSession(false, true, true); // presumably (xa, autoCommitSends, autoCommitAcks) - TODO confirm against ClientSessionFactory
+      clientSession.createQueue(qName, qName, null, false, false); // qName is passed for both leading arguments, presumably address and queue name - confirm
+   }
+
+   private static class DummyMessageHandler implements MessageHandler // records every handled message, optionally acknowledges it, and counts down a latch
+   {
+      ArrayList<ClientMessage> list = new ArrayList<ClientMessage>(); // messages in the order onMessage received them; read directly by the tests
+
+      private CountDownLatch latch; // replaced by reset(), hence not final
+
+      private final boolean acknowledge;
+
+      public DummyMessageHandler(CountDownLatch latch, boolean acknowledge) // latch: counted down once per message; acknowledge: whether onMessage calls message.acknowledge()
+      {
+         this.latch = latch;
+         this.acknowledge = acknowledge;
+      }
+
+      public void onMessage(ClientMessage message) // stores the message, acknowledges it if configured, then counts the latch down
+      {
+         list.add(message);
+         if (acknowledge)
+         {
+            try
+         {
+            message.acknowledge();
+         }
+         catch (MessagingException e)
+            {
+               //ignore - NOTE(review): a failed acknowledge is swallowed here, so a test cannot detect it
+            }
+         }
+         latch.countDown(); // counted down even when the acknowledge above failed
+      }
+
+      public void reset(CountDownLatch latch) // drops all recorded messages and switches to a new latch for another round
+      {
+         list.clear();
+         this.latch = latch;
+      }
+   }
+}

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/server/DeadLetterAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/DeadLetterAddressTest.java	2009-03-25 10:19:56 UTC (rev 6158)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/DeadLetterAddressTest.java	2009-03-25 10:21:52 UTC (rev 6159)
@@ -1,284 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.server;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.server.Messaging;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.core.transaction.impl.XidImpl;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.utils.SimpleString;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class DeadLetterAddressTest extends UnitTestCase
-{
-   private MessagingService messagingService;
-
-   private ClientSession clientSession;
-
-   public void testBasicSend() throws Exception
-   {
-      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
-      SimpleString dla = new SimpleString("DLA");
-      SimpleString qName = new SimpleString("q1");
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setMaxDeliveryAttempts(1);
-      addressSettings.setDeadLetterAddress(dla);
-      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
-      SimpleString dlq = new SimpleString("DLQ1");
-      clientSession.createQueue(dla, dlq, null, false, false);
-      clientSession.createQueue(qName, qName, null, false, false);
-      ClientProducer producer = clientSession.createProducer(qName);
-      producer.send(createTextMessage("heyho!", clientSession));
-      clientSession.start();
-      clientSession.start(xid, XAResource.TMNOFLAGS);
-      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
-      ClientMessage m = clientConsumer.receive(500);
-      m.acknowledge();
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "heyho!");
-      //force a cancel
-      clientSession.end(xid, XAResource.TMSUCCESS);
-      clientSession.rollback(xid);
-      m = clientConsumer.receive(500);
-      assertNull(m);
-      clientConsumer.close();
-      clientConsumer = clientSession.createConsumer(dlq);
-      m = clientConsumer.receive(500);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "heyho!");
-   }
-
-   public void testBasicSendToMultipleQueues() throws Exception
-   {
-      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
-      SimpleString dla = new SimpleString("DLA");
-      SimpleString qName = new SimpleString("q1");
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setMaxDeliveryAttempts(1);
-      addressSettings.setDeadLetterAddress(dla);
-      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
-      SimpleString dlq = new SimpleString("DLQ1");
-      SimpleString dlq2 = new SimpleString("DLQ2");
-      clientSession.createQueue(dla, dlq, null, false, false);
-      clientSession.createQueue(dla, dlq2, null, false, false);
-      clientSession.createQueue(qName, qName, null, false, false);
-      ClientProducer producer = clientSession.createProducer(qName);
-      producer.send(createTextMessage("heyho!", clientSession));
-      clientSession.start();
-      clientSession.start(xid, XAResource.TMNOFLAGS);
-      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
-      ClientMessage m = clientConsumer.receive(500);
-      m.acknowledge();
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "heyho!");
-      //force a cancel
-      clientSession.end(xid, XAResource.TMSUCCESS);
-      clientSession.rollback(xid);
-      clientSession.start(xid, XAResource.TMNOFLAGS);
-      m = clientConsumer.receive(500);
-      assertNull(m);
-      clientConsumer.close();
-      clientConsumer = clientSession.createConsumer(dlq);
-      m = clientConsumer.receive(500);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "heyho!");
-      clientConsumer.close();
-      clientConsumer = clientSession.createConsumer(dlq2);
-      m = clientConsumer.receive(500);
-      assertNotNull(m);
-      m.acknowledge();
-      assertEquals(m.getBody().readString(), "heyho!");
-      clientConsumer.close();
-   }
-
-   public void testBasicSendToNoQueue() throws Exception
-   {
-      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
-      SimpleString qName = new SimpleString("q1");
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setMaxDeliveryAttempts(1);
-      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
-      clientSession.createQueue(qName, qName, null, false, false);
-      ClientProducer producer = clientSession.createProducer(qName);
-      producer.send(createTextMessage("heyho!", clientSession));
-      clientSession.start();
-      clientSession.start(xid, XAResource.TMNOFLAGS);
-      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
-      ClientMessage m = clientConsumer.receive(500);
-      m.acknowledge();
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "heyho!");
-      //force a cancel
-      clientSession.end(xid, XAResource.TMSUCCESS);
-      clientSession.rollback(xid);
-      m = clientConsumer.receive(500);
-      assertNull(m);
-      clientConsumer.close();
-   }
-
-   public void testHeadersSet() throws Exception
-   {
-      final int MAX_DELIVERIES = 16;
-      final int NUM_MESSAGES = 5;
-      Xid xid = new XidImpl("bq".getBytes(), 0, "gt".getBytes());
-      SimpleString dla = new SimpleString("DLA");
-      SimpleString qName = new SimpleString("q1");
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setMaxDeliveryAttempts(MAX_DELIVERIES);
-      addressSettings.setDeadLetterAddress(dla);
-      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
-      SimpleString dlq = new SimpleString("DLQ1");
-      clientSession.createQueue(dla, dlq, null, false, false);
-      clientSession.createQueue(qName, qName, null, false, false);
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      ClientSession sendSession = sessionFactory.createSession(false, true, true);
-      ClientProducer producer = sendSession.createProducer(qName);
-      Map<String, Long> origIds = new HashMap<String, Long>();
-
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage tm = createTextMessage("Message:" + i, clientSession);
-         producer.send(tm);
-      }
-
-      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
-      clientSession.start();
-
-      for (int i = 0; i < MAX_DELIVERIES; i++)
-      {
-         clientSession.start(xid, XAResource.TMNOFLAGS);
-         for (int j = 0; j < NUM_MESSAGES; j++)
-         {
-            ClientMessage tm = clientConsumer.receive(1000);
-
-            assertNotNull(tm);
-            tm.acknowledge();
-            if(i == 0)
-            {
-               origIds.put("Message:" + j, tm.getMessageID());
-            }
-            assertEquals("Message:" + j, tm.getBody().readString());
-         }
-         clientSession.end(xid, XAResource.TMSUCCESS);
-         clientSession.rollback(xid);
-      }
-      
-      assertEquals(0, ((Queue)messagingService.getServer().getPostOffice().getBinding(qName).getBindable()).getMessageCount());
-      ClientMessage m = clientConsumer.receive(1000);
-      assertNull(m);
-      //All the messages should now be in the DLQ
-
-      ClientConsumer cc3 = clientSession.createConsumer(dlq);
-
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage tm = cc3.receive(1000);
-
-         assertNotNull(tm);
-
-         String text = tm.getBody().readString();
-         assertEquals("Message:" + i, text);
-
-         // Check the headers
-         SimpleString origDest =
-               (SimpleString) tm.getProperty(MessageImpl.HDR_ORIGINAL_DESTINATION);
-
-         Long origMessageId =
-               (Long) tm.getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-
-         assertEquals(qName, origDest);
-
-         Long origId = origIds.get(text);
-
-         assertEquals(origId, origMessageId);
-      }
-
-   }
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      
-      ConfigurationImpl configuration = new ConfigurationImpl();
-      configuration.setSecurityEnabled(false);
-      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = Messaging.newNullStorageMessagingService(configuration);
-      //start the server
-      messagingService.start();
-      //then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      clientSession = sessionFactory.createSession(true, true, false);
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      if (clientSession != null)
-      {
-         try
-         {
-            clientSession.close();
-         }
-         catch (MessagingException e1)
-         {
-            //
-         }
-      }
-      if (messagingService != null && messagingService.isStarted())
-      {
-         try
-         {
-            messagingService.stop();
-         }
-         catch (Exception e1)
-         {
-            //
-         }
-      }
-      messagingService = null;
-      clientSession = null;
-      super.tearDown();
-   }
-
-}

Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryAddressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryAddressTest.java	2009-03-25 10:19:56 UTC (rev 6158)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/server/ExpiryAddressTest.java	2009-03-25 10:21:52 UTC (rev 6159)
@@ -1,266 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.tests.integration.server;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
-import org.jboss.messaging.core.server.Messaging;
-import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public class ExpiryAddressTest extends UnitTestCase
-{
-   private static final Logger log = Logger.getLogger(ExpiryAddressTest.class);
-
-   private MessagingService messagingService;
-
-   private ClientSession clientSession;
-
-   public void testBasicSend() throws Exception
-   {
-      SimpleString ea = new SimpleString("EA");
-      SimpleString qName = new SimpleString("q1");
-      SimpleString eq = new SimpleString("EA1");
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setExpiryAddress(ea);
-      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
-      clientSession.createQueue(ea, eq, null, false, false);
-      clientSession.createQueue(qName, qName, null, false, false);
-      
-      ClientProducer producer = clientSession.createProducer(qName);
-      ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
-      clientMessage.setExpiration(System.currentTimeMillis());
-      producer.send(clientMessage);
-      
-      clientSession.start();
-      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
-      ClientMessage m = clientConsumer.receive(500);
-      assertNull(m);
-      System.out.println("size3 = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
-      m = clientConsumer.receive(500);
-      assertNull(m);
-      clientConsumer.close();
-      clientConsumer = clientSession.createConsumer(eq);
-      m = clientConsumer.receive(500);
-      assertNotNull(m);
-      assertEquals(m.getBody().readString(), "heyho!");
-      m.acknowledge();
-      
-      // PageSize should be the same as when it started
-      assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
-   }
-
-   public void testBasicSendToMultipleQueues() throws Exception
-   {
-      SimpleString ea = new SimpleString("EA");
-      SimpleString qName = new SimpleString("q1");
-      SimpleString eq = new SimpleString("EQ1");
-      SimpleString eq2 = new SimpleString("EQ2");
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setExpiryAddress(ea);
-      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
-      clientSession.createQueue(ea, eq, null, false, false);
-      clientSession.createQueue(ea, eq2, null, false, false);
-      clientSession.createQueue(qName, qName, null, false, false);
-      ClientProducer producer = clientSession.createProducer(qName);
-      ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
-      clientMessage.setExpiration(System.currentTimeMillis());
-      
-      System.out.println("initialPageSize = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
-      
-      producer.send(clientMessage);
-      
-      System.out.println("pageSize after message sent = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
-      
-      clientSession.start();
-      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
-      ClientMessage m = clientConsumer.receive(500);
-      
-      System.out.println("pageSize after message received = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
-      
-      assertNull(m);
-      
-      clientConsumer.close();
-      
-      clientConsumer = clientSession.createConsumer(eq);
-      
-      m = clientConsumer.receive(500);
-      
-      assertNotNull(m);
-      
-      log.info("acking");
-      m.acknowledge();
-      
-      assertEquals(m.getBody().readString(), "heyho!");
-      
-      clientConsumer.close();
-      
-      clientConsumer = clientSession.createConsumer(eq2);
-      
-      m = clientConsumer.receive(500);
-      
-      assertNotNull(m);
-      
-      log.info("acking");
-      m.acknowledge();
-      
-      assertEquals(m.getBody().readString(), "heyho!");
-      
-      clientConsumer.close();
-      
-      clientSession.commit();
-
-      // PageGlobalSize should be untouched as the message expired
-      assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
-   }
-
-   public void testBasicSendToNoQueue() throws Exception
-   {
-      SimpleString ea = new SimpleString("EA");
-      SimpleString qName = new SimpleString("q1");
-      SimpleString eq = new SimpleString("EQ1");
-      SimpleString eq2 = new SimpleString("EQ2");
-      clientSession.createQueue(ea, eq, null, false, false);
-      clientSession.createQueue(ea, eq2, null, false, false);
-      clientSession.createQueue(qName, qName, null, false, false);
-      ClientProducer producer = clientSession.createProducer(qName);
-      ClientMessage clientMessage = createTextMessage("heyho!", clientSession);
-      clientMessage.setExpiration(System.currentTimeMillis());
-      producer.send(clientMessage);
-      clientSession.start();
-      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
-      ClientMessage m = clientConsumer.receive(500);
-      assertNull(m);
-      clientConsumer.close();
-   }
-
-   public void testHeadersSet() throws Exception
-   {
-      final int NUM_MESSAGES = 5;
-      SimpleString ea = new SimpleString("DLA");
-      SimpleString qName = new SimpleString("q1");
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setExpiryAddress(ea);
-      messagingService.getServer().getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
-      SimpleString eq = new SimpleString("EA1");
-      clientSession.createQueue(ea, eq, null, false, false);
-      clientSession.createQueue(qName, qName, null, false, false);
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      ClientSession sendSession = sessionFactory.createSession(false, true, true);
-      ClientProducer producer = sendSession.createProducer(qName);
-
-      long expiration = System.currentTimeMillis();
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage tm = createTextMessage("Message:" + i, clientSession);
-         tm.setExpiration(expiration);
-         producer.send(tm);
-      }
-
-      ClientConsumer clientConsumer = clientSession.createConsumer(qName);
-      clientSession.start();
-      ClientMessage m = clientConsumer.receive(1000);
-      assertNull(m);
-      // All the messages should now be in the EQ
-
-      ClientConsumer cc3 = clientSession.createConsumer(eq);
-
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage tm = cc3.receive(1000);
-
-         assertNotNull(tm);
-
-         String text = tm.getBody().readString();
-         assertEquals("Message:" + i, text);
-
-         // Check the headers
-         Long actualExpiryTime = (Long)tm.getProperty(HDR_ACTUAL_EXPIRY_TIME);
-         assertTrue(actualExpiryTime >= expiration);
-      }
-
-   }
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-      
-      ConfigurationImpl configuration = new ConfigurationImpl();
-      configuration.setSecurityEnabled(false);
-      TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
-      configuration.getAcceptorConfigurations().add(transportConfig);
-      messagingService = Messaging.newNullStorageMessagingService(configuration);
-      // start the server
-      messagingService.start();
-      // then we create a client as normal
-      ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
-      sessionFactory.setBlockOnAcknowledge(true); // There are assertions over sizes that needs to be done after the ACK was received on server
-      clientSession = sessionFactory.createSession(null, null, false, true, true, false, 0);
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      if (clientSession != null)
-      {
-         try
-         {
-            clientSession.close();
-         }
-         catch (MessagingException e1)
-         {
-            //
-         }
-      }
-      if (messagingService != null && messagingService.isStarted())
-      {
-         try
-         {
-            messagingService.stop();
-         }
-         catch (Exception e1)
-         {
-            //
-         }
-      }
-      messagingService = null;
-      clientSession = null;
-      
-      super.tearDown();
-   }
-
-}




More information about the jboss-cvs-commits mailing list