[jboss-cvs] JBoss Messaging SVN: r5308 - in trunk: src/main/org/jboss/messaging/core/server/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 7 06:39:44 EST 2008


Author: timfox
Date: 2008-11-07 06:39:43 -0500 (Fri, 07 Nov 2008)
New Revision: 5308

Added:
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DLQTest.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverNoSessionsFailoverTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java
Log:
More tests and tweaks


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -86,7 +86,7 @@
       this.rateLimiter = rateLimiter;
 
       this.blockOnNonPersistentSend = blockOnNonPersistentSend;
-
+      
       this.blockOnPersistentSend = blockOnPersistentSend;
 
       this.autoGroupId = autoGroupId;
@@ -230,7 +230,7 @@
       }
 
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-
+      
       SessionSendMessage message;
       
       // check to see if this message need to be scheduled.

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -424,7 +424,7 @@
          // If the producer is not auto-commit sends then messages are never
          // sent blocking - there is no point
          // since commit, prepare or rollback will flush any messages sent.
-
+ 
          producer = new ClientProducerImpl(this,
                                            idGenerator.generateID(),
                                            address,

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -243,6 +243,7 @@
       Transaction tx = new TransactionImpl(persistenceManager, postOffice);
 
       ServerMessage copyMessage = makeCopy(expiry, persistenceManager);
+      
       copyMessage.setDestination(otherBinding.getAddress());
 
       tx.addMessage(copyMessage);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -510,7 +510,7 @@
                                                      final boolean autoCommitAcks,
                                                      final boolean xa,
                                                      final int sendWindowSize) throws Exception
