[jboss-cvs] JBoss Messaging SVN: r3687 - in trunk: src/main/org/jboss/messaging/core and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 8 13:08:20 EST 2008


Author: timfox
Date: 2008-02-08 13:08:20 -0500 (Fri, 08 Feb 2008)
New Revision: 3687

Removed:
   trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/MessageReference.java
   trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
   trunk/tests/build.xml
Log:
Completed DLQ and Expiry Queue logic


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-08 18:08:20 UTC (rev 3687)
@@ -88,6 +88,8 @@
    private final boolean autoDeleteQueue;
    
    private final boolean enableFlowControl;
+   
+   private final PersistenceManager persistenceManager;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -113,6 +115,8 @@
       
       this.enableFlowControl = enableFlowControl;
       
+      this.persistenceManager = sessionEndpoint.getConnectionEndpoint().getMessagingServer().getPersistenceManager();
+      
       // adding the consumer to the queue
       messageQueue.addConsumer(this);
       
@@ -132,7 +136,7 @@
 
       if (ref.getMessage().isExpired())
       {         
-         sessionEndpoint.expireDelivery(ref);
+         ref.expire(persistenceManager);
          
          return HandleStatus.HANDLED;
       }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-08 18:08:20 UTC (rev 3687)
@@ -213,28 +213,6 @@
    // Package protected
    // ----------------------------------------------------------------------------
 
-   void expireDelivery(MessageReference ref) throws Exception
-   {
-      Queue expiryQueue = ref.getQueue().getExpiryQueue();
-
-      if (expiryQueue != null)
-      {
-         Message copy = makeCopyForDLQOrExpiry(true, ref);
-
-         moveInTransaction(copy, ref, expiryQueue, true);
-      }
-      else
-      {
-         log.warn("No expiry queue has been configured so removing expired " + ref);
-
-         // TODO - tidy up these references - ugly
-         ref.acknowledge(this.getConnectionEndpoint().getMessagingServer()
-               .getPersistenceManager());
-      }
-
-      // TODO
-   }
-
    void removeBrowser(String browserId) throws Exception
    {
       if (browsers.remove(browserId) == null)
@@ -497,8 +475,10 @@
       }
       else if (expired)
       {
-         if (deliveryID == -1) { throw new IllegalArgumentException(
-               "Invalid delivery id"); }
+         if (deliveryID == -1)
+         {
+            throw new IllegalArgumentException("Invalid delivery id");
+         }
 
          // Expire a single reference
 
@@ -508,8 +488,7 @@
 
             if (delivery.getDeliveryID() == deliveryID)
             {
-               // TODO - send to expiry queue
-               delivery.getReference().acknowledge(sp.getPersistenceManager());
+               delivery.getReference().expire(sp.getPersistenceManager());
 
                iter.remove();
 
@@ -819,134 +798,6 @@
    // Private
    // --------------------------------------------------------------------------------------
 
-   // private void cancelDeliveryInternal(Cancel cancel) throws Exception
-   // {
-   // DeliveryRecord rec =
-   // (DeliveryRecord)deliveries.remove(cancel.getDeliveryId());
-   //
-   // if (rec == null)
-   // {
-   // //The delivery might not be found, if the session is not replicated (i.e.
-   // auto_ack or dups_ok)
-   // //and has failed over since recoverDeliveries won't have been called
-   // if (trace)
-   // {
-   // log.trace("Cannot find delivery to cancel, session probably failed over
-   // and is not replicated");
-   // }
-   // return;
-   // }
-   //
-   // MessageReference ref = rec.ref;
-   //
-   // //Note we check the flag *and* evaluate again, this is because the server
-   // and client clocks may
-   // //be out of synch and don't want to send back to the client a message it
-   // thought it has sent to
-   // //the expiry queue
-   // boolean expired = cancel.isExpired() || ref.getMessage().isExpired();
-   //
-   // //Note we check the flag *and* evaluate again, this is because the server
-   // value of maxDeliveries
-   // //might get changed after the client has sent the cancel - and we don't
-   // want to end up cancelling
-   // //back to the original queue
-   // boolean reachedMaxDeliveryAttempts =
-   // cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >=
-   // rec.maxDeliveryAttempts;
-   //
-   // if (!expired && !reachedMaxDeliveryAttempts)
-   // {
-   // //Normal cancel back to the queue
-   //
-   // ref.setDeliveryCount(cancel.getDeliveryCount());
-   //
-   // //Do we need to set a redelivery delay?
-   //
-   // if (rec.redeliveryDelay != 0)
-   // {
-   // ref.setScheduledDeliveryTime(System.currentTimeMillis() +
-   // rec.redeliveryDelay);
-   // }
-   //
-   // if (trace) { log.trace("Cancelling delivery " + cancel.getDeliveryId()); }
-   //
-   // ref.cancel(sp.getPersistenceManager());
-   //
-   // }
-   // else
-   // {
-   // if (expired)
-   // {
-   // //Sent to expiry queue
-   //
-   // Message copy = makeCopyForDLQOrExpiry(true, ref);
-   //
-   // moveInTransaction(copy, ref, rec.expiryQueue, false);
-   // }
-   // else
-   // {
-   // //Send to DLQ
-   //
-   // Message copy = makeCopyForDLQOrExpiry(false, ref);
-   //
-   // moveInTransaction(copy, ref, rec.dlq, true);
-   // }
-   // }
-   // }
-
-   private Message makeCopyForDLQOrExpiry(boolean expiry, MessageReference ref)
-         throws Exception
-   {
-      // We copy the message and send that to the dlq/expiry queue - this is
-      // because
-      // otherwise we may end up with a ref with the same message id in the
-      // queue more than once
-      // which would barf - this might happen if the same message had been
-      // expire from multiple
-      // subscriptions of a topic for example
-      // We set headers that hold the original message destination, expiry time
-      // and original message id
-
-      if (trace)
-      {
-         log.trace("Making copy of message for DLQ or expiry " + ref);
-      }
-
-      Message msg = ref.getMessage();
-
-      Message copy = msg.copy();
-
-      long newMessageId = sp.getPersistenceManager().generateMessageID();
-
-      copy.setMessageID(newMessageId);
-
-      // reset expiry
-      copy.setExpiration(0);
-
-      if (expiry)
-      {
-         long actualExpiryTime = System.currentTimeMillis();
-
-         copy.putHeader(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
-      }
-
-      return copy;
-   }
-
-   private void moveInTransaction(Message msg, MessageReference ref,
-         Queue queue, boolean dlq) throws Exception
-   {
-      Transaction tx = new TransactionImpl();
-
-      tx.addMessage(msg);
-
-      tx.addAcknowledgement(ref);
-
-      tx.commit(true, getConnectionEndpoint().getMessagingServer()
-            .getPersistenceManager());
-   }
-
    private void addAddress(String address) throws Exception
    {
       if (postOffice.containsAllowableAddress(address)) { throw new MessagingException(

Modified: trunk/src/main/org/jboss/messaging/core/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessageReference.java	2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/src/main/org/jboss/messaging/core/MessageReference.java	2008-02-08 18:08:20 UTC (rev 3687)
@@ -60,7 +60,9 @@
    
    void acknowledge(PersistenceManager persistenceManager) throws Exception;  
    
-   void cancel(PersistenceManager persistenceManager) throws Exception;   
+   void cancel(PersistenceManager persistenceManager) throws Exception;  
+   
+   void expire(PersistenceManager persistenceManager) throws Exception;
 }
 
 

Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java	2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java	2008-02-08 18:08:20 UTC (rev 3687)
@@ -136,8 +136,44 @@
       }
             
       queue.decrementDeliveringCount();
+      
+      int maxDeliveries = queue.getMaxDeliveryAttempts();
+      
+      if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
+      {
+         if (queue.getDLQ() != null)
+         {
+            Message copyMessage = makeCopyForDLQOrExpiry(false, persistenceManager);
+            
+            moveInTransaction(queue.getDLQ(), copyMessage, persistenceManager);
+         }
+         else
+         {
+            //No DLQ
+            
+            log.warn("Message has reached maximum delivery attempts, no DLQ is configured so dropping it");
+            
+            acknowledge(persistenceManager);
+         }         
+      }
    }
    
+   public void expire(PersistenceManager persistenceManager) throws Exception
+   {
+      if (queue.getExpiryQueue() != null)
+      {
+         // expiry == true so the copy is stamped with Message.HDR_ACTUAL_EXPIRY_TIME
+         Message copyMessage = makeCopyForDLQOrExpiry(true, persistenceManager);
+         moveInTransaction(queue.getExpiryQueue(), copyMessage, persistenceManager);
+      }
+      else
+      {
+         // No expiry queue configured: acknowledge this reference instead of moving it
+         log.warn("Message has expired, no expiry queue is configured so dropping it");
+         acknowledge(persistenceManager);
+      }
+   }
+         
    // Public --------------------------------------------------------
 
    public String toString()
@@ -150,7 +186,51 @@
    // Protected -----------------------------------------------------   
    
    // Private -------------------------------------------------------
+   
+   private void moveInTransaction(Queue destinationQueue, Message copyMessage,
+                                  PersistenceManager persistenceManager) throws Exception
+   {
+      // Route the copy to the queue chosen by the caller (DLQ or expiry queue),
+      // not unconditionally to the expiry queue
+      copyMessage.createReference(destinationQueue);
+      
+      // Add the copy and acknowledge this reference in a single transaction
+      TransactionImpl tx = new TransactionImpl();
+      tx.addMessage(copyMessage);
+      tx.addAcknowledgement(this);
+      tx.commit(true, persistenceManager);
+   }
+   
+   private Message makeCopyForDLQOrExpiry(boolean expiry, PersistenceManager pm) throws Exception
+   {
+      /*
+       We copy the message and send the copy to the DLQ/expiry queue. Otherwise we
+       could end up with a ref with the same message id in the queue more than once,
+       which would barf - e.g. if the same message had expired from multiple
+       subscriptions of a topic. The copy gets a newly generated message id and its
+       expiration is reset to 0; if 'expiry' is true the current time is stored in
+       HDR_ACTUAL_EXPIRY_TIME. NOTE(review): no original destination/id header is set here.
+      */
 
+      Message copy = message.copy();
+      
+      long newMessageId = pm.generateMessageID();
+      
+      copy.setMessageID(newMessageId);
+      
+      // reset expiry
+      copy.setExpiration(0);
+      
+      if (expiry)
+      {
+         long actualExpiryTime = System.currentTimeMillis();
+      
+         copy.putHeader(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
+      }
+      
+      return copy;
+   }
+
    // Inner classes -------------------------------------------------
    
 }
\ No newline at end of file

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/tests/build.xml	2008-02-08 18:08:20 UTC (rev 3687)
@@ -520,8 +520,6 @@
                <exclude name="**/jms/bridge/**"/>
                <exclude name="**/jms/manual/**"/>
                <exclude name="**/jms/clustering/**"/>
-               <exclude name="**/jms/ExpiryQueueTest.class"/>
-               <exclude name="**/jms/DLQTest.class"/>
                <exclude name="**/postoffice/**"/>
                <exclude name="**/jms/JCAWrapperTest.class"/>
                <exclude name="**/jms/server/ServerPeerTest.class"/>
@@ -643,8 +641,6 @@
                <exclude name="**/jms/JCAWrapperTest.class"/>
                <exclude name="**/jms/ClientExitTest.class"/>
 
-               <exclude name="**/jms/ExpiryQueueTest.class"/>
-               <exclude name="**/jms/DLQTest.class"/>
                <exclude name="**/jms/SecurityTest.class"/>
                <exclude name="**/jms/crash/*Test.class"/>
 	       

Deleted: trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java	2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java	2008-02-08 18:08:20 UTC (rev 3687)
@@ -1,835 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.management.ObjectName;
-
-import org.jboss.jms.destination.JBossQueue;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.messaging.core.impl.server.MessagingServerImpl;
-import org.jboss.test.messaging.tools.ServerManagement;
-
-/**
- * A DLQTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *          <p/>
- *          $Id$
- */
-public class DLQTest extends JMSTestCase
-{
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public DLQTest(String name)
-   {
-      super(name);
-   }
-
-   // Public --------------------------------------------------------
-
-   public void testDefaultAndOverrideDLQ() throws Exception
-   {
-      if (ServerManagement.isRemote())
-      {
-         return;
-      }
-
-      final int NUM_MESSAGES = 5;
-
-      final int MAX_DELIVERIES = 8;
-
-      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
-      String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";
-
-      Connection conn = null;
-
-      try
-      {
-         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
-
-         String overrideDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue3";
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
-
-         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
-
-         conn = cf.createConnection();
-
-         {
-            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            MessageProducer prod = sess.createProducer(queue1);
-
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               TextMessage tm = sess.createTextMessage("Message:" + i);
-
-               prod.send(tm);
-            }
-
-            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-            MessageConsumer cons = sess2.createConsumer(queue1);
-
-            conn.start();
-
-            for (int i = 0; i < MAX_DELIVERIES; i++)
-            {
-               for (int j = 0; j < NUM_MESSAGES; j++)
-               {
-                  TextMessage tm = (TextMessage) cons.receive(1000);
-
-                  assertNotNull(tm);
-
-                  assertEquals("Message:" + j, tm.getText());
-               }
-
-               sess2.recover();
-            }
-
-            //Prompt them to go to DLQ
-            cons.receive(100);
-
-            //At this point all the messages have been delivered exactly MAX_DELIVERIES times 
-
-            checkEmpty(queue1);
-
-            //Now should be in default dlq
-
-            MessageConsumer cons3 = sess.createConsumer(queue2);
-
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               TextMessage tm = (TextMessage) cons3.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("Message:" + i, tm.getText());
-            }
-
-            conn.close();
-         }
-
-
-         {
-            //Now try with overriding the default dlq
-
-            conn = cf.createConnection();
-
-            ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", overrideDLQObjectName);
-
-            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            MessageProducer prod = sess.createProducer(queue1);
-
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               TextMessage tm = sess.createTextMessage("Message:" + i);
-
-               prod.send(tm);
-            }
-
-            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-            MessageConsumer cons = sess2.createConsumer(queue1);
-
-            conn.start();
-
-            for (int i = 0; i < MAX_DELIVERIES; i++)
-            {
-               for (int j = 0; j < NUM_MESSAGES; j++)
-               {
-                  TextMessage tm = (TextMessage) cons.receive(1000);
-
-                  assertNotNull(tm);
-
-                  assertEquals("Message:" + j, tm.getText());
-               }
-
-               sess2.recover();
-            }
-
-            cons.receive(100);
-
-            //At this point all the messages have been delivered exactly MAX_DELIVERIES times 
-
-            checkEmpty(queue1);
-
-            //Now should be in override dlq
-
-            MessageConsumer cons3 = sess.createConsumer(queue3);
-
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               TextMessage tm = (TextMessage) cons3.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("Message:" + i, tm.getText());
-            }
-         }
-      }
-      finally
-      {
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=DLQ");
-
-         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
-
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
-
-   public void testWithMessageListenerPersistent() throws Exception
-   {
-      testWithMessageListener(true);
-   }
-
-   public void testWithMessageListenerNonPersistent() throws Exception
-   {
-      testWithMessageListener(false);
-   }
-
-   public void testWithReceiveClientAckPersistent() throws Exception
-   {
-      this.testWithReceiveClientAck(true);
-   }
-
-   public void testWithReceiveClientAckNonPersistent() throws Exception
-   {
-      testWithReceiveClientAck(false);
-   }
-
-   public void testWithReceiveTransactionalPersistent() throws Exception
-   {
-      this.testWithReceiveTransactional(true);
-   }
-
-   public void testWithReceiveTransactionalNonPersistent() throws Exception
-   {
-      testWithReceiveTransactional(false);
-   }
-
-   public void testHeadersSet() throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=Queue2");
-
-         final int MAX_DELIVERIES = 16;
-
-         final int NUM_MESSAGES = 5;
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
-         int maxRedeliveryAttempts =
-                 ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
-
-         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
-
-         conn = cf.createConnection();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageProducer prod = sess.createProducer(queue1);
-
-         Map origIds = new HashMap();
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess.createTextMessage("Message:" + i);
-
-            prod.send(tm);
-
-            origIds.put(tm.getText(), tm.getJMSMessageID());
-         }
-
-         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-
-         MessageConsumer cons = sess2.createConsumer(queue1);
-
-         conn.start();
-
-         for (int i = 0; i < MAX_DELIVERIES; i++)
-         {
-            for (int j = 0; j < NUM_MESSAGES; j++)
-            {
-               TextMessage tm = (TextMessage) cons.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("Message:" + j, tm.getText());
-            }
-
-            sess2.rollback();
-         }
-
-         //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
-         //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
-
-         checkEmpty(queue2);
-
-         // So let's try and consume them - this should cause them to go to the DLQ - since they
-         // will then exceed max delivery attempts
-         Message m = cons.receive(100);
-
-         assertNull(m);
-
-         //All the messages should now be in the DLQ
-
-         MessageConsumer cons3 = sess.createConsumer(queue2);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage) cons3.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("Message:" + i, tm.getText());
-
-            // Check the headers
-            String origDest =
-                    tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION);
-
-            String origMessageId =
-                    tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_MESSAGE_ID);
-
-            assertEquals(queue1.toString(), origDest);
-
-            String origId = (String) origIds.get(tm.getText());
-
-            assertEquals(origId, origMessageId);
-         }
-      }
-      finally
-      {
-         if (conn != null) conn.close();
-      }
-   }
-
-   public void testOverrideDefaultMaxDeliveryAttemptsForQueue() throws Exception
-   {
-      int md = getDefaultMaxDeliveryAttempts();
-      try
-      {
-         int maxDeliveryAttempts = md - 5;
-         setMaxDeliveryAttempts(
-                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
-                 maxDeliveryAttempts);
-         testMaxDeliveryAttempts(queue1, maxDeliveryAttempts, true);
-      }
-      finally
-      {
-         setMaxDeliveryAttempts(
-                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
-                 md);
-      }
-   }
-
-   public void testOverrideDefaultMaxDeliveryAttemptsForTopic() throws Exception
-   {
-      int md = getDefaultMaxDeliveryAttempts();
-      try
-      {
-         int maxDeliveryAttempts = md - 5;
-         setMaxDeliveryAttempts(
-                 new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
-                 maxDeliveryAttempts);
-
-         testMaxDeliveryAttempts(topic1, maxDeliveryAttempts, false);
-      }
-      finally
-      {
-         setMaxDeliveryAttempts(
-                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
-                 md);
-      }
-   }
-
-   public void testUseDefaultMaxDeliveryAttemptsForQueue() throws Exception
-   {
-      int md = getDefaultMaxDeliveryAttempts();
-      try
-      {
-         setMaxDeliveryAttempts(
-                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
-                 -1);
-
-         // Check that defaultMaxDeliveryAttempts takes effect
-         testMaxDeliveryAttempts(queue1, getDefaultMaxDeliveryAttempts(), true);
-      }
-      finally
-      {
-         setMaxDeliveryAttempts(
-                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
-                 md);
-      }
-   }
-
-   public void testUseDefaultMaxDeliveryAttemptsForTopic() throws Exception
-   {
-      int md = getDefaultMaxDeliveryAttempts();
-      try
-      {
-         setMaxDeliveryAttempts(
-                 new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
-                 -1);
-
-         // Check that defaultMaxDeliveryAttempts takes effect
-         testMaxDeliveryAttempts(topic1, getDefaultMaxDeliveryAttempts(), false);
-      }
-      finally
-      {
-         setMaxDeliveryAttempts(
-                 new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
-                 md);
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void testWithMessageListener(boolean persistent) throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
-         final int MAX_DELIVERIES = 16;
-
-         final int NUM_MESSAGES = 5;
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
-         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
-
-         int maxRedeliveryAttempts =
-                 ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
-
-         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
-
-         conn = cf.createConnection();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageProducer prod = sess.createProducer(queue1);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess.createTextMessage("Message:" + i);
-
-            prod.send(tm);
-         }
-
-         MessageConsumer cons = sess.createConsumer(queue1);
-
-         FailingMessageListener listener = new FailingMessageListener(MAX_DELIVERIES * NUM_MESSAGES);
-
-         cons.setMessageListener(listener);
-
-         conn.start();
-
-         listener.waitForMessages();
-
-         assertEquals(MAX_DELIVERIES * NUM_MESSAGES, listener.deliveryCount);
-
-         //Message should all be in the dlq - let's check
-
-         MessageConsumer cons2 = sess.createConsumer(queue2);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage) cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            log.info("Got mnessage" + tm);
-
-            assertEquals("Message:" + i, tm.getText());
-         }
-
-         checkEmpty(queue1);
-      }
-      finally
-      {
-         if (conn != null) conn.close();
-      }
-   }
-
-
-   protected void testWithReceiveClientAck(boolean persistent) throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
-         final int MAX_DELIVERIES = 16;
-
-         final int NUM_MESSAGES = 5;
-
-         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
-         int maxRedeliveryAttempts =
-                 ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
-
-         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
-
-         conn = cf.createConnection();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageProducer prod = sess.createProducer(queue1);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess.createTextMessage("Message:" + i);
-
-            prod.send(tm);
-         }
-
-         Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-         MessageConsumer cons = sess2.createConsumer(queue1);
-
-         conn.start();
-
-         for (int i = 0; i < MAX_DELIVERIES; i++)
-         {
-            for (int j = 0; j < NUM_MESSAGES; j++)
-            {
-               TextMessage tm = (TextMessage) cons.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("Message:" + j, tm.getText());
-            }
-
-            sess2.recover();
-         }
-
-         //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
-         //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
-
-         checkEmpty(queue2);
-
-         //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
-         //delivery attempts
-
-         Message m = cons.receive(100);
-
-         assertNull(m);
-
-         //Now, all the messages should now be in the DLQ
-
-         MessageConsumer cons3 = sess.createConsumer(queue2);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage) cons3.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("Message:" + i, tm.getText());
-         }
-
-         //No more should be available
-
-         cons.close();
-
-         checkEmpty(queue1);
-      }
-      finally
-      {
-         destroyQueue("DLQ");
-
-         if (conn != null) conn.close();
-      }
-   }
-
-   protected void testWithReceiveTransactional(boolean persistent) throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
-         final int MAX_DELIVERIES = 16;
-
-         final int NUM_MESSAGES = 5;
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
-         String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
-
-         int maxRedeliveryAttempts =
-                 ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
-
-         assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
-
-         conn = cf.createConnection();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageProducer prod = sess.createProducer(queue1);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess.createTextMessage("Message:" + i);
-
-            prod.send(tm);
-         }
-
-         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-
-         MessageConsumer cons = sess2.createConsumer(queue1);
-
-         conn.start();
-
-         for (int i = 0; i < MAX_DELIVERIES; i++)
-         {
-            for (int j = 0; j < NUM_MESSAGES; j++)
-            {
-               TextMessage tm = (TextMessage) cons.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("Message:" + j, tm.getText());
-            }
-
-            sess2.rollback();
-         }
-
-         //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
-         //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
-
-         checkEmpty(queue2);
-
-         //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
-         //delivery attempts
-         Message m = cons.receive(100);
-
-         assertNull(m);
-
-         //All the messages should now be in the DLQ
-
-         MessageConsumer cons3 = sess.createConsumer(queue2);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage) cons3.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("Message:" + i, tm.getText());
-         }
-
-         //No more should be available
-
-         checkEmpty(queue1);
-      }
-      finally
-      {
-         destroyQueue("DLQ");
-
-         if (conn != null) conn.close();
-      }
-   }
-
-   protected int getDefaultMaxDeliveryAttempts() throws Exception
-   {
-      return ((Integer) ServerManagement.getAttribute(
-              ServerManagement.getServerPeerObjectName(),
-              "DefaultMaxDeliveryAttempts"))
-              .intValue();
-   }
-
-   protected void setMaxDeliveryAttempts(ObjectName dest, int maxDeliveryAttempts) throws Exception
-   {
-      ServerManagement.setAttribute(dest, "MaxDeliveryAttempts",
-              Integer.toString(maxDeliveryAttempts));
-   }
-
-   protected void testMaxDeliveryAttempts(Destination destination, int destMaxDeliveryAttempts, boolean queue) throws Exception
-   {
-      Connection conn = cf.createConnection();
-
-      if (!queue)
-      {
-         conn.setClientID("wib123");
-      }
-
-      try
-      {
-         ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(),
-                 "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=Queue2");
-
-         // Create the consumer before the producer so that the message we send doesn't
-         // get lost if the destination is a Topic.
-         Session consumingSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-         MessageConsumer destinationConsumer;
-
-         if (queue)
-         {
-            destinationConsumer = consumingSession.createConsumer(destination);
-         }
-         else
-         {
-            //For topics we only keep a delivery record on the server side for durable subs
-            destinationConsumer = consumingSession.createDurableSubscriber((Topic) destination, "testsub1");
-         }
-
-         {
-            Session producingSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageProducer prod = producingSession.createProducer(destination);
-            TextMessage tm = producingSession.createTextMessage("Message");
-            prod.send(tm);
-         }
-
-         conn.start();
-
-         // Make delivery attempts up to the maximum. The message should not end up in the DLQ.
-         for (int i = 0; i < destMaxDeliveryAttempts; i++)
-         {
-            TextMessage tm = (TextMessage) destinationConsumer.receive(1000);
-            assertNotNull("No message received on delivery attempt number " + (i + 1), tm);
-            assertEquals("Message", tm.getText());
-            consumingSession.recover();
-         }
-
-         // At this point the message should not yet be in the DLQ
-         checkEmpty(queue2);
-
-         // Now we try to consume the message again from the destination, which causes it
-         // to go to the DLQ instead.
-         Message m = destinationConsumer.receive(100);
-         assertNull(m);
-
-         // The message should be in the DLQ now
-         MessageConsumer dlqConsumer = consumingSession.createConsumer(queue2);
-         m = dlqConsumer.receive(1000);
-         assertNotNull(m);
-         assertTrue(m instanceof TextMessage);
-         assertEquals("Message", ((TextMessage) m).getText());
-
-         m.acknowledge();
-
-         if (!queue)
-         {
-            destinationConsumer.close();
-
-            consumingSession.unsubscribe("testsub1");
-         }
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-   class FailingMessageListener implements MessageListener
-   {
-      volatile int deliveryCount;
-
-      int numMessages;
-
-      FailingMessageListener(int numMessages)
-      {
-         this.numMessages = numMessages;
-      }
-
-      synchronized void waitForMessages() throws Exception
-      {
-         while (deliveryCount != numMessages)
-         {
-            this.wait();
-         }
-      }
-
-      public synchronized void onMessage(Message msg)
-      {
-         deliveryCount++;
-
-         this.notify();
-
-         throw new RuntimeException("Your mum!");
-      }
-
-   }
-
-}