-   {
+   {      
       checkActivate(connection);
 
       return doCreateSession(name,
@@ -577,8 +577,6 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   private final Object createSessionLock = new Object();
-
    private CreateSessionResponseMessage doCreateSession(final String name,
                                                         final long channelID,
                                                         final String username,
@@ -646,7 +644,7 @@
       channel.setHandler(handler);
 
       connection.addFailureListener(session);
-
+      
       return new CreateSessionResponseMessage(version.getIncrementingVersion());
    }
 

Added: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DLQTest.java	                        (rev 0)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DLQTest.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -0,0 +1,834 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A DLQTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 3674 $</tt>
+ *          <p/>
+ *          $Id: DLQTest.java 3674 2008-02-07 18:21:26Z timfox $
+ */
+public class DLQTest extends JMSTestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public DLQTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   public void testDefaultAndOverrideDLQ() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         return;
+      }
+
+      final int NUM_MESSAGES = 5;
+
+      final int MAX_DELIVERIES = 8;
+
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+      String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";
+
+      Connection conn = null;
+
+      try
+      {
+         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+         String overrideDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue3";
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+         conn = cf.createConnection();
+
+         {
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer prod = sess.createProducer(queue1);
+
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = sess.createTextMessage("Message:" + i);
+
+               prod.send(tm);
+            }
+
+            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            MessageConsumer cons = sess2.createConsumer(queue1);
+
+            conn.start();
+
+            for (int i = 0; i < MAX_DELIVERIES; i++)
+            {
+               for (int j = 0; j < NUM_MESSAGES; j++)
+               {
+                  TextMessage tm = (TextMessage) cons.receive(1000);
+
+                  assertNotNull(tm);
+
+                  assertEquals("Message:" + j, tm.getText());
+               }
+
+               sess2.recover();
+            }
+
+            //Prompt them to go to DLQ
+            cons.receive(100);
+
+            //At this point all the messages have been delivered exactly MAX_DELIVERIES times 
+
+            checkEmpty(queue1);
+
+            //Now should be in default dlq
+
+            MessageConsumer cons3 = sess.createConsumer(queue2);
+
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = (TextMessage) cons3.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("Message:" + i, tm.getText());
+            }
+
+            conn.close();
+         }
+
+
+         {
+            //Now try with overriding the default dlq
+
+            conn = cf.createConnection();
+
+            ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", overrideDLQObjectName);
+
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer prod = sess.createProducer(queue1);
+
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = sess.createTextMessage("Message:" + i);
+
+               prod.send(tm);
+            }
+
+            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            MessageConsumer cons = sess2.createConsumer(queue1);
+
+            conn.start();
+
+            for (int i = 0; i < MAX_DELIVERIES; i++)
+            {
+               for (int j = 0; j < NUM_MESSAGES; j++)
+               {
+                  TextMessage tm = (TextMessage) cons.receive(1000);
+
+                  assertNotNull(tm);
+
+                  assertEquals("Message:" + j, tm.getText());
+               }
+
+               sess2.recover();
+            }
+
+            cons.receive(100);
+
+            //At this point all the messages have been delivered exactly MAX_DELIVERIES times 
+
+            checkEmpty(queue1);
+
+            //Now should be in override dlq
+
+            MessageConsumer cons3 = sess.createConsumer(queue3);
+
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = (TextMessage) cons3.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("Message:" + i, tm.getText());
+            }
+         }
+      }
+      finally
+      {
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=DLQ");
+
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+
+   public void testWithMessageListenerPersistent() throws Exception
+   {
+      testWithMessageListener(true);
+   }
+
+   public void testWithMessageListenerNonPersistent() throws Exception
+   {
+      testWithMessageListener(false);
+   }
+
+   public void testWithReceiveClientAckPersistent() throws Exception
+   {
+      this.testWithReceiveClientAck(true);
+   }
+
+   public void testWithReceiveClientAckNonPersistent() throws Exception
+   {
+      testWithReceiveClientAck(false);
+   }
+
+   public void testWithReceiveTransactionalPersistent() throws Exception
+   {
+      this.testWithReceiveTransactional(true);
+   }
+
+   public void testWithReceiveTransactionalNonPersistent() throws Exception
+   {
+      testWithReceiveTransactional(false);
+   }
+
+   public void testHeadersSet() throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=Queue2");
+
+         final int MAX_DELIVERIES = 16;
+
+         final int NUM_MESSAGES = 5;
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+         int maxRedeliveryAttempts =
+                 ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue1);
+
+         Map origIds = new HashMap();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            prod.send(tm);
+
+            origIds.put(tm.getText(), tm.getJMSMessageID());
+         }
+
+         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageConsumer cons = sess2.createConsumer(queue1);
+
+         conn.start();
+
+         for (int i = 0; i < MAX_DELIVERIES; i++)
+         {
+            for (int j = 0; j < NUM_MESSAGES; j++)
+            {
+               TextMessage tm = (TextMessage) cons.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("Message:" + j, tm.getText());
+            }
+
+            sess2.rollback();
+         }
+
+         //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+         //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+
+         checkEmpty(queue2);
+
+         // So let's try and consume them - this should cause them to go to the DLQ - since they
+         // will then exceed max delivery attempts
+         Message m = cons.receive(100);
+
+         assertNull(m);
+
+         //All the messages should now be in the DLQ
+
+         MessageConsumer cons3 = sess.createConsumer(queue2);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons3.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+
+            // Check the headers
+            String origDest =
+                    tm.getStringProperty(MessageImpl.HDR_ORIGIN_QUEUE.toString());
+
+            String origMessageId =
+                    tm.getStringProperty(MessageImpl.HDR_ORIG_MESSAGE_ID.toString());
+
+            assertEquals(queue1.toString(), origDest);
+
+            String origId = (String) origIds.get(tm.getText());
+
+            assertEquals(origId, origMessageId);
+         }
+      }
+      finally
+      {
+         if (conn != null) conn.close();
+      }
+   }
+
+   public void testOverrideDefaultMaxDeliveryAttemptsForQueue() throws Exception
+   {
+      int md = getDefaultMaxDeliveryAttempts();
+      try
+      {
+         int maxDeliveryAttempts = md - 5;
+         setMaxDeliveryAttempts(
+                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
+                 maxDeliveryAttempts);
+         testMaxDeliveryAttempts(queue1, maxDeliveryAttempts, true);
+      }
+      finally
+      {
+         setMaxDeliveryAttempts(
+                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
+                 md);
+      }
+   }
+
+   public void testOverrideDefaultMaxDeliveryAttemptsForTopic() throws Exception
+   {
+      int md = getDefaultMaxDeliveryAttempts(); // server-wide default; basis for the override and for the restore below
+      try
+      {
+         int maxDeliveryAttempts = md - 5;
+         setMaxDeliveryAttempts(
+                 new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
+                 maxDeliveryAttempts);
+         // Exercise Topic1 with the lowered, destination-specific limit (false = destination is not a queue)
+         testMaxDeliveryAttempts(topic1, maxDeliveryAttempts, false);
+      }
+      finally
+      {
+         setMaxDeliveryAttempts(
+                 new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
+                 md); // reset the destination modified above (Topic1), so the lowered limit does not leak into later tests
+      }
+   }
+
+   public void testUseDefaultMaxDeliveryAttemptsForQueue() throws Exception
+   {
+      int md = getDefaultMaxDeliveryAttempts();
+      try
+      {
+         setMaxDeliveryAttempts(
+                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
+                 -1);
+
+         // Check that defaultMaxDeliveryAttempts takes effect
+         testMaxDeliveryAttempts(queue1, getDefaultMaxDeliveryAttempts(), true);
+      }
+      finally
+      {
+         setMaxDeliveryAttempts(
+                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
+                 md);
+      }
+   }
+
+   public void testUseDefaultMaxDeliveryAttemptsForTopic() throws Exception
+   {
+      int md = getDefaultMaxDeliveryAttempts(); // server-wide default; written back to Topic1 in the finally block
+      try
+      {
+         setMaxDeliveryAttempts(
+                 new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
+                 -1); // presumably -1 means "no per-destination override" - TODO confirm against the destination MBean
+
+         // Check that defaultMaxDeliveryAttempts takes effect
+         testMaxDeliveryAttempts(topic1, getDefaultMaxDeliveryAttempts(), false);
+      }
+      finally
+      {
+         setMaxDeliveryAttempts(
+                 new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
+                 md); // reset the destination modified above (Topic1), not Queue1
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void testWithMessageListener(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+      // Sends NUM_MESSAGES to queue1, fails every listener delivery, then expects them all in Queue2 (set as DefaultDLQ below)
+      try
+      {
+         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+         final int MAX_DELIVERIES = 16;
+
+         final int NUM_MESSAGES = 5;
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+         int maxRedeliveryAttempts =
+                 ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue1);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            prod.send(tm);
+         }
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+         // The listener throws on every delivery and counts the attempts it sees
+         FailingMessageListener listener = new FailingMessageListener(MAX_DELIVERIES * NUM_MESSAGES);
+
+         cons.setMessageListener(listener);
+
+         conn.start();
+         // Blocks until the listener has counted MAX_DELIVERIES * NUM_MESSAGES deliveries
+         listener.waitForMessages();
+
+         assertEquals(MAX_DELIVERIES * NUM_MESSAGES, listener.deliveryCount);
+
+         //Message should all be in the dlq - let's check
+
+         MessageConsumer cons2 = sess.createConsumer(queue2);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            log.info("Got message " + tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+         checkEmpty(queue1);
+      }
+      finally
+      {
+         if (conn != null) conn.close();
+      }
+   }
+
+
+   protected void testWithReceiveClientAck(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+         final int MAX_DELIVERIES = 16;
+
+         final int NUM_MESSAGES = 5;
+
+         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+         int maxRedeliveryAttempts =
+                 ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue1);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            prod.send(tm);
+         }
+
+         Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         MessageConsumer cons = sess2.createConsumer(queue1);
+
+         conn.start();
+
+         for (int i = 0; i < MAX_DELIVERIES; i++)
+         {
+            for (int j = 0; j < NUM_MESSAGES; j++)
+            {
+               TextMessage tm = (TextMessage) cons.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("Message:" + j, tm.getText());
+            }
+
+            sess2.recover();
+         }
+
+         //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+         //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+
+         checkEmpty(queue2);
+
+         //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
+         //delivery attempts
+
+         Message m = cons.receive(100);
+
+         assertNull(m);
+
+         //Now, all the messages should now be in the DLQ
+
+         MessageConsumer cons3 = sess.createConsumer(queue2);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons3.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+         //No more should be available
+
+         cons.close();
+
+         checkEmpty(queue1);
+      }
+      finally
+      {
+         destroyQueue("DLQ");
+
+         if (conn != null) conn.close();
+      }
+   }
+
+   protected void testWithReceiveTransactional(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+         final int MAX_DELIVERIES = 16;
+
+         final int NUM_MESSAGES = 5;
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+         int maxRedeliveryAttempts =
+                 ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue1);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            prod.send(tm);
+         }
+
+         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageConsumer cons = sess2.createConsumer(queue1);
+
+         conn.start();
+
+         for (int i = 0; i < MAX_DELIVERIES; i++)
+         {
+            for (int j = 0; j < NUM_MESSAGES; j++)
+            {
+               TextMessage tm = (TextMessage) cons.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("Message:" + j, tm.getText());
+            }
+
+            sess2.rollback();
+         }
+
+         //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+         //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+
+         checkEmpty(queue2);
+
+         //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
+         //delivery attempts
+         Message m = cons.receive(100);
+
+         assertNull(m);
+
+         //All the messages should now be in the DLQ
+
+         MessageConsumer cons3 = sess.createConsumer(queue2);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons3.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+         //No more should be available
+
+         checkEmpty(queue1);
+      }
+      finally
+      {
+         destroyQueue("DLQ");
+
+         if (conn != null) conn.close();
+      }
+   }
+
+   protected int getDefaultMaxDeliveryAttempts() throws Exception
+   {
+      return ((Integer) ServerManagement.getAttribute(
+              ServerManagement.getServerPeerObjectName(),
+              "DefaultMaxDeliveryAttempts"))
+              .intValue();
+   }
+
+   protected void setMaxDeliveryAttempts(ObjectName dest, int maxDeliveryAttempts) throws Exception
+   {
+      ServerManagement.setAttribute(dest, "MaxDeliveryAttempts",
+              Integer.toString(maxDeliveryAttempts));
+   }
+
+   protected void testMaxDeliveryAttempts(Destination destination, int destMaxDeliveryAttempts, boolean queue) throws Exception
+   {
+      Connection conn = cf.createConnection();
+
+      if (!queue)
+      {
+         conn.setClientID("wib123");
+      }
+
+      try
+      {
+         ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(),
+                 "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=Queue2");
+
+         // Create the consumer before the producer so that the message we send doesn't
+         // get lost if the destination is a Topic.
+         Session consumingSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer destinationConsumer;
+
+         if (queue)
+         {
+            destinationConsumer = consumingSession.createConsumer(destination);
+         }
+         else
+         {
+            //For topics we only keep a delivery record on the server side for durable subs
+            destinationConsumer = consumingSession.createDurableSubscriber((Topic) destination, "testsub1");
+         }
+
+         {
+            Session producingSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer prod = producingSession.createProducer(destination);
+            TextMessage tm = producingSession.createTextMessage("Message");
+            prod.send(tm);
+         }
+
+         conn.start();
+
+         // Make delivery attempts up to the maximum. The message should not end up in the DLQ.
+         for (int i = 0; i < destMaxDeliveryAttempts; i++)
+         {
+            TextMessage tm = (TextMessage) destinationConsumer.receive(1000);
+            assertNotNull("No message received on delivery attempt number " + (i + 1), tm);
+            assertEquals("Message", tm.getText());
+            consumingSession.recover();
+         }
+
+         // At this point the message should not yet be in the DLQ
+         checkEmpty(queue2);
+
+         // Now we try to consume the message again from the destination, which causes it
+         // to go to the DLQ instead.
+         Message m = destinationConsumer.receive(100);
+         assertNull(m);
+
+         // The message should be in the DLQ now
+         MessageConsumer dlqConsumer = consumingSession.createConsumer(queue2);
+         m = dlqConsumer.receive(1000);
+         assertNotNull(m);
+         assertTrue(m instanceof TextMessage);
+         assertEquals("Message", ((TextMessage) m).getText());
+
+         m.acknowledge();
+
+         if (!queue)
+         {
+            destinationConsumer.close();
+
+            consumingSession.unsubscribe("testsub1");
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   class FailingMessageListener implements MessageListener
+   {
+      volatile int deliveryCount;
+
+      int numMessages;
+
+      FailingMessageListener(int numMessages)
+      {
+         this.numMessages = numMessages;
+      }
+
+      synchronized void waitForMessages() throws Exception
+      {
+         while (deliveryCount != numMessages)
+         {
+            this.wait();
+         }
+      }
+
+      public synchronized void onMessage(Message msg)
+      {
+         deliveryCount++;
+
+         this.notify();
+
+         throw new RuntimeException("Your mum!");
+      }
+
+   }
+
+}

Added: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java	                        (rev 0)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -0,0 +1,621 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * An ExpiryQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 3614 $</tt>
+ *          <p/>
+ *          $Id: ExpiryQueueTest.java 3614 2008-01-22 16:41:40Z timfox $
+ */
+public class ExpiryQueueTest extends JMSTestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates the test case.
+    *
+    * @param name the test name, passed straight through to the superclass constructor
+    */
+   public ExpiryQueueTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Checks that a queue named "ExpiryQueue" is bound in JNDI under /queue/ExpiryQueue.
+    * The test does nothing when ServerManagement reports a remote server.
+    */
+   public void testExpiryQueueAlreadyDeployed() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         // The earlier JMX attribute checks against the server peer (DefaultExpiryQueue name,
+         // JNDI name and queue instance) were commented out in this revision; only the JNDI
+         // binding is verified here.
+         JBossQueue expiryQueue = (JBossQueue) ic.lookup("/queue/ExpiryQueue");
+
+         assertNotNull(expiryQueue);
+
+         assertEquals("ExpiryQueue", expiryQueue.getName());
+      }
+   }
+
+
+   /**
+    * Expires messages on expTestQueue twice and checks where they end up: first on the
+    * DefaultExpiry queue and then, after the ExpiryQueue attribute of the test queue has been
+    * overridden, on the OverrideExpiry queue.
+    */
+   public void testDefaultAndOverrideExpiryQueue() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+
+      Connection conn = null;
+
+      //ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+      try
+      {
+         createQueue("DefaultExpiry");
+
+         createQueue("OverrideExpiry");
+
+         createQueue("expTestQueue");
+
+         // NOTE(review): a bare name such as "expTestQueue" is not a valid JMX object name and
+         // makes new ObjectName(...) throw MalformedObjectNameException, so the names below use
+         // the same "domain:service=Queue,name=..." form as the other tests in this class.
+         // Confirm this is the form the management layer expects.
+         String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=DefaultExpiry";
+
+         String overrideExpiryObjectName = "jboss.messaging.destination:service=Queue,name=OverrideExpiry";
+
+         String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=expTestQueue";
+
+         //ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+
+         //ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", "");
+
+         Queue testQueue = (Queue) ic.lookup("/queue/expTestQueue");
+
+         Queue defaultExpiry = (Queue) ic.lookup("/queue/DefaultExpiry");
+
+         Queue overrideExpiry = (Queue) ic.lookup("/queue/OverrideExpiry");
+
+         // First pass: nothing overridden, so the messages are expected on the default expiry queue
+         conn = cf.createConnection();
+
+         assertMessagesExpireTo(conn, testQueue, defaultExpiry, NUM_MESSAGES);
+
+         conn.close();
+
+         conn = null;
+
+         // Second pass: override the expiry queue of the test queue
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", overrideExpiryObjectName);
+
+         conn = cf.createConnection();
+
+         assertMessagesExpireTo(conn, testQueue, overrideExpiry, NUM_MESSAGES);
+      }
+      finally
+      {
+         //ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
+
+         try
+         {
+            // Close the connection first so that it cannot leak if destroying a queue fails
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+         finally
+         {
+            // Destroy exactly the queues created at the top of the test
+            destroyQueue("DefaultExpiry");
+
+            destroyQueue("OverrideExpiry");
+
+            destroyQueue("expTestQueue");
+         }
+      }
+   }
+
+   /**
+    * Sends numMessages persistent messages with a 2000 ms time to live to source, lets them expire
+    * while a consumer is open on source, asserts that source then delivers nothing, and asserts
+    * that the messages can be received in send order from expiryQueue.
+    *
+    * @param conn        connection to use; it is started by this method
+    * @param source      queue the messages are sent to
+    * @param expiryQueue queue the expired messages are expected on
+    * @param numMessages number of messages to send and expect
+    */
+   private void assertMessagesExpireTo(Connection conn, Queue source, Queue expiryQueue, int numMessages) throws Exception
+   {
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      MessageProducer prod = sess.createProducer(source);
+
+      conn.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         TextMessage tm = sess.createTextMessage("Message:" + i);
+
+         //Send messages with time to live of 2000 enough time to get to client consumer - so
+         //they won't be expired on the server side
+         prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
+      }
+
+      Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+      MessageConsumer cons = sess2.createConsumer(source);
+
+      //The messages should now be sitting in the consumer buffer
+
+      //Now give them enough time to expire
+
+      Thread.sleep(2500);
+
+      //Now try and receive
+
+      Message m = cons.receive(1000);
+
+      assertNull(m);
+
+      //Messages should all be in the expected expiry queue - let's check
+
+      MessageConsumer expiredCons = sess.createConsumer(expiryQueue);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         TextMessage tm = (TextMessage) expiredCons.receive(1000);
+
+         assertNotNull(tm);
+
+         assertEquals("Message:" + i, tm.getText());
+      }
+   }
+
+   /**
+    * Publishes messages to topic1 while three durable subscriptions exist, lets the messages
+    * expire, and checks that every message shows up on the expiry queue carrying the original
+    * destination, original message id and actual expiry time headers.
+    */
+   public void testExpireSameMessagesMultiple() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+
+      Connection conn = null;
+
+      try
+      {
+         createQueue("ExpiryQueue");
+
+         String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=ExpiryQueue";
+
+         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+
+         Queue defaultExpiry = (Queue) ic.lookup("/queue/ExpiryQueue");
+
+         conn = cf.createConnection();
+
+         conn.setClientID("wib1");
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(topic1);
+
+         conn.start();
+
+         //Create 3 durable subscriptions
+
+         MessageConsumer sub1 = sess.createDurableSubscriber(topic1, "sub1");
+
+         MessageConsumer sub2 = sess.createDurableSubscriber(topic1, "sub2");
+
+         MessageConsumer sub3 = sess.createDurableSubscriber(topic1, "sub3");
+
+         // Message text -> JMS message id assigned on send
+         Map<String, String> origIds = new HashMap<String, String>();
+
+         long now = System.currentTimeMillis();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            //Send messages with time to live of 3000 enough time to get to client consumer - so
+            //they won't be expired on the server side
+            prod.send(tm, DeliveryMode.PERSISTENT, 4, 3000);
+
+            origIds.put(tm.getText(), tm.getJMSMessageID());
+         }
+
+         // "now" was sampled before the first send, so every real expiry time is >= this value
+         long approxExpiry = now + 3000;
+
+         //Now sleep. This will give them enough time to expire
+
+         Thread.sleep(3500);
+
+         //Now try and consume from each - this should force the message to the expiry queue
+
+         Message m = sub1.receive(500);
+         assertNull(m);
+
+         m = sub2.receive(500);
+         assertNull(m);
+
+         m = sub3.receive(500);
+         assertNull(m);
+
+         //Now the messages should all be in the expiry queue
+
+         MessageConsumer cons2 = sess.createConsumer(defaultExpiry);
+
+         // Tracks which sent messages have not been seen on the expiry queue yet. Without this
+         // the drain loop below would also pass when nothing was moved to the expiry queue.
+         Map<String, String> notYetSeen = new HashMap<String, String>(origIds);
+
+         while (true)
+         {
+            TextMessage tm = (TextMessage) cons2.receive(500);
+
+            if (tm == null)
+            {
+               break;
+            }
+
+            // Check the headers
+            String origDest =
+                    tm.getStringProperty(MessageImpl.HDR_ORIGIN_QUEUE.toString());
+
+            String origMessageId =
+                    tm.getStringProperty(MessageImpl.HDR_ORIG_MESSAGE_ID.toString());
+
+            long actualExpiryTime =
+                    tm.getLongProperty(MessageImpl.HDR_ACTUAL_EXPIRY_TIME.toString());
+
+            assertEquals(topic1.toString(), origDest);
+
+            String origId = origIds.get(tm.getText());
+
+            assertEquals(origId, origMessageId);
+
+            assertTrue(actualExpiryTime >= approxExpiry);
+
+            notYetSeen.remove(tm.getText());
+         }
+
+         assertTrue("Messages missing from the expiry queue: " + notYetSeen.keySet(), notYetSeen.isEmpty());
+
+         cons2.close();
+
+         sub1.close();
+
+         sub2.close();
+
+         sub3.close();
+
+         sess.unsubscribe("sub1");
+
+         sess.unsubscribe("sub2");
+
+         sess.unsubscribe("sub3");
+
+      }
+      finally
+      {
+         try
+         {
+            // Close the connection first so that it cannot leak if destroying the queue fails
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+         finally
+         {
+            destroyQueue("ExpiryQueue");
+         }
+      }
+   }
+
+   /** Runs the message-listener expiry scenario with persistent delivery. */
+   public void testWithMessageListenerPersistent() throws Exception
+   {
+      this.testWithMessageListener(true);
+   }
+
+   /** Runs the message-listener expiry scenario with non-persistent delivery. */
+   public void testWithMessageListenerNonPersistent() throws Exception
+   {
+      this.testWithMessageListener(false);
+   }
+
+   /** Runs the synchronous-receive expiry scenario with persistent delivery. */
+   public void testWithReceivePersistent() throws Exception
+   {
+      testWithReceive(true);
+   }
+
+   /** Runs the synchronous-receive expiry scenario with non-persistent delivery. */
+   public void testWithReceiveNonPersistent() throws Exception
+   {
+      this.testWithReceive(false);
+   }
+
+   /**
+    * Sends messages with a 2000 ms time to live to queue1, lets them expire while a consumer is
+    * open, then attaches a listener and verifies that nothing is delivered to it and that the
+    * messages can be received in order from the ExpiryQueue.
+    *
+    * @param persistent true to send with DeliveryMode.PERSISTENT, false for NON_PERSISTENT
+    */
+   public void testWithMessageListener(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+         Queue expiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
+
+         final int NUM_MESSAGES = 5;
+
+         conn = cf.createConnection();
+
+         conn.start();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue1);
+
+         int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            //Send messages with time to live of 2000 enough time to get to client consumer - so
+            //they won't be expired on the server side
+            prod.send(tm, deliveryMode, 4, 2000);
+         }
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         //The messages should now be sitting in the consumer buffer
+
+         //Now give them enough time to expire
+
+         Thread.sleep(2500);
+
+         //Now set a listener
+
+         FailingMessageListener listener = new FailingMessageListener();
+
+         cons.setMessageListener(listener);
+
+         Thread.sleep(1000);
+
+         cons.setMessageListener(null);
+
+         //No messages should have been received
+         assertEquals(0, listener.deliveryCount);
+
+         //Shouldn't be able to receive any more
+
+         Message m = cons.receive(1000);
+
+         assertNull(m);
+
+         //Messages should all be in the expiry queue - let's check
+
+         MessageConsumer cons2 = sess.createConsumer(expiryQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+      }
+      finally
+      {
+         // The connection used to be left open, including when an assertion above failed
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /**
+    * Sends messages with a 2000 ms time to live to queue1, lets them expire while a consumer is
+    * open, and verifies that a synchronous receive returns nothing and that the messages can be
+    * received in order from the ExpiryQueue.
+    *
+    * @param persistent true to send with DeliveryMode.PERSISTENT, false for NON_PERSISTENT
+    */
+   public void testWithReceive(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      try
+      {
+
+         createQueue("ExpiryQueue");
+
+         Queue expiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
+
+         final int NUM_MESSAGES = 5;
+
+         conn = cf.createConnection();
+
+         conn.start();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue1);
+
+         int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            //Send messages with time to live of 2000 enough time to get to client consumer - so
+            //they won't be expired on the server side
+            prod.send(tm, deliveryMode, 4, 2000);
+         }
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         //The messages should now be sitting in the consumer buffer
+
+         //Now give them enough time to expire
+
+         Thread.sleep(2500);
+
+         //Now try and receive
+
+         Message m = cons.receive(1000);
+
+         assertNull(m);
+
+         //Messages should all be in the expiry queue - let's check
+
+         MessageConsumer cons2 = sess.createConsumer(expiryQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+      }
+      finally
+      {
+         try
+         {
+            // Close the connection first so that it cannot leak if destroying the queue fails
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+         finally
+         {
+            destroyQueue("ExpiryQueue");
+         }
+      }
+   }
+
+   /**
+    * Sends one message with a 100 ms time to live to queue1 and verifies that, once it has expired,
+    * queue1 delivers nothing and the message can be received exactly once from the ExpiryQueue.
+    * The server peer's DefaultExpiryQueue attribute is pointed at the ExpiryQueue for the duration
+    * of the test and restored afterwards.
+    */
+   public void testExpirationTransfer() throws Exception
+   {
+      createQueue("ExpiryQueue");
+
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+      Object originalValue = ServerManagement.getAttribute(serverPeerObjectName, "DefaultExpiryQueue");
+
+      ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
+
+      Connection conn = null;
+
+      try
+      {
+         ConnectionFactory connectionFactory = (ConnectionFactory) ic.lookup("/ConnectionFactory");
+
+         conn = connectionFactory.createConnection();
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn.start();
+
+         MessageProducer prod = session.createProducer(queue1);
+         prod.setTimeToLive(100);
+
+         Message m = session.createTextMessage("This message will die");
+
+         prod.send(m);
+
+         // wait for the message to die
+         Thread.sleep(2000);
+
+         MessageConsumer cons = session.createConsumer(queue1);
+
+         assertNull(cons.receive(3000));
+
+         Queue queueExpiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
+
+         MessageConsumer consumerExpiredQueue = session.createConsumer(queueExpiryQueue);
+
+         TextMessage txt = (TextMessage) consumerExpiredQueue.receive(1000);
+
+         // Fail with an assertion rather than a NullPointerException when nothing was transferred
+         assertNotNull(txt);
+
+         assertEquals("This message will die", txt.getText());
+
+         assertNull(consumerExpiredQueue.receive(1000));
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+
+         destroyQueue("ExpiryQueue");
+
+         // Nothing to restore if the attribute had no value; calling toString() on null here
+         // would throw and hide the real test failure
+         if (originalValue != null)
+         {
+            ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", originalValue.toString());
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Listener that records each delivery attempt and then rejects it by throwing.
+    */
+   class FailingMessageListener implements MessageListener
+   {
+      // Number of times onMessage() has been invoked; read directly by the tests.
+      volatile int deliveryCount;
+
+      public void onMessage(Message msg)
+      {
+         deliveryCount = deliveryCount + 1;
+
+         throw new RuntimeException("Your mum!");
+      }
+
+   }
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverNoSessionsFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverNoSessionsFailoverTest.java	2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverNoSessionsFailoverTest.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -33,13 +33,10 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.server.MessagingService;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java	2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -40,7 +40,6 @@
 import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -62,8 +61,6 @@
 
    // Attributes ----------------------------------------------------
 
-   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-
    private MessagingService liveService;
 
    private MessagingService backupService;

Added: trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java	2008-11-07 11:39:43 UTC (rev 5308)
@@ -0,0 +1,330 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.jms.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A JMSFailoverTest
+ *
+ * A simple test to test failover when using the JMS API.
+ * Most of the failover tests are done on the Core API.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 7 Nov 2008 11:13:39
+ *
+ *
+ */
+public class JMSFailoverTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(JMSFailoverTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Creates a JMS connection from a factory configured with a backup connector, sends messages,
+    * fails the underlying core connection and verifies that all messages can still be consumed
+    * in order through the same JMS consumer, and that the exception listener was handed a
+    * JMSException whose cause is the injected failure.
+    */
+   public void testAutomaticFailover() throws Exception
+   {
+      JBossConnectionFactory jbcf =
+         new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                    new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                               backupParams),
+                                    ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+                                    ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                    null,
+                                    1000,
+                                    ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+                                    ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+                                    ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+                                    ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+                                    ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP_ID,
+                                    ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+
+      Connection conn = jbcf.createConnection();
+
+      MyExceptionListener listener = new MyExceptionListener();
+
+      // The failure injected into the core connection below
+      MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED);
+
+      try
+      {
+         conn.setExceptionListener(listener);
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         ClientSession coreSession = ((JBossSession)sess).getCoreSession();
+
+         RemotingConnection coreConn = ((ClientSessionImpl)coreSession).getConnection();
+
+         SimpleString jmsQueueName = new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+
+         coreSession.createQueue(jmsQueueName, jmsQueueName, null, false, false);
+
+         Queue queue = sess.createQueue("myqueue");
+
+         final int numMessages = 1000;
+
+         MessageProducer producer = sess.createProducer(queue);
+
+         MessageConsumer consumer = sess.createConsumer(queue);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sess.createTextMessage("message" + i);
+
+            producer.send(tm);
+         }
+
+         conn.start();
+
+         coreConn.fail(me);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = (TextMessage)consumer.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         TextMessage tm = (TextMessage)consumer.receive(1000);
+
+         assertNull(tm);
+      }
+      finally
+      {
+         // Always release the connection, including when an assertion above fails
+         conn.close();
+      }
+
+      assertNotNull(listener.e);
+
+      JMSException je = listener.e;
+
+      assertEquals(me, je.getCause());
+   }
+
+   /**
+    * Sends messages over a connection to the live server (created without a backup connector),
+    * fails that connection, verifies that the exception listener was handed a JMSException whose
+    * cause is the injected failure, and then consumes all messages in order over a fresh
+    * connection to the backup server.
+    */
+   public void testManualFailover() throws Exception
+   {
+      JBossConnectionFactory jbcfLive =
+         new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                    null,
+                                    ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+                                    ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                    null,
+                                    1000,
+                                    ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+                                    ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+                                    ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+                                    ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                    true,
+                                    ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP_ID,
+                                    ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+
+      JBossConnectionFactory jbcfBackup =
+         new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                               backupParams),
+                                    null,
+                                    ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+                                    ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                    null,
+                                    1000,
+                                    ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+                                    ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+                                    ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+                                    ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                    ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+                                    ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP_ID,
+                                    ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+
+      Connection connLive = null;
+
+      Connection connBackup = null;
+
+      try
+      {
+         connLive = jbcfLive.createConnection();
+
+         MyExceptionListener listener = new MyExceptionListener();
+
+         connLive.setExceptionListener(listener);
+
+         Session sessLive = connLive.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         ClientSession coreSessionLive = ((JBossSession)sessLive).getCoreSession();
+
+         RemotingConnection coreConnLive = ((ClientSessionImpl)coreSessionLive).getConnection();
+
+         SimpleString jmsQueueName = new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+
+         coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, false, false);
+
+         Queue queue = sessLive.createQueue("myqueue");
+
+         final int numMessages = 1000;
+
+         MessageProducer producerLive = sessLive.createProducer(queue);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sessLive.createTextMessage("message" + i);
+
+            producerLive.send(tm);
+         }
+
+         // Note the live factory is created with "true" in the position where the other factories
+         // in this class pass DEFAULT_BLOCK_ON_PERSISTENT_SEND, i.e. we block on persistent send
+         // (the producer uses the default delivery mode) to make sure all messages get to the
+         // server before failover
+
+         MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED);
+
+         coreConnLive.fail(me);
+
+         assertNotNull(listener.e);
+
+         JMSException je = listener.e;
+
+         assertEquals(me, je.getCause());
+
+         connLive.close();
+
+         connLive = null;
+
+         // Now recreate on backup
+
+         connBackup = jbcfBackup.createConnection();
+
+         log.info("creating session on backup");
+         Session sessBackup = connBackup.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         log.info("created on backup");
+
+         MessageConsumer consumerBackup = sessBackup.createConsumer(queue);
+
+         connBackup.start();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = (TextMessage)consumerBackup.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         TextMessage tm = (TextMessage)consumerBackup.receive(1000);
+
+         assertNull(tm);
+
+         connBackup.close();
+
+         connBackup = null;
+      }
+      finally
+      {
+         // Release whichever connections are still open, e.g. after a failed assertion
+         try
+         {
+            if (connLive != null)
+            {
+               connLive.close();
+            }
+         }
+         finally
+         {
+            if (connBackup != null)
+            {
+               connBackup.close();
+            }
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   /**
+    * Starts an in-VM backup server first, then an in-VM live server whose backup
+    * connector is configured with the same parameters as the backup's acceptor.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      final String invmAcceptorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+      final String invmConnectorFactory = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+      // Backup server: configured and started before the live server is created.
+      Configuration backupConfig = new ConfigurationImpl();
+      backupConfig.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      TransportConfiguration backupAcceptor = new TransportConfiguration(invmAcceptorFactory, backupParams);
+      backupConfig.getAcceptorConfigurations().add(backupAcceptor);
+      backupConfig.setBackup(true);
+
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConfig);
+      backupService.start();
+
+      // Live server: default in-VM acceptor plus a connector built from backupParams.
+      Configuration liveConfig = new ConfigurationImpl();
+      liveConfig.setSecurityEnabled(false);
+      TransportConfiguration liveAcceptor = new TransportConfiguration(invmAcceptorFactory);
+      liveConfig.getAcceptorConfigurations().add(liveAcceptor);
+      TransportConfiguration backupConnector = new TransportConfiguration(invmConnectorFactory, backupParams);
+      liveConfig.setBackupConnectorConfiguration(backupConnector);
+
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConfig);
+      liveService.start();
+   }
+
+   /**
+    * Checks that no remoting connections remain on the backup and live servers, stops
+    * both servers, and finally checks that the in-VM registry is empty.
+    * <p>
+    * The stop calls run in finally blocks so that a failed connection-count assertion
+    * still shuts both servers down instead of leaving them running for later tests.
+    * When every check passes, the order of operations is: backup check, backup stop,
+    * live check, live stop, registry check.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         try
+         {
+            assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+         }
+         finally
+         {
+            backupService.stop();
+         }
+
+         assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+      }
+      finally
+      {
+         // Always reached, even when one of the checks above has failed.
+         liveService.stop();
+      }
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   /**
+    * Exception listener that keeps the most recent {@link JMSException} it was given
+    * in the volatile field {@code e}, where the test reads it.
+    */
+   private static class MyExceptionListener implements ExceptionListener
+   {
+      volatile JMSException e;
+
+      public void onException(final JMSException exception)
+      {
+         e = exception;
+      }
+   }
+
+}




More information about the jboss-cvs-commits mailing list