Deleted: trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java	2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java	2008-02-08 18:08:20 UTC (rev 3687)
@@ -1,620 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.ObjectName;
-
-import org.jboss.jms.destination.JBossQueue;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.test.messaging.tools.ServerManagement;
-
-/**
- * A ExpiryQueueTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *          <p/>
- *          $Id$
- */
-public class ExpiryQueueTest extends JMSTestCase
-{
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ExpiryQueueTest(String name)
-   {
-      super(name);
-   }
-
-   // Public --------------------------------------------------------
-
-   public void testExpiryQueueAlreadyDeployed() throws Exception
-   {
-      if (ServerManagement.isRemote())
-      {
-         return;
-      }
-
-      //deployQueue("ExpiryQueue");
-
-      /*ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
-      ObjectName expiryQueueObjectName = (ObjectName)ServerManagement.getAttribute(serverPeerObjectName, "DefaultExpiryQueue");
-
-      assertNotNull(expiryQueueObjectName);
-
-      String name = (String)ServerManagement.getAttribute(expiryQueueObjectName, "Name");
-
-      assertNotNull(name);
-
-      assertEquals("ExpiryQueue", name);
-
-      String jndiName = (String)ServerManagement.getAttribute(expiryQueueObjectName, "JNDIName");
-
-      assertNotNull(jndiName);
-
-      assertEquals("/queue/ExpiryQueue", jndiName);
-
-      org.jboss.messaging.core.contract.Queue expiryQueue = ServerManagement.getServer().getServerPeer().getDefaultExpiryQueueInstance();
-
-      assertNotNull(expiryQueue);*/
-
-      JBossQueue q = (JBossQueue) ic.lookup("/queue/ExpiryQueue");
-
-      assertNotNull(q);
-
-      assertEquals("ExpiryQueue", q.getName());
-
-   }
-
-
-   public void testDefaultAndOverrideExpiryQueue() throws Exception
-   {
-      final int NUM_MESSAGES = 5;
-
-      Connection conn = null;
-
-      //ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
-      try
-      {
-         createQueue("DefaultExpiry");
-
-         createQueue("OverrideExpiry");
-
-         createQueue("expTestQueue");
-
-         String defaultExpiryObjectName = "DefaultExpiry";
-
-         String overrideExpiryObjectName = "OverrideExpiry";
-
-         String testQueueObjectName = "expTestQueue";
-
-         //ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
-
-         //ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", "");
-
-         Queue testQueue = (Queue) ic.lookup("/queue/expTestQueue");
-
-         Queue defaultExpiry = (Queue) ic.lookup("/queue/DefaultExpiry");
-
-         Queue overrideExpiry = (Queue) ic.lookup("/queue/OverrideExpiry");
-
-         conn = cf.createConnection();
-
-         {
-            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            MessageProducer prod = sess.createProducer(testQueue);
-
-            conn.start();
-
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               TextMessage tm = sess.createTextMessage("Message:" + i);
-
-               //Send messages with time to live of 2000 enough time to get to client consumer - so 
-               //they won't be expired on the server side
-               prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
-            }
-
-            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-            MessageConsumer cons = sess2.createConsumer(testQueue);
-
-            //The messages should now be sitting in the consumer buffer
-
-            //Now give them enough time to expire
-
-            Thread.sleep(2500);
-
-            //Now try and receive
-
-            Message m = cons.receive(1000);
-
-            assertNull(m);
-
-            //Message should all be in the default expiry queue - let's check
-
-            MessageConsumer cons3 = sess.createConsumer(defaultExpiry);
-
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               TextMessage tm = (TextMessage) cons3.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("Message:" + i, tm.getText());
-            }
-
-            conn.close();
-         }
-
-         //now try with overriding the default expiry queue
-         {
-            ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", overrideExpiryObjectName);
-
-            conn = cf.createConnection();
-
-            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            MessageProducer prod = sess.createProducer(testQueue);
-
-            conn.start();
-
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               TextMessage tm = sess.createTextMessage("Message:" + i);
-
-               //Send messages with time to live of 2000 enough time to get to client consumer - so 
-               //they won't be expired on the server side
-               prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
-            }
-
-            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-            MessageConsumer cons = sess2.createConsumer(testQueue);
-
-            //The messages should now be sitting in the consumer buffer
-
-            //Now give them enough time to expire
-
-            Thread.sleep(2500);
-
-            //Now try and receive
-
-            Message m = cons.receive(1000);
-
-            assertNull(m);
-
-            //Message should all be in the override expiry queue - let's check
-
-            MessageConsumer cons3 = sess.createConsumer(overrideExpiry);
-
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               TextMessage tm = (TextMessage) cons3.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("Message:" + i, tm.getText());
-            }
-         }
-      }
-      finally
-      { //
-         //ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
-
-         destroyQueue("DefaultDLQ");
-
-         destroyQueue("OverrideDLQ");
-
-         destroyQueue("TestQueue");
-
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
-   public void testExpireSameMessagesMultiple() throws Exception
-   {
-      final int NUM_MESSAGES = 5;
-
-      Connection conn = null;
-
-      try
-      {
-         createQueue("ExpiryQueue");
-
-         String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=ExpiryQueue";
-
-         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
-         ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
-
-         Queue defaultExpiry = (Queue) ic.lookup("/queue/ExpiryQueue");
-
-         conn = cf.createConnection();
-
-         conn.setClientID("wib1");
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageProducer prod = sess.createProducer(topic1);
-
-         conn.start();
-
-         //Create 3 durable subscriptions
-
-         MessageConsumer sub1 = sess.createDurableSubscriber(topic1, "sub1");
-
-         MessageConsumer sub2 = sess.createDurableSubscriber(topic1, "sub2");
-
-         MessageConsumer sub3 = sess.createDurableSubscriber(topic1, "sub3");
-
-         Map origIds = new HashMap();
-
-         long now = System.currentTimeMillis();
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess.createTextMessage("Message:" + i);
-
-            //Send messages with time to live of 3000 enough time to get to client consumer - so 
-            //they won't be expired on the server side
-            prod.send(tm, DeliveryMode.PERSISTENT, 4, 3000);
-
-            origIds.put(tm.getText(), tm.getJMSMessageID());
-         }
-
-         long approxExpiry = now + 3000;
-
-         //Now sleep. This wil give them enough time to expire
-
-         Thread.sleep(3500);
-
-         //Now try and consume from each - this should force the message to the expiry queue
-
-         Message m = sub1.receive(500);
-         assertNull(m);
-
-         m = sub2.receive(500);
-         assertNull(m);
-
-         m = sub3.receive(500);
-         assertNull(m);
-
-         //Now the messages should all be in the expiry queue
-
-         MessageConsumer cons2 = sess.createConsumer(defaultExpiry);
-
-         while (true)
-         {
-            TextMessage tm = (TextMessage) cons2.receive(500);
-
-            if (tm == null)
-            {
-               break;
-            }
-
-            // Check the headers
-            String origDest =
-                    tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION);
-
-            String origMessageId =
-                    tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_MESSAGE_ID);
-
-            long actualExpiryTime =
-                    tm.getLongProperty(JBossMessage.JBOSS_MESSAGING_ACTUAL_EXPIRY_TIME);
-
-            assertEquals(topic1.toString(), origDest);
-
-            String origId = (String) origIds.get(tm.getText());
-
-            assertEquals(origId, origMessageId);
-
-            assertTrue(actualExpiryTime >= approxExpiry);
-         }
-
-         cons2.close();
-
-         sub1.close();
-
-         sub2.close();
-
-         sub3.close();
-
-         sess.unsubscribe("sub1");
-
-         sess.unsubscribe("sub2");
-
-         sess.unsubscribe("sub3");
-
-      }
-      finally
-      {
-         destroyQueue("ExpiryQueue");
-
-         if (conn != null)
-         {
-            conn.close();
-         }
-      }
-   }
-
-   public void testWithMessageListenerPersistent() throws Exception
-   {
-      testWithMessageListener(true);
-   }
-
-   public void testWithMessageListenerNonPersistent() throws Exception
-   {
-      testWithMessageListener(false);
-   }
-
-   public void testWithReceivePersistent() throws Exception
-   {
-      this.testWithReceive(true);
-   }
-
-   public void testWithReceiveNonPersistent() throws Exception
-   {
-      testWithReceive(false);
-   }
-
-   public void testWithMessageListener(boolean persistent) throws Exception
-   {
-      Connection conn = null;
-
-
-      Queue expiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
-
-      final int NUM_MESSAGES = 5;
-
-      conn = cf.createConnection();
-
-      conn.start();
-
-      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-      MessageProducer prod = sess.createProducer(queue1);
-
-      int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
-
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         TextMessage tm = sess.createTextMessage("Message:" + i);
-
-         //Send messages with time to live of 2000 enough time to get to client consumer - so
-         //they won't be expired on the server side
-         prod.send(tm, deliveryMode, 4, 2000);
-      }
-
-      MessageConsumer cons = sess.createConsumer(queue1);
-
-      //The messages should now be sitting in the consumer buffer
-
-      //Now give them enough time to expire
-
-      Thread.sleep(2500);
-
-      //Now set a listener
-
-      FailingMessageListener listener = new FailingMessageListener();
-
-      cons.setMessageListener(listener);
-
-      Thread.sleep(1000);
-
-      cons.setMessageListener(null);
-
-      //No messages should have been received
-      assertEquals(0, listener.deliveryCount);
-
-      //Shouldn't be able to receive any more
-
-      Message m = cons.receive(1000);
-
-      assertNull(m);
-
-      //Message should all be in the expiry queue - let's check
-
-      MessageConsumer cons2 = sess.createConsumer(expiryQueue);
-
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         TextMessage tm = (TextMessage) cons2.receive(1000);
-
-         assertNotNull(tm);
-
-         assertEquals("Message:" + i, tm.getText());
-      }
-
-   }
-
-   public void testWithReceive(boolean persistent) throws Exception
-   {
-      Connection conn = null;
-
-      try
-      {
-
-         createQueue("ExpiryQueue");
-
-         Queue expiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
-
-         final int NUM_MESSAGES = 5;
-
-         conn = cf.createConnection();
-
-         conn.start();
-
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageProducer prod = sess.createProducer(queue1);
-
-         int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess.createTextMessage("Message:" + i);
-
-            //Send messages with time to live of 2000 enough time to get to client consumer - so 
-            //they won't be expired on the server side
-            prod.send(tm, deliveryMode, 4, 2000);
-         }
-
-         MessageConsumer cons = sess.createConsumer(queue1);
-
-         //The messages should now be sitting in the consumer buffer
-
-         //Now give them enough time to expire
-
-         Thread.sleep(2500);
-
-         //Now try and receive
-
-         Message m = cons.receive(1000);
-
-         assertNull(m);
-
-         //Message should all be in the expiry queue - let's check
-
-         MessageConsumer cons2 = sess.createConsumer(expiryQueue);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage) cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("Message:" + i, tm.getText());
-         }
-
-      }
-      finally
-      {
-         destroyQueue("ExpiryQueue");
-
-         if (conn != null) conn.close();
-      }
-   }
-
-   public void testExpirationTransfer() throws Exception
-   {
-      createQueue("ExpiryQueue");
-
-      Object originalValue = ServerManagement.getAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue");
-
-      ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
-
-      Connection conn = null;
-
-      try
-      {
-         ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
-
-         conn = cf.createConnection();
-
-         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         conn.start();
-
-         MessageProducer prod = session.createProducer(queue1);
-         prod.setTimeToLive(100);
-
-         Message m = session.createTextMessage("This message will die");
-
-         prod.send(m);
-
-         // wait for the message to die
-         Thread.sleep(2000);
-
-         MessageConsumer cons = session.createConsumer(queue1);
-
-         assertNull(cons.receive(3000));
-
-         Queue queueExpiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
-
-         MessageConsumer consumerExpiredQueue = session.createConsumer(queueExpiryQueue);
-
-         TextMessage txt = (TextMessage) consumerExpiredQueue.receive(1000);
-
-         assertEquals("This message will die", txt.getText());
-
-         assertNull(consumerExpiredQueue.receive(1000));
-      }
-      finally
-      {
-         if (conn != null)
-         {
-            conn.close();
-         }
-
-         destroyQueue("ExpiryQueue");
-
-         ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue", originalValue.toString());
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-   class FailingMessageListener implements MessageListener
-   {
-      volatile int deliveryCount;
-
-      public void onMessage(Message msg)
-      {
-         deliveryCount++;
-
-         throw new RuntimeException("Your mum!");
-      }
-
-   }
-
-}




More information about the jboss-cvs-commits mailing list