[jboss-cvs] JBoss Messaging SVN: r1879 - in trunk: src/etc src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/jms/server src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/message tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Dec 31 04:50:12 EST 2006


Author: timfox
Date: 2006-12-31 04:49:46 -0500 (Sun, 31 Dec 2006)
New Revision: 1879

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
Removed:
   trunk/src/main/org/jboss/jms/client/container/ReceiverAspect.java
   trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java
Modified:
   trunk/src/etc/aop-messaging-client.xml
   trunk/src/etc/server/default/deploy/destinations-service.xml
   trunk/src/etc/server/default/deploy/messaging-service.xml
   trunk/src/etc/xmdesc/Queue-xmbean.xml
   trunk/src/etc/xmdesc/ServerPeer-xmbean.xml
   trunk/src/etc/xmdesc/Topic-xmbean.xml
   trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
   trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
   trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java
   trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java
   trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/Routable.java
   trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java
   trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
Added expiry queue functionality; both the expiry queue and the DLQ can now be specified on a per-queue basis



Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/aop-messaging-client.xml	2006-12-31 09:49:46 UTC (rev 1879)
@@ -10,11 +10,9 @@
    <interceptor class="org.jboss.jms.client.container.ConcurrencyInterceptor" scope="PER_INSTANCE"/>
 
    <aspect class="org.jboss.jms.client.container.StateCreationAspect" scope="PER_VM"/>
-   <aspect class="org.jboss.jms.client.container.ReceiverAspect" scope="PER_VM"/>
    <aspect class="org.jboss.jms.client.container.ConsumerAspect" scope="PER_VM"/>
    <aspect class="org.jboss.jms.client.container.ProducerAspect" scope="PER_INSTANCE"/>
    <aspect class="org.jboss.jms.client.container.SessionAspect" scope="PER_VM"/>
-   <aspect class="org.jboss.jms.client.container.TransactionAspect" scope="PER_VM"/>
    <aspect class="org.jboss.jms.client.container.AsfAspect" scope="PER_INSTANCE"/>
    <aspect class="org.jboss.jms.client.container.BrowserAspect" scope="PER_INSTANCE"/>
    <aspect class="org.jboss.jms.client.container.ConnectionAspect" scope="PER_INSTANCE"/>
@@ -141,20 +139,14 @@
       <advice name="handleClose" aspect="org.jboss.jms.client.container.SessionAspect"/>         
    </bind>   
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->commit())">
-      <advice name="handleCommit" aspect="org.jboss.jms.client.container.TransactionAspect"/>
+      <advice name="handleCommit" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->rollback())">
-      <advice name="handleRollback" aspect="org.jboss.jms.client.container.TransactionAspect"/>
+      <advice name="handleRollback" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->preDeliver(..))">
-      <advice name="handlePreDeliver" aspect="org.jboss.jms.client.container.TransactionAspect"/>
-   </bind>   
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->send(..))">
-      <advice name="handleSend" aspect="org.jboss.jms.client.container.TransactionAspect"/>
+      <advice name="handleSend" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
-      <advice name="handleClose" aspect="org.jboss.jms.client.container.TransactionAspect"/>
-   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createConsumerDelegate(..))">
       <advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>   
@@ -181,16 +173,16 @@
       <interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>          
    </bind>       
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageListener())">
-      <advice name="handleGetMessageListener" aspect="org.jboss.jms.client.container.ReceiverAspect"/>         
+      <advice name="handleGetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->setMessageListener(..))">
-      <advice name="handleSetMessageListener" aspect="org.jboss.jms.client.container.ReceiverAspect"/>         
+      <advice name="handleSetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>   
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receive(..))">
-      <advice name="handleReceive" aspect="org.jboss.jms.client.container.ReceiverAspect"/>         
+      <advice name="handleReceive" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>  
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receiveNoWait())">
-      <advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ReceiverAspect"/>         
+      <advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         
    </bind>    
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing())">
       <advice name="handleClosing" aspect="org.jboss.jms.client.container.ConsumerAspect"/>         

Modified: trunk/src/etc/server/default/deploy/destinations-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/destinations-service.xml	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/server/default/deploy/destinations-service.xml	2006-12-31 09:49:46 UTC (rev 1879)
@@ -9,7 +9,7 @@
 <server>
 
    <!--
-      The Dead Letter Queue. This destination is a dependency of an EJB MDB container.
+      The Default Dead Letter Queue. This destination is a dependency of an EJB MDB container.
    -->
 
    <mbean code="org.jboss.jms.server.destination.QueueService"
@@ -18,7 +18,18 @@
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
       <depends>jboss.messaging:service=PostOffice</depends>
    </mbean>
+   
+   <!--
+      The Default Expiry Queue.
+   -->
 
+   <mbean code="org.jboss.jms.server.destination.QueueService"
+      name="jboss.messaging.destination:service=Queue,name=ExpiryQueue"
+      xmbean-dd="xmdesc/Queue-xmbean.xml">
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=PostOffice</depends>
+   </mbean>   
+
    <!--
         Example destinations.
    -->
@@ -112,6 +123,41 @@
       <depends>jboss.messaging:service=PostOffice</depends>
    </mbean>
    
+   <!-- It's possible for individual queues and topics to use a specific queue as
+   their expiry queue or DLQ -->
+   
+   <mbean code="org.jboss.jms.server.destination.QueueService"
+      name="jboss.messaging.destination:service=Queue,name=PrivateDLQ"
+      xmbean-dd="xmdesc/Queue-xmbean.xml">
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=PostOffice</depends>
+   </mbean>
+   
+   <mbean code="org.jboss.jms.server.destination.QueueService"
+      name="jboss.messaging.destination:service=Queue,name=PrivateExpiryQueue"
+      xmbean-dd="xmdesc/Queue-xmbean.xml">
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=PostOffice</depends>
+   </mbean>      
+   
+   <mbean code="org.jboss.jms.server.destination.QueueService"
+      name="jboss.messaging.destination:service=Queue,name=QueueWithOwnDLQAndExpiryQueue"
+      xmbean-dd="xmdesc/Queue-xmbean.xml">
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=PostOffice</depends>
+      <attribute name="DLQ">jboss.messaging.destination:service=Queue,name=PrivateDLQ</attribute>
+      <attribute name="ExpiryQueue">jboss.messaging.destination:service=Queue,name=PrivateExpiryQueue</attribute>
+   </mbean> 
+   
+   <mbean code="org.jboss.jms.server.destination.TopicService"
+      name="jboss.messaging.destination:service=Topic,name=TopicWithOwnDLQAndExpiryQueue"
+      xmbean-dd="xmdesc/Topic-xmbean.xml">
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+      <depends>jboss.messaging:service=PostOffice</depends>
+      <attribute name="DLQ">jboss.messaging.destination:service=Queue,name=PrivateDLQ</attribute>
+      <attribute name="ExpiryQueue">jboss.messaging.destination:service=Queue,name=PrivateExpiryQueue</attribute>
+   </mbean>       
+   
    <!--
         Example clustered destinations.
    -->

Modified: trunk/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/messaging-service.xml	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/server/default/deploy/messaging-service.xml	2006-12-31 09:49:46 UTC (rev 1879)
@@ -22,20 +22,29 @@
       </constructor>
 
       <depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>    
+      
       <depends optional-attribute-name="JMSUserManager">jboss.messaging:service=JMSUserManager</depends>
+      
       <depends>jboss.messaging:service=Connector,transport=socket</depends>
 
 	   <attribute name="PostOffice">jboss.messaging:service=PostOffice</attribute>
 
       <attribute name="SecurityDomain">java:/jaas/messaging</attribute>
+      
       <attribute name="DefaultSecurityConfig">
         <security>
             <role name="guest" read="true" write="true" create="true"/>
         </security>
       </attribute>
-      <attribute name="MaxDeliveryAttempts">10</attribute>
-      <attribute name="DLQName">DLQ</attribute>
+      
+      <attribute name="DefaultDLQ">jboss.messaging.destination:service=Queue,name=DLQ</attribute>
+      
+      <attribute name="DefaultMaxDeliveryAttempts">10</attribute>           
+      
+      <attribute name="DefaultExpiryQueue">jboss.messaging.destination:service=Queue,name=ExpiryQueue</attribute> 
+
       <attribute name="FailoverStartTimeout">3000</attribute>
+      
       <attribute name="FailoverCompleteTimeout">12000</attribute>
    </mbean>
 

Modified: trunk/src/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Queue-xmbean.xml	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/xmdesc/Queue-xmbean.xml	2006-12-31 09:49:46 UTC (rev 1879)
@@ -59,6 +59,18 @@
       <name>ServerPeer</name>
       <type>javax.management.ObjectName</type>
    </attribute>
+   
+   <attribute access="read-write" getMethod="getDLQ" setMethod="setDLQ">
+      <description>The DLQ for this queue, overrides the default DLQ on the server peer</description>
+      <name>DLQ</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>
+   
+   <attribute access="read-write" getMethod="getExpiryQueue" setMethod="setExpiryQueue">
+      <description>The expiry queue for this queue, overrides the default expiry queue on the server peer</description>
+      <name>ExpiryQueue</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>
 
    <attribute access="read-write" getMethod="getSecurityConfig" setMethod="setSecurityConfig">
       <description>The destination's security configuration</description>
@@ -102,6 +114,14 @@
       <type>boolean</type>
    </attribute>   
    
+   <!-- instance access -->
+
+   <attribute access="read-only" getMethod="getInstance">
+      <description>The instance to be accessed by select plugins via a typed hard reference</description>
+      <name>Instance</name>
+      <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
+   </attribute>
+   
    <!-- Managed operations -->
 
    <operation>

Modified: trunk/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ServerPeer-xmbean.xml	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/xmdesc/ServerPeer-xmbean.xml	2006-12-31 09:49:46 UTC (rev 1879)
@@ -46,6 +46,18 @@
       <type>javax.management.ObjectName</type>
    </attribute>   
    
+   <attribute access="read-write" getMethod="getDefaultDLQ" setMethod="setDefaultDLQ">
+      <description>The ObjectName of the default DLQ - used when an individual queue does not specify a DLQ</description>
+      <name>DefaultDLQ</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>  
+   
+   <attribute access="read-write" getMethod="getDefaultExpiryQueue" setMethod="setDefaultExpiryQueue">
+      <description>The ObjectName of the default expiry queue - used when an individual queue does not specify an expiry queue</description>
+      <name>DefaultExpiryQueue</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>    
+   
    <!-- instance access -->
 
    <attribute access="read-only" getMethod="getInstance">
@@ -139,19 +151,13 @@
       <name>QueuedExecutorPoolSize</name>
       <type>int</type>
    </attribute>
-   
-   <attribute access="read-write" getMethod="getMaxDeliveryAttempts" setMethod="setMaxDeliveryAttempts">
-      <description>The maximum delivery attempts for destinations</description>
-      <name>MaxDeliveryAttempts</name>
+         
+   <attribute access="read-write" getMethod="getDefaultMaxDeliveryAttempts" setMethod="setDefaultMaxDeliveryAttempts">
+      <description>The default maximum delivery attempts for destinations, unless overridden on the destination</description>
+      <name>DefaultMaxDeliveryAttempts</name>
       <type>int</type>
    </attribute>  
    
-   <attribute access="read-write" getMethod="getDLQName" setMethod="setDLQName">
-      <description>The JNDI name of the DLQ</description>
-      <name>DLQName</name>
-      <type>java.lang.String</type>
-   </attribute>  
-   
    <attribute access="read-write" getMethod="getFailoverStartTimeout" setMethod="setFailoverStartTimeout">
       <description>The maximum amount of time to wait for failover to begin</description>
       <name>FailoverStartTimeout</name>

Modified: trunk/src/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Topic-xmbean.xml	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/xmdesc/Topic-xmbean.xml	2006-12-31 09:49:46 UTC (rev 1879)
@@ -57,6 +57,18 @@
       <name>ServerPeer</name>
       <type>javax.management.ObjectName</type>
    </attribute>
+   
+   <attribute access="read-write" getMethod="getDLQ" setMethod="setDLQ">
+      <description>The DLQ for this topic, overrides the default DLQ on the server peer</description>
+      <name>DLQ</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>
+   
+   <attribute access="read-write" getMethod="getExpiryQueue" setMethod="setExpiryQueue">
+      <description>The expiry queue for this topic, overrides the default expiry queue on the server peer</description>
+      <name>ExpiryQueue</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>   
 
    <attribute access="read-write" getMethod="getSecurityConfig" setMethod="setSecurityConfig">
       <description>The destination's security configuration</description>
@@ -93,7 +105,15 @@
       <name>Clustered</name>
       <type>boolean</type>
    </attribute>
+   
+   <!-- instance access -->
 
+   <attribute access="read-only" getMethod="getInstance">
+      <description>The instance to be accessed by select plugins via a typed hard reference</description>
+      <name>Instance</name>
+      <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
+   </attribute>
+
    <!-- Managed operations -->
 
    <operation>

Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -21,6 +21,8 @@
   */
 package org.jboss.jms.client.container;
 
+import javax.jms.MessageListener;
+
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
 import org.jboss.jms.client.delegate.DelegateSupport;
@@ -31,6 +33,8 @@
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
+import org.jboss.logging.Logger;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -50,6 +54,8 @@
 {
    // Constants -----------------------------------------------------
    
+   private static final Logger log = Logger.getLogger(ConsumerAspect.class);
+   
    // Static --------------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -133,12 +139,42 @@
       //in the right order
       del.cancelInflightMessages(lastDeliveryId);
       
-      //And then we cancel any messages still in the message callback handler buffer
+      //And then we cancel any messages still in the message callback handler buffer     
       consumerState.getMessageCallbackHandler().cancelBuffer();
                                    
       return res;
    }      
    
+   public Object handleReceive(Invocation invocation) throws Throwable
+   {
+      MethodInvocation mi = (MethodInvocation)invocation;
+      Object[] args = mi.getArguments();
+      long timeout = args == null ? 0 : ((Long)args[0]).longValue();
+      
+      return getMessageCallbackHandler(invocation).receive(timeout);
+   }
+   
+   public Object handleReceiveNoWait(Invocation invocation) throws Throwable
+   {      
+      return getMessageCallbackHandler(invocation).receive(-1);
+   }
+   
+   public Object handleSetMessageListener(Invocation invocation) throws Throwable
+   {   
+      MethodInvocation mi = (MethodInvocation)invocation;
+      Object[] args = mi.getArguments();
+      MessageListener l = (MessageListener)args[0];
+      
+      getMessageCallbackHandler(invocation).setMessageListener(l);
+      
+      return null;
+   }
+   
+   public MessageListener handleGetMessageListener(Invocation invocation) throws Throwable
+   {       
+      return getMessageCallbackHandler(invocation).getMessageListener();
+   }
+   
    public Object handleGetDestination(Invocation invocation) throws Throwable
    {
       return getState(invocation).getDestination();
@@ -165,5 +201,11 @@
       return (ConsumerState)((DelegateSupport)inv.getTargetObject()).getState();
    }
    
+   private MessageCallbackHandler getMessageCallbackHandler(Invocation inv)
+   {      
+      ConsumerState state = (ConsumerState)((DelegateSupport)inv.getTargetObject()).getState();
+      return state.getMessageCallbackHandler();      
+   }
+   
    // Inner classes -------------------------------------------------
 }

Deleted: trunk/src/main/org/jboss/jms/client/container/ReceiverAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ReceiverAspect.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/container/ReceiverAspect.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -1,101 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.jms.client.container;
-
-import javax.jms.MessageListener;
-
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.MessageCallbackHandler;
-import org.jboss.jms.client.state.ConsumerState;
-
-/**
- * 
- * This aspect handles receive functionality for a message consumer
- * 
- * This aspect is PER_VM.
- * 
- * 
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class ReceiverAspect
-{
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   
-   public Object handleReceive(Invocation invocation) throws Throwable
-   {
-      MethodInvocation mi = (MethodInvocation)invocation;
-      Object[] args = mi.getArguments();
-      long timeout = args == null ? 0 : ((Long)args[0]).longValue();
-      
-      return getMessageCallbackHandler(invocation).receive(timeout);
-   }
-   
-   public Object handleReceiveNoWait(Invocation invocation) throws Throwable
-   {      
-      return getMessageCallbackHandler(invocation).receive(-1);
-   }
-   
-   public Object handleSetMessageListener(Invocation invocation) throws Throwable
-   {   
-      MethodInvocation mi = (MethodInvocation)invocation;
-      Object[] args = mi.getArguments();
-      MessageListener l = (MessageListener)args[0];
-      
-      getMessageCallbackHandler(invocation).setMessageListener(l);
-      
-      return null;
-   }
-   
-   public MessageListener handleGetMessageListener(Invocation invocation) throws Throwable
-   {       
-      return getMessageCallbackHandler(invocation).getMessageListener();
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-   
-   private MessageCallbackHandler getMessageCallbackHandler(Invocation inv)
-   {      
-      ConsumerState state = (ConsumerState)((DelegateSupport)inv.getTargetObject()).getState();
-      return state.getMessageCallbackHandler();      
-   }
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -26,18 +26,24 @@
 import java.util.List;
 
 import javax.jms.IllegalStateException;
+import javax.jms.Message;
 import javax.jms.Session;
+import javax.jms.TransactionInProgressException;
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.MessageCallbackHandler;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.server.endpoint.DefaultCancel;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
+import org.jboss.jms.tx.LocalTx;
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.logging.Logger;
 
@@ -109,7 +115,7 @@
           ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
           (state.isXA() && state.getCurrentTxId() == null))
       {
-         //Acknowledge any outstanding auto ack
+         //Acknowledge or cancel any outstanding auto ack
          
          DeliveryInfo remainingAutoAck = state.getAutoAckInfo();
          
@@ -117,8 +123,8 @@
          {
             if (trace) { log.trace(this + " handleClosing(). Found remaining auto ack. Will ack it " + remainingAutoAck.getDeliveryId()); }
             
-            ackDelivery(del, remainingAutoAck);
-            
+            ackDelivery(del, remainingAutoAck);            
+                        
             if (trace) { log.trace(this + " acked it"); }
             
             state.setAutoAckInfo(null);
@@ -139,34 +145,46 @@
          {
             DeliveryInfo ack = (DeliveryInfo)i.next();            
             
-            DefaultCancel cancel = new DefaultCancel(ack.getMessageProxy().getDeliveryId(), ack.getMessageProxy().getDeliveryCount());
+            DefaultCancel cancel = new DefaultCancel(ack.getMessageProxy().getDeliveryId(),
+                                                     ack.getMessageProxy().getDeliveryCount(),
+                                                     false, false);
+            
             cancels.add(cancel);
-         }
+         }         
          
          if (!cancels.isEmpty())
-         {            
-            del.cancelDeliveries(cancels);            
+         {
+            del.cancelDeliveries(cancels);
          }
          
          state.getClientAckList().clear();
       }
-      
-      
+            
       //TODO - we should also cancel any deliveries remaining in any transaction for the session
       //so the delivery count gets updated to the server, and not rely on the server side close
       //cancelling them
       
       return invocation.invokeNext();
    }
-
-
+   
    public Object handleClose(Invocation invocation) throws Throwable
    {      
       Object res = invocation.invokeNext();
+      
+      SessionState state = getState(invocation);
 
+      ConnectionState connState = (ConnectionState)state.getParent();
+
+      Object xid = state.getCurrentTxId();
+
+      if (xid != null)
+      {
+         //Remove transaction from the resource manager
+         connState.getResourceManager().removeTx(xid);
+      }
+
       // We must explicitly shutdown the executor
 
-      SessionState state = getState(invocation);
       state.getExecutor().shutdownNow();
 
       return res;
@@ -194,10 +212,7 @@
             throw new IllegalStateException("CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
          }
                   
-         state.getClientAckList().add(info);
-         
-         //We can return immediately
-         return null;
+         state.getClientAckList().add(info);         
       }
       else if (ackMode == Session.AUTO_ACKNOWLEDGE ||
                ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
@@ -206,36 +221,39 @@
          //We collect the single acknowledgement in the state.
          //Currently DUPS_OK is treated the same as AUTO_ACKNOWLDGE
          //Also XA sessions not enlisted in a global tx are treated as AUTO_ACKNOWLEDGE
-                  
-         
+                           
          if (trace) { log.trace(this + " delivery id: " + info.getDeliveryId() + " added to client ack member"); }
          
          state.setAutoAckInfo(info);         
-         
-         //We can return immediately         
-         return null;
       }
-              
-      //Transactional - need to carry on down the stack
-      return invocation.invokeNext();
-   }
+      else
+      {             
+         Object txID = state.getCurrentTxId();
    
-   /* Used for client acknowledge */
-   public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
-   {    
-      MethodInvocation mi = (MethodInvocation)invocation;
-      SessionState state = getState(invocation);
-      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-    
-      if (!state.getClientAckList().isEmpty())
-      {                 
-         //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
-         //on this session (rather than the connection consumer session)
-         del.acknowledgeDeliveries(state.getClientAckList());
+         if (txID != null)
+         {
+            // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
+            // XA session that has not been enrolled in a global transaction behaves as a
+            // non-transacted session.
+            
+            ConnectionState connState = (ConnectionState)state.getParent();
+   
+            if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
+   
+            //If the ack is for a delivery that came through via a connection consumer then we
+            //use the connectionConsumer session as the session id, otherwise we use this session's
+            //session id
+            
+            ClientSessionDelegate connectionConsumerDelegate =
+               (ClientSessionDelegate)info.getConnectionConsumerSession();
+            
+            int sessionId = connectionConsumerDelegate != null ?
+               connectionConsumerDelegate.getID() : state.getSessionId();
+            
+            connState.getResourceManager().addAck(txID, sessionId, info);
+         }        
+      }
       
-         state.getClientAckList().clear();
-      }
-        
       return null;
    }
    
@@ -246,13 +264,6 @@
       
       int ackMode = state.getAcknowledgeMode();
       
-      boolean cancel = ((Boolean)mi.getArguments()[0]).booleanValue();
-      
-      if (cancel && ackMode != Session.AUTO_ACKNOWLEDGE && ackMode != Session.DUPS_OK_ACKNOWLEDGE)
-      {
-         throw new IllegalStateException("Ack mode must be AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE");
-      }
-      
       if (ackMode == Session.AUTO_ACKNOWLEDGE ||
           ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
           (state.isXA() && state.getCurrentTxId() == null))
@@ -263,6 +274,10 @@
          
          SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
 
+         //It is possible that session.recover() is called inside a message listener onMessage
+         //method - i.e. between the invocations of preDeliver and postDeliver.
+         //In this case we don't want to acknowledge the last delivered message - since it
+         //will be redelivered
          if (!state.isRecoverCalled())
          {
             DeliveryInfo deliveryInfo = state.getAutoAckInfo();
@@ -274,14 +289,7 @@
                                  
             if (trace) { log.trace(this + " auto acking delivery " + deliveryInfo.getDeliveryId()); }
                         
-            if (cancel)
-            {
-               cancelDelivery(sd, deliveryInfo);
-            }
-            else
-            {
-               ackDelivery(sd, deliveryInfo);
-            }
+            ackDelivery(sd, deliveryInfo);            
             
             state.setAutoAckInfo(null);
          }
@@ -295,7 +303,26 @@
 
       return null;
    }
-                  
+   
+   /* Used for client acknowledge */
+   public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
+   {    
+      MethodInvocation mi = (MethodInvocation)invocation;
+      SessionState state = getState(invocation);
+      SessionDelegate del = (SessionDelegate)mi.getTargetObject();            
+    
+      if (!state.getClientAckList().isEmpty())
+      {                 
+         //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
+         //on this session (rather than the connection consumer session)
+         del.acknowledgeDeliveries(state.getClientAckList());
+      
+         state.getClientAckList().clear();
+      }      
+        
+      return null;
+   }
+                       
    /*
     * Called when session.recover is called
     */
@@ -325,8 +352,12 @@
       }
       else
       {
+         //auto_ack or dups_ok
+         
          DeliveryInfo info = state.getAutoAckInfo();
          
+         //Don't recover if it's already to cancel
+         
          if (info != null)
          {
             List redels = new ArrayList();
@@ -377,18 +408,17 @@
    public Object handleRedeliver(Invocation invocation) throws Throwable
    {            
       MethodInvocation mi = (MethodInvocation)invocation;
-      SessionState state = getState(invocation);
+      SessionState state = getState(invocation);            
             
-      // We put the messages back in the front of their appropriate consumer buffers and set
-      // JMSRedelivered to true.
+      // We put the messages back in the front of their appropriate consumer buffers
       
       List toRedeliver = (List)mi.getArguments()[0];
-      
+       
       if (trace) { log.trace(this + " handleRedeliver() called: " + toRedeliver); }
       
       SessionDelegate del = (SessionDelegate)mi.getTargetObject();
       
-      // Need to be recovered in reverse order.
+      // Need to be redelivered in reverse order.
       for (int i = toRedeliver.size() - 1; i >= 0; i--)
       {
          DeliveryInfo info = (DeliveryInfo)toRedeliver.get(i);
@@ -400,11 +430,14 @@
          {
             // This is ok. The original consumer has closed, so we cancel the message
             
+            //FIXME - this needs to be done atomically for all cancels
+            
             cancelDelivery(del, info);
          }
          else
          {
             if (trace) { log.trace("Adding proxy back to front of buffer"); }
+            
             handler.addToFrontOfBuffer(proxy);
          }                                    
       }
@@ -412,6 +445,100 @@
       return null;  
    }
    
+   public Object handleCommit(Invocation invocation) throws Throwable
+   {
+      SessionState state = getState(invocation);
+
+      if (!state.isTransacted())
+      {
+         throw new IllegalStateException("Cannot commit a non-transacted session");
+      }
+
+      if (state.isXA())
+      {
+         throw new TransactionInProgressException("Cannot call commit on an XA session");
+      }
+
+      ConnectionState connState = (ConnectionState)state.getParent();
+      ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
+
+      try
+      {
+         connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
+      }
+      finally
+      {
+         //Start new local tx
+         Object xid = connState.getResourceManager().createLocalTx();
+
+         state.setCurrentTxId(xid);
+      }
+      
+      //TODO on commit we don't want to ACK any messages that have exceeded the max delivery count OR have expired
+
+      return null;
+   }
+
+   public Object handleRollback(Invocation invocation) throws Throwable
+   {
+      SessionState state = getState(invocation);
+
+      if (!state.isTransacted())
+      {
+         throw new IllegalStateException("Cannot rollback a non-transacted session");
+      }
+
+      if (state.isXA())
+      {
+         throw new TransactionInProgressException("Cannot call rollback on an XA session");
+      }
+      
+      ConnectionState connState = (ConnectionState)state.getParent();
+      ResourceManager rm = connState.getResourceManager();
+      ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
+
+      try
+      {
+         rm.rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
+      }
+      finally
+      {
+         // start new local tx
+         Object xid = rm.createLocalTx();
+         state.setCurrentTxId(xid);
+      }
+
+      return null;
+   }
+   
+   public Object handleSend(Invocation invocation) throws Throwable
+   {
+      SessionState state = getState(invocation);
+      Object txID = state.getCurrentTxId();
+
+      if (txID != null)
+      {
+         // the session is non-XA and transacted, or XA and enrolled in a global transaction, so
+         // we add the message to a transaction instead of sending it now. An XA session that has
+         // not been enrolled in a global transaction behaves as a non-transacted session.
+
+         ConnectionState connState = (ConnectionState)state.getParent();
+         MethodInvocation mi = (MethodInvocation)invocation;
+         Message m = (Message)mi.getArguments()[0];
+
+         if (trace) { log.trace("sending message " + m + " transactionally, queueing on resource manager txID=" + txID + " sessionID= " + state.getSessionId()); }
+
+         connState.getResourceManager().addMessage(txID, state.getSessionId(), (JBossMessage)m);
+
+         // ... and we don't invoke any further interceptors in the stack
+         return null;
+      }
+
+      if (trace) { log.trace("sending message NON-transactionally"); }
+
+      return invocation.invokeNext();
+   }
+   
    public Object handleGetXAResource(Invocation invocation) throws Throwable
    {
       return getState(invocation).getXAResource();
@@ -462,7 +589,8 @@
       
       SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
       
-      sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(), delivery.getMessageProxy().getDeliveryCount()));      
+      sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(),
+                                  delivery.getMessageProxy().getDeliveryCount(), false, false));      
    }
 
    // Inner Classes -------------------------------------------------

Deleted: trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -1,229 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.jms.client.container;
-
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import javax.jms.TransactionInProgressException;
-
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.state.HierarchicalState;
-import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.delegate.ConnectionDelegate;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.server.endpoint.DeliveryInfo;
-import org.jboss.jms.tx.LocalTx;
-import org.jboss.jms.tx.ResourceManager;
-import org.jboss.logging.Logger;
-
-/**
- * This aspect handles transaction related logic
- * 
- * This aspect is PER_VM.
- * 
- * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
- * @author <a href="mailto:ovidiu at jboss.com>Ovidiu Feodorov</a>
- *
- * $Id$
- */
-public class TransactionAspect
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(TransactionAspect.class);
-
-   // Attributes ----------------------------------------------------
-
-   private boolean trace = log.isTraceEnabled();
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public Object handleClose(Invocation invocation) throws Throwable
-   {
-      Object res = invocation.invokeNext();
-
-      SessionState state = (SessionState)getState(invocation);
-
-      ConnectionState connState = (ConnectionState)state.getParent();
-
-      Object xid = state.getCurrentTxId();
-
-      if (xid != null)
-      {
-         //Remove transaction from the resource manager
-         connState.getResourceManager().removeTx(xid);
-      }
-
-      return res;
-   }
-
-   public Object handleCommit(Invocation invocation) throws Throwable
-   {
-      SessionState state = (SessionState)getState(invocation);
-
-      if (!state.isTransacted())
-      {
-         throw new IllegalStateException("Cannot commit a non-transacted session");
-      }
-
-      if (state.isXA())
-      {
-         throw new TransactionInProgressException("Cannot call commit on an XA session");
-      }
-
-      ConnectionState connState = (ConnectionState)state.getParent();
-      ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
-      try
-      {
-         connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
-      }
-      finally
-      {
-         //Start new local tx
-         Object xid = connState.getResourceManager().createLocalTx();
-
-         state.setCurrentTxId(xid);
-      }
-
-      return null;
-   }
-
-   public Object handleRollback(Invocation invocation) throws Throwable
-   {
-      SessionState state = (SessionState)getState(invocation);
-
-      if (!state.isTransacted())
-      {
-         throw new IllegalStateException("Cannot rollback a non-transacted session");
-      }
-
-      if (state.isXA())
-      {
-         throw new TransactionInProgressException("Cannot call rollback on an XA session");
-      }
-
-      ConnectionState connState = (ConnectionState)state.getParent();
-      ResourceManager rm = connState.getResourceManager();
-      ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
-      try
-      {
-         rm.rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
-      }
-      finally
-      {
-         // start new local tx
-         Object xid = rm.createLocalTx();
-         state.setCurrentTxId(xid);
-      }
-
-      return null;
-   }
-
-   public Object handleSend(Invocation invocation) throws Throwable
-   {
-      SessionState state = (SessionState)getState(invocation);
-      Object txID = state.getCurrentTxId();
-
-      if (txID != null)
-      {
-         // the session is non-XA and transacted, or XA and enrolled in a global transaction, so
-         // we add the message to a transaction instead of sending it now. An XA session that has
-         // not been enrolled in a global transaction behaves as a non-transacted session.
-
-         ConnectionState connState = (ConnectionState)state.getParent();
-         MethodInvocation mi = (MethodInvocation)invocation;
-         Message m = (Message)mi.getArguments()[0];
-
-         if (trace) { log.trace("sending message " + m + " transactionally, queueing on resource manager txID=" + txID + " sessionID= " + state.getSessionId()); }
-
-         connState.getResourceManager().addMessage(txID, state.getSessionId(), (JBossMessage)m);
-
-         // ... and we don't invoke any further interceptors in the stack
-         return null;
-      }
-
-      if (trace) { log.trace("sending message NON-transactionally"); }
-
-      return invocation.invokeNext();
-   }
-
-   public Object handlePreDeliver(Invocation invocation) throws Throwable
-   {
-      SessionState state = (SessionState)getState(invocation);
-      Object txID = state.getCurrentTxId();
-
-      if (txID != null)
-      {
-         // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
-         // XA session that has not been enrolled in a global transaction behaves as a
-         // non-transacted session.
-         
-         MethodInvocation mi = (MethodInvocation)invocation;
-         
-         DeliveryInfo info = (DeliveryInfo)mi.getArguments()[0];
-
-         ConnectionState connState = (ConnectionState)state.getParent();
-
-         if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
-
-         //If the ack is for a delivery that came through via a connection consumer then we
-         //use the connectionConsumer session as the session id, otherwise we use this sessions'
-         //session id
-         
-         ClientSessionDelegate connectionConsumerDelegate =
-            (ClientSessionDelegate)info.getConnectionConsumerSession();
-         
-         int sessionId = connectionConsumerDelegate != null ?
-            connectionConsumerDelegate.getID() : state.getSessionId();
-         
-         connState.getResourceManager().addAck(txID, sessionId, info);
-      }
-
-      return null;
-   }
-
-   // Protected ------------------------------------------------------
-
-   // Package Private ------------------------------------------------
-
-   // Private --------------------------------------------------------
-
-   private HierarchicalState getState(Invocation inv)
-   {
-      return ((DelegateSupport)inv.getTargetObject()).getState();
-   }
-
-   // Inner Classes --------------------------------------------------
-
-}
-
-

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -306,7 +306,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void postDeliver(boolean cancel) throws JMSException
+   public void postDeliver() throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -34,11 +34,14 @@
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.message.MessageProxy;
+import org.jboss.jms.server.endpoint.Cancel;
 import org.jboss.jms.server.endpoint.DefaultCancel;
 import org.jboss.jms.server.endpoint.DeliveryInfo;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Message;
 import org.jboss.messaging.util.Future;
 
+import EDU.oswego.cs.dl.util.concurrent.Executor;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
@@ -63,6 +66,67 @@
       log = Logger.getLogger(MessageCallbackHandler.class);
       trace = log.isTraceEnabled();
    }
+   
+   //FIXME temporary - until remoting provides true asynch invocations
+   static Executor exec = new QueuedExecutor();
+   
+   private static boolean checkExpiredOrReachedMaxdeliveries(MessageProxy proxy, SessionDelegate del,
+                                                             int maxDeliveries) throws JMSException
+   {
+      Message msg = proxy.getMessage();
+      
+      boolean expired = msg.isExpired();
+      
+      boolean reachedMaxDeliveries = proxy.getDeliveryCount() == maxDeliveries;
+      
+      if (expired || reachedMaxDeliveries)
+      {
+         if (trace)
+         {
+            if (expired)
+            {
+               log.trace("Message " + proxy.getMessage() + " has expired - cancelling to server");
+            }
+            else
+            {
+               log.trace("Message " + proxy.getMessage() + " has reached max deliveries - cancelling to server");
+            }
+         }
+         final Cancel cancel = new DefaultCancel(proxy.getDeliveryId(), proxy.getDeliveryCount(),
+                                           expired, reachedMaxDeliveries);
+         
+         //FIXME - this cancel should be sent using remoting true asynch invocations
+         //for now we just send on a different thread to prevent deadlocks
+         
+         final SessionDelegate sess = del;
+         
+         try
+         {
+            
+            exec.execute(new Runnable() { public void run()
+            {
+               try
+               {
+                  sess.cancelDelivery(cancel);
+               }
+               catch (JMSException e)
+               {
+                  log.error("Failed to cancel delivery", e);
+               }
+            }});
+         }
+         catch (InterruptedException e)
+         {
+            log.error("Thread interrupted", e);
+         }
+                  
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
      
    //This is static so it can be called by the asf layer too
    public static void callOnMessage(SessionDelegate sess,
@@ -75,79 +139,58 @@
                                     int maxDeliveries,
                                     SessionDelegate connectionConsumerSession)
       throws JMSException
-   {
+   {      
+      if (checkExpiredOrReachedMaxdeliveries(m, sess, maxDeliveries))
+      {
+         //Message has been cancelled
+         return;
+      }
+      
+      DeliveryInfo deliveryInfo = new DeliveryInfo(m, consumerID, channelID, connectionConsumerSession);
+            
+      m.incDeliveryCount();
+      
       // If this is the callback-handler for a connection consumer we don't want to acknowledge or
       // add anything to the tx for this session.
       if (!isConnectionConsumer)
       {
-         sess.preDeliver(new DeliveryInfo(m, consumerID, channelID, connectionConsumerSession));
-      }  
-                  
-      int tries = 0;
+         //We need to call preDeliver, deliver the message then call postDeliver - this is because
+         //it is legal to call session.recover(), or session.rollback() from within the onMessage()
+         //method in which case the last message needs to be redelivered so the session needs to know about it
+         sess.preDeliver(deliveryInfo);
+      } 
       
-      boolean cancel = false;
-      
-      while (true)
+      try
       {
-         try
-         {
-            if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
-            
-            m.incDeliveryCount();
-            
-            listener.onMessage(m);
+         if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+                     
+         listener.onMessage(m);
 
-            if (trace) { log.trace("listener's onMessage() finished"); }
-            
-            break;
-         }
-         catch (RuntimeException e)
-         {
-            long id = m.getMessage().getMessageID();
-   
-            log.error("RuntimeException was thrown from onMessage, " + id + " will be redelivered", e);
-            
-            // See JMS 1.1 spec 4.5.2
-   
-            if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
-            {
-               // We redeliver a certain number of times
-               if (tries < maxDeliveries)
-               {                            
-                  tries++;
-               }
-               else
-               {
-                  log.error("Max redeliveries has occurred for message: " + m.getJMSMessageID());
-                  
-                  // postdeliver will do a cancel rather than an ack which will cause the mesage
-                  // to end up in the DLQ
-                  
-                  cancel = true;
-                  
-                  break;
-               }
-            }
-            else
-            {
-               // Session is either transacted or CLIENT_ACKNOWLEDGE
-               // We just deliver next message
-               if (trace) { log.trace("ignoring exception on " + id); }
-               
-               break;
-            }
-         }
+         if (trace) { log.trace("listener's onMessage() finished"); }
       }
+      catch (RuntimeException e)
+      {
+         long id = m.getMessage().getMessageID();
 
+         log.error("RuntimeException was thrown from onMessage, " + id + " will be redelivered", e);
+         
+         // See JMS 1.1 spec 4.5.2
+
+         if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+         {              
+            sess.recover();
+         }
+      }   
+
       if (!sess.isClosed())
       {
          // postDeliver only if the session is not closed
-
+               
          // If this is the callback-handler for a connection consumer we don't want to acknowledge or
          // add anything to the tx for this session
          if (!isConnectionConsumer)
          {
-            sess.postDeliver(cancel);
+            sess.postDeliver();
          }   
       }
    }
@@ -278,7 +321,7 @@
          // delivered in a session.
          
          if (!buffer.isEmpty())
-         {            
+         {                        
             // Now we cancel any deliveries that might be waiting in our buffer. This is because
             // otherwise the messages wouldn't get cancelled until the corresponding session died.
             // So if another consumer in another session tried to consume from the channel before that
@@ -293,7 +336,7 @@
             {
                MessageProxy mp = (MessageProxy)i.next();
                
-               DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
+               DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount(), false, false);
                
                cancels.add(ack);
             }
@@ -409,17 +452,20 @@
                               
                if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
                        
-               // If message is expired we still call pre and post deliver. This makes sure the
-               // message is acknowledged so it gets removed from the queue/subscription.
-
-               if (!isConnectionConsumer)
+               boolean ignore = checkExpiredOrReachedMaxdeliveries(m, sessionDelegate, maxDeliveries);
+               
+               if (!isConnectionConsumer && !ignore)
                {
-                  sessionDelegate.preDeliver(new DeliveryInfo(m, consumerID, channelID, null));
+                  DeliveryInfo info = new DeliveryInfo(m, consumerID, channelID, null);
+                                                    
+                  m.incDeliveryCount();           
+ 
+                  sessionDelegate.preDeliver(info);                  
                   
-                  sessionDelegate.postDeliver(false);
+                  sessionDelegate.postDeliver();                                    
                }
-               
-               if (!m.getMessage().isExpired())
+                                             
+               if (!ignore)
                {
                   if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
                   
@@ -428,7 +474,7 @@
                
                if (trace)
                {
-                  log.trace(this + ": message expired, discarding");
+                  log.trace(this + ": message expired or exceeded max deliveries, discarding");
                }
                
                // the message expired, so discard the message, adjust timeout and reenter the buffer
@@ -456,8 +502,6 @@
          sendChangeRateMessage(1);                    
       }
       
-      m.incDeliveryCount();
-      
       if (trace) { log.trace(this + " receive() returning " + m); }
       
       return m;
@@ -480,7 +524,7 @@
 
    public void setConsumerId(int consumerId)
    {
-       this.consumerID=consumerId;
+       this.consumerID = consumerId;
    }
    
    public void addToFrontOfBuffer(MessageProxy proxy)

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -72,6 +72,9 @@
    // List<DeliveryInfo>
    private List clientAckList;
    
+   // List<DeliveryInfo>
+   //private List clientCancelList;
+   
    private DeliveryInfo autoAckInfo;
 
    private ConnectionState parent;
@@ -111,6 +114,8 @@
       
       clientAckList = new ArrayList();
       
+     // clientCancelList = new ArrayList();
+      
       // TODO could optimise this to use the same map of callbackmanagers (which holds refs
       // to callbackhandlers) in the connection, instead of maintaining another map
       callbackHandlers = new HashMap();
@@ -135,8 +140,7 @@
    {
       this.delegate=(SessionDelegate)delegate;
    }
-   
-   
+      
    /**
     * @return List<AckInfo>
     */
@@ -232,9 +236,7 @@
    // When failing over a session, we keep the old session's state but there are certain fields
    // we need to update
    public void copyState(SessionState newState)
-   {      
-      //this.delegate = newState.delegate;
-      
+   {       
       this.sessionId = newState.sessionId;
    }
 }

Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -68,7 +68,7 @@
    
    void preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
    
-   void postDeliver(boolean cancel) throws JMSException;
+   void postDeliver() throws JMSException;
    
    MessageListener getMessageListener() throws JMSException;
    

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -30,6 +30,7 @@
 import java.util.Set;
 
 import javax.management.Attribute;
+import javax.management.InstanceNotFoundException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -37,6 +38,7 @@
 import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
 import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
 import org.jboss.jms.server.connectormanager.SimpleConnectorManager;
+import org.jboss.jms.server.destination.ManagedQueue;
 import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
 import org.jboss.jms.server.plugin.contract.JMSUserManager;
 import org.jboss.jms.server.remoting.JMSServerInvocationHandler;
@@ -106,10 +108,9 @@
 
    private int objectIDSequence = 1;
 
-   private int maxDeliveryAttempts = 10;
+   //The default maximum number of delivery attempts before sending to DLQ - can be overridden on the destination
+   private int defaultMaxDeliveryAttempts = 10;
 
-   private String dlqName;
-
    private Object failoverStatusLock;
    
    private long failoverStartTimeout = 3000;
@@ -143,6 +144,12 @@
 
    protected ObjectName jmsUserManagerObjectName;
    protected JMSUserManager jmsUserManager;
+   
+   protected ObjectName defaultDLQObjectName;
+   protected Queue defaultDLQ;
+   
+   protected ObjectName defaultExpiryQueueObjectName;
+   protected Queue defaultExpiryQueue;
 
    //Other stuff
 
@@ -304,60 +311,81 @@
 
    // JMX Attributes ------------------------------------------------
 
-   public String getDLQName()
+   // Plugins
+   
+   public synchronized ObjectName getPersistenceManager()
    {
-      return dlqName;
+      return persistenceManagerObjectName;
    }
 
-   public void setDLQName(String dlqName)
+   public synchronized void setPersistenceManager(ObjectName on)
    {
-      this.dlqName = dlqName;
+      if (started)
+      {
+         log.warn("Cannot set persistence manager on server peer when server peer is started");
+         return;         
+      }
+      persistenceManagerObjectName = on;
    }
 
-   public int getMaxDeliveryAttempts()
+   public synchronized ObjectName getPostOffice()
    {
-      return maxDeliveryAttempts;
+      return postOfficeObjectName;
    }
 
-   public void setMaxDeliveryAttempts(int attempts)
+   public synchronized void setPostOffice(ObjectName on)
    {
-      this.maxDeliveryAttempts = attempts;
+      if (started)
+      {
+         log.warn("Cannot set post office on server peer when server peer is started");
+         return;         
+      }
+      postOfficeObjectName = on;
    }
 
-   public ObjectName getPersistenceManager()
+   public synchronized ObjectName getJmsUserManager()
    {
-      return persistenceManagerObjectName;
+      return jmsUserManagerObjectName;
    }
 
-   public void setPersistenceManager(ObjectName on)
+   public synchronized void setJMSUserManager(ObjectName on)
    {
-      persistenceManagerObjectName = on;
+      if (started)
+      {
+         log.warn("Cannot set jms user manager on server peer when server peer is started");
+         return;         
+      }
+      jmsUserManagerObjectName = on;
    }
-
-   public ObjectName getPostOffice()
+   
+   public synchronized ObjectName getDefaultDLQ()
    {
-      return postOfficeObjectName;
+      return defaultDLQObjectName;
    }
 
-   public void setPostOffice(ObjectName on)
+   public synchronized void setDefaultDLQ(ObjectName on)
    {
-      postOfficeObjectName = on;
+      defaultDLQObjectName = on;
    }
-
-   public ObjectName getJmsUserManager()
+   
+   public synchronized ObjectName getDefaultExpiryQueue()
    {
-      return jmsUserManagerObjectName;
+      return defaultExpiryQueueObjectName;
    }
 
-   public void setJMSUserManager(ObjectName on)
+   public synchronized void setDefaultExpiryQueue(ObjectName on)
    {
-      jmsUserManagerObjectName = on;
+      this.defaultExpiryQueueObjectName = on;
    }
+   
+   // Instance access
 
    public Object getInstance()
    {
       return this;
    }
+   
+   //read only JMX attributes
 
    public String getJMSVersion()
    {
@@ -408,8 +436,10 @@
    {
       return defaultTopicJNDIContext;
    }
+   
+   //Read - write attributes
 
-   public void setSecurityDomain(String securityDomain) throws Exception
+   public synchronized void setSecurityDomain(String securityDomain) throws Exception
    {
       try
       {
@@ -421,66 +451,66 @@
       }
    }
 
-   public String getSecurityDomain()
+   public synchronized String getSecurityDomain()
    {
       return securityStore.getSecurityDomain();
    }
 
-   public void setDefaultSecurityConfig(Element conf) throws Exception
+   public synchronized void setDefaultSecurityConfig(Element conf) throws Exception
    {
       securityStore.setDefaultSecurityConfig(conf);
    }
 
-   public Element getDefaultSecurityConfig()
+   public synchronized Element getDefaultSecurityConfig()
    {
       return securityStore.getDefaultSecurityConfig();
    }
-
-   public IDManager getMessageIDManager()
+        
+   public synchronized int getQueuedExecutorPoolSize()
    {
-      return messageIDManager;
-   }
-
-   public IDManager getChannelIDManager()
-   {
-      return channelIDManager;
-   }
-
-   public ServerInvocationHandler getInvocationHandler()
-   {
-      return handler;
-   }
-
-   public int getQueuedExecutorPoolSize()
-   {
       return queuedExecutorPoolSize;
    }
 
-   public void setQueuedExecutorPoolSize(int poolSize)
+   public synchronized void setQueuedExecutorPoolSize(int poolSize)
    {
+      if (started)
+      {
+         log.warn("Cannot set jms queued executor pool size on server peer when server peer is started");
+         return;         
+      }
       this.queuedExecutorPoolSize = poolSize;
    }
    
-   public long getFailoverStartTimeout()
+   public synchronized long getFailoverStartTimeout()
    {
       return this.failoverStartTimeout;
    }
    
-   public void setFailoverStartTimeout(long timeout)
+   public synchronized void setFailoverStartTimeout(long timeout)
    {
       this.failoverStartTimeout = timeout;
    }
    
-   public long getFailoverCompleteTimeout()
+   public synchronized long getFailoverCompleteTimeout()
    {
       return this.failoverCompleteTimeout;
    }
    
-   public void setFailoverCompleteTimeout(long timeout)
+   public synchronized void setFailoverCompleteTimeout(long timeout)
    {
       this.failoverCompleteTimeout = timeout;
    }
    
+   public synchronized int getDefaultMaxDeliveryAttempts()
+   {
+      return defaultMaxDeliveryAttempts;
+   }
+
+   public synchronized void setDefaultMaxDeliveryAttempts(int attempts)
+   {
+      this.defaultMaxDeliveryAttempts = attempts;
+   }
+   
    // JMX Operations ------------------------------------------------
 
    public String createQueue(String name, String jndiName) throws Exception
@@ -569,6 +599,21 @@
 
    // Public --------------------------------------------------------
    
+   public IDManager getMessageIDManager()
+   {
+      return messageIDManager;
+   }
+
+   public IDManager getChannelIDManager()
+   {
+      return channelIDManager;
+   }
+
+   public ServerInvocationHandler getInvocationHandler()
+   {
+      return handler;
+   }
+   
    public ServerSessionEndpoint getSession(Integer sessionID)
    {
       return (ServerSessionEndpoint)sessions.get(sessionID);
@@ -587,26 +632,72 @@
       }
    }
 
-   public Queue getDLQ() throws Exception
+   public synchronized Queue getDefaultDLQInstance() throws Exception
    {
-      if (dlqName == null)
-      {
-         //No DLQ name specified so there is no DLQ
-         return null;
+      Queue dlq = null;
+      
+      if (defaultDLQObjectName != null)
+      { 
+         ManagedQueue dest = null;
+         
+         try
+         {         
+            dest = (ManagedQueue)getServer().
+               getAttribute(defaultDLQObjectName, "Instance");
+         }
+         catch (InstanceNotFoundException e)
+         {
+            //Ok
+         }
+         
+         if (dest != null)
+         {            
+            PostOffice po = getPostOfficeInstance();
+            
+            Binding binding = po.getBindingForQueueName(dest.getName());
+            
+            if (binding != null && binding.getQueue().isActive())
+            {
+               dlq =  binding.getQueue();
+            }
+         }
       }
-
-      PostOffice postOffice = getPostOfficeInstance();
-
-      Binding binding = postOffice.getBindingForQueueName(dlqName);
       
-      if (binding != null && binding.getQueue().isActive())
+      return dlq;
+   }
+   
+   public synchronized Queue getDefaultExpiryQueueInstance() throws Exception
+   {
+      Queue expiryQueue = null;
+      
+      if (defaultExpiryQueueObjectName != null)
       {
-         return binding.getQueue();
+         ManagedQueue dest = null;
+         
+         try
+         {         
+            dest = (ManagedQueue)getServer().
+               getAttribute(defaultExpiryQueueObjectName, "Instance");
+         }
+         catch (InstanceNotFoundException e)
+         {
+            //Ok
+         }
+
+         if (dest != null)
+         {            
+            PostOffice po = getPostOfficeInstance();
+            
+            Binding binding = po.getBindingForQueueName(dest.getName());
+            
+            if (binding != null && binding.getQueue().isActive())
+            {
+               expiryQueue =  binding.getQueue();
+            }
+         }
       }
-      else
-      {
-         return null;
-      }   
+      
+      return expiryQueue;
    }
 
    public TransactionRepository getTxRepository()
@@ -735,8 +826,8 @@
       
       Replicator replicator = getReplicator();
 
-      long startToWait = failoverStartTimeout;
-      long completeToWait = failoverCompleteTimeout;
+      long startToWait = getFailoverStartTimeout();
+      long completeToWait = getFailoverCompleteTimeout();
                      
       // Must lock here
       synchronized (failoverStatusLock)

Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -6,6 +6,7 @@
  */
 package org.jboss.jms.server.destination;
 
+import javax.management.InstanceNotFoundException;
 import javax.management.ObjectName;
 
 import org.jboss.jms.server.DestinationManager;
@@ -13,10 +14,14 @@
 import org.jboss.jms.server.SecurityManager;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.util.ExceptionUtil;
+import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.plugin.IDManager;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.plugin.contract.ServerPlugin;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.system.ServiceMBeanSupport;
 import org.w3c.dom.Element;
@@ -33,7 +38,7 @@
  *
  * $Id$
  */
-public abstract class DestinationServiceSupport extends ServiceMBeanSupport
+public abstract class DestinationServiceSupport extends ServiceMBeanSupport implements ServerPlugin
 {
    // Constants -----------------------------------------------------
 
@@ -43,6 +48,10 @@
 
    private ObjectName serverPeerObjectName;
    
+   private ObjectName dlqObjectName;
+   
+   private ObjectName expiryQueueObjectName;
+   
    protected boolean started = false;
    
    protected ManagedDestination destination;
@@ -79,6 +88,13 @@
    public DestinationServiceSupport()
    {
    }
+   
+   // ServerPlugin implementation ------------------------------------------
+   
+   public MessagingComponent getInstance()
+   {
+      return destination;
+   }
 
    // ServiceMBeanSupport overrides -----------------------------------
    
@@ -167,7 +183,74 @@
    {
       return serverPeerObjectName;
    }
-
+   
+   public void setDLQ(ObjectName on)
+   {
+      dlqObjectName = on;
+      
+      ManagedQueue dest = null;
+      
+      try
+      {
+         
+         try
+         {         
+            dest = (ManagedQueue)getServer().
+               getAttribute(dlqObjectName, "Instance");
+         }
+         catch (InstanceNotFoundException e)
+         {
+            //Ok
+         }
+         
+         Queue dlq = null;
+   
+         if (dest != null)
+         {            
+            Binding binding = postOffice.getBindingForQueueName(dest.getName());
+            
+            if (binding != null && binding.getQueue().isActive())
+            {
+               dlq =  binding.getQueue();
+            }
+         }
+         
+         destination.setDLQ(dlq); 
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to set DLQ", e);
+      }
+   }
+   
+   public ObjectName getDLQ()
+   {
+      return dlqObjectName;
+   }
+   
+   public void setExpiryQueue(ObjectName on)
+   {
+      expiryQueueObjectName = on;
+      
+      Queue expiryQueue = null;
+      
+      try
+      {
+         expiryQueue = (Queue)server.getAttribute(expiryQueueObjectName, "Instance");
+      }
+      catch (Exception e)
+      {
+         //Ok
+      }
+      
+      destination.setExpiryQueue(expiryQueue);
+   }
+   
+   public ObjectName getExpiryQueue()
+   {
+      return expiryQueueObjectName;
+   }
+   
    public void setSecurityConfig(Element securityConfig) throws Exception
    {
       try

Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -21,6 +21,8 @@
  */
 package org.jboss.jms.server.destination;
 
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.w3c.dom.Element;
 
@@ -33,7 +35,7 @@
  * $Id$
  *
  */
-public abstract class ManagedDestination
+public abstract class ManagedDestination implements MessagingComponent
 {
    private static final int DEFAULT_FULL_SIZE = 75000;
    
@@ -62,6 +64,10 @@
    
    protected PostOffice postOffice;
    
+   protected Queue dlq;
+   
+   protected Queue expiryQueue;
+   
    public ManagedDestination()
    {      
    }
@@ -167,5 +173,35 @@
       this.temporary = temporary;
    }
    
+   public Queue getDLQ()
+   {
+      return dlq;
+   }
+   
+   public void setDLQ(Queue dlq)
+   {
+      this.dlq = dlq;
+   }
+   
+   public Queue getExpiryQueue()
+   {
+      return expiryQueue;
+   }
+   
+   public void setExpiryQueue(Queue expiryQueue)
+   {
+      this.expiryQueue = expiryQueue;
+   }
+   
    public abstract boolean isQueue();
+
+   public void start() throws Exception
+   {
+      //NOOP
+   }
+
+   public void stop() throws Exception
+   {   
+      //NOOP
+   }
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -35,4 +35,8 @@
    long getDeliveryId();
    
    int getDeliveryCount();
+   
+   boolean isExpired();
+   
+   boolean isReachedMaxDeliveryAttempts();
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -46,7 +46,11 @@
    
    private long deliveryId;
    
-   private int deliveryCount;      
+   private int deliveryCount;    
+   
+   private boolean expired;
+   
+   private boolean reachedMaxDeliveryAttempts;
 
    // Static --------------------------------------------------------
    
@@ -56,11 +60,15 @@
    {      
    }
    
-   public DefaultCancel(long deliveryId, int deliveryCount)
+   public DefaultCancel(long deliveryId, int deliveryCount, boolean expired, boolean maxDeliveries)
    {      
       this.deliveryId = deliveryId;
       
       this.deliveryCount = deliveryCount;
+      
+      this.expired = expired;
+      
+      this.reachedMaxDeliveryAttempts = maxDeliveries;
    }
 
    // Public --------------------------------------------------------
@@ -74,6 +82,16 @@
    {
       return deliveryCount;
    }
+   
+   public boolean isExpired()
+   {
+      return expired;
+   }
+   
+   public boolean isReachedMaxDeliveryAttempts()
+   {
+      return reachedMaxDeliveryAttempts;
+   }
 
    // Class YYY overrides -------------------------------------------
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -58,6 +58,10 @@
    //to the connection consumer's session, otherwise it will be null
    private SessionDelegate connectionConsumerSession;
    
+  // private boolean exceedDeliveryAttempts;
+   
+  // private boolean expired;
+   
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
@@ -95,6 +99,31 @@
    {
       return connectionConsumerSession;
    }
+   
+//   public void setExceededDeliveryAttempts()
+//   {
+//      exceedDeliveryAttempts = true;
+//   }
+//   
+//   public void setExpired()
+//   {
+//      expired = true;
+//   }
+//        
+//   public boolean exceededDeliveryAttempts()
+//   {
+//      return exceedDeliveryAttempts;
+//   }
+//   
+//   public boolean isExpired()
+//   {
+//      return expired;
+//   }
+//   
+//   public boolean isToCancel()
+//   {
+//      return exceedDeliveryAttempts || expired;
+//   }
 
    public String toString()
    {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -29,6 +29,7 @@
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.selector.Selector;
+import org.jboss.jms.server.destination.ManagedDestination;
 import org.jboss.jms.server.remoting.JMSDispatcher;
 import org.jboss.jms.server.remoting.MessagingMarshallable;
 import org.jboss.jms.util.ExceptionUtil;
@@ -37,6 +38,7 @@
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.Receiver;
 import org.jboss.messaging.core.Routable;
 import org.jboss.messaging.core.SimpleDelivery;
@@ -92,6 +94,10 @@
 
    private JBossDestination destination;
    
+   private Queue dlq;
+   
+   private Queue expiryQueue;
+   
    private boolean started;
    
    //This lock protects starting and stopping
@@ -103,9 +109,10 @@
    // Constructors --------------------------------------------------
 
    ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
-                         ServerSessionEndpoint sessionEndpoint,
-                         String selector, boolean noLocal, JBossDestination dest)
-                         throws InvalidSelectorException
+                          ServerSessionEndpoint sessionEndpoint,
+                          String selector, boolean noLocal, JBossDestination dest,
+                          Queue dlq, Queue expiryQueue)
+                          throws InvalidSelectorException
    {
       if (trace) { log.trace("constructing consumer endpoint " + id); }
 
@@ -125,6 +132,10 @@
       
       this.destination = dest;
       
+      this.dlq = dlq;
+      
+      this.expiryQueue = expiryQueue;
+      
       //Always start as false - wait for consumer to initiate
       this.clientAccepting = false;
       
@@ -164,6 +175,20 @@
          
          return null;
       }
+      
+      if (ref.isExpired())
+      {
+         SimpleDelivery delivery = new SimpleDelivery(observer, ref);
+         
+         try
+         {
+            sessionEndpoint.expireDelivery(delivery, expiryQueue);
+         }
+         catch (Throwable t)
+         {
+            log.error("Failed to expire delivery: " + delivery, t);
+         }
+      }
         
       synchronized (startStopLock)
       {         
@@ -385,6 +410,16 @@
    
    // Package protected ---------------------------------------------
    
+   Queue getDLQ()
+   {
+      return dlq;
+   }
+   
+   Queue getExpiryQueue()
+   {
+      return expiryQueue;
+   }
+     
    void localClose() throws Throwable
    {      
       if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -135,7 +135,8 @@
    private PostOffice postOffice;
    private int nodeId;
    private int maxDeliveryAttempts;
-   private Queue dlq;
+   private Queue defaultDLQ;
+   private Queue defaultExpiryQueue;
    
    // Map < deliveryId, Delivery>
    private Map deliveries;
@@ -166,9 +167,10 @@
       consumers = new HashMap();
 		browsers = new HashMap();
       
-      dlq = sp.getDLQ();
+      defaultDLQ = sp.getDefaultDLQInstance();
+      defaultExpiryQueue = sp.getDefaultExpiryQueueInstance();
       tr = sp.getTxRepository();
-      maxDeliveryAttempts = sp.getMaxDeliveryAttempts();
+      maxDeliveryAttempts = sp.getDefaultMaxDeliveryAttempts();
       
       deliveries = new ConcurrentHashMap();
       
@@ -371,7 +373,7 @@
          throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledgeBatch");
       }
    }
-       
+             
    public void cancelDelivery(Cancel cancel) throws JMSException
    {
       if (trace) {log.trace(this + " cancelDelivery " + cancel); }
@@ -386,31 +388,30 @@
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDelivery");
-      }
-     
-   }      
+      }     
+   }            
 
    public void cancelDeliveries(List cancels) throws JMSException
    {
       if (trace) {log.trace(this + " cancelDeliveries " + cancels); }
-      
+        
       try
       {
          // deliveries must be cancelled in reverse order
 
          Set channels = new HashSet();
-                           
+                          
          for (int i = cancels.size() - 1; i >= 0; i--)
          {
             Cancel cancel = (Cancel)cancels.get(i);       
             
             if (trace) { log.trace(this + " cancelling delivery " + cancel.getDeliveryId()); }
-            
+                        
             Delivery del = cancelDeliveryInternal(cancel);
             
             channels.add(del.getObserver());
          }
-                  
+                              
          // need to prompt delivery for all affected channels
          
          promptDelivery(channels);
@@ -431,8 +432,6 @@
             throw new IllegalStateException("Recovering deliveries but post office is not clustered!");
          }
          
-         ClusteredPostOffice po = (ClusteredPostOffice)postOffice;
-         
          long maxDeliveryId = 0;
                   
          //Sort into different list for each channel
@@ -465,7 +464,7 @@
             Long channelId = (Long)entry.getKey();
             
             //Look up channel
-            Binding binding = po.getBindingforChannelId(channelId.longValue());
+            Binding binding = postOffice.getBindingforChannelId(channelId.longValue());
             
             if (binding == null)
             {
@@ -703,7 +702,23 @@
    }
 
    // Package protected ---------------------------------------------
+   
+   void expireDelivery(Delivery del, Queue expiryQueue) throws Throwable
+   {
+      if (trace) { log.trace("Reference has expired: " + del.getReference()); }
       
+      if (expiryQueue != null)
+      {
+         if (trace) { log.trace("Sending to expiry queue"); }
+         
+         moveInTransaction(del, expiryQueue);
+      }
+      else
+      {
+         log.warn("No expiry queue has been configured so removing the reference");
+      }
+   }
+      
    void cancelDeliveriesForConsumerAfterDeliveryId(int consumerId, long lastDeliveryId) throws Throwable
    {
       //Need to cancel in reverse
@@ -921,20 +936,6 @@
 
    // Private -------------------------------------------------------
    
-   private void acknowledgeDeliveryInternal(Ack ack) throws Throwable
-   {
-      if (trace) { log.trace(this + " acknowledging delivery " + ack.getDeliveryId()); }
-      
-      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryId()));
-      
-      if (rec == null)
-      {
-         throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
-      }
-      
-      rec.del.acknowledge(null);    
-   } 
-   
    private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
    {
       DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
@@ -943,51 +944,112 @@
       {
          throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
       }
-                              
-      if (cancel.getDeliveryCount() >= maxDeliveryAttempts)
+                 
+      //Note we check the flag *and* evaluate again. This is because the server and client clocks may
+      //be out of sync, and we don't want to send back to the client a message it thought it had sent to
+      //the expiry queue
+      boolean expired = cancel.isExpired() || rec.del.getReference().isExpired();
+      
+      //Note we check the flag *and* evaluate again, this is because the server value of maxDeliveries
+      //might get changed after the client has sent the cancel - and we don't want to end up cancelling
+      //back to the original queue
+      boolean reachedMaxDeliveryAttempts =
+         cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >= maxDeliveryAttempts;
+         
+      if (!expired && !reachedMaxDeliveryAttempts)
       {
-         //Send to DLQ
+         //Normal cancel back to the queue
          
-         //We do this in a tx so we don't end up with the message in both the original queue
-         //and the dlq if it fails half way through
-         Transaction tx = tr.createTransaction();
+         rec.del.getReference().setDeliveryCount(cancel.getDeliveryCount());
          
-         try
-         {               
-            if (dlq != null)
-            {         
-               //reset delivery count to zero
-               rec.del.getReference().setDeliveryCount(0);
-               
-               dlq.handle(null, rec.del.getReference(), tx);
-               
-               rec.del.acknowledge(tx);           
-            }
-            else
-            {
-               log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-               
-               rec.del.acknowledge(tx);
-            }                              
-                        
-            tx.commit();
+         rec.del.cancel();
+      }
+      else
+      {
+         ServerConsumerEndpoint consumer = null;
+         
+         synchronized (consumers)
+         {
+            consumer = (ServerConsumerEndpoint)consumers.get(new Integer(rec.consumerId));
          }
-         catch (Throwable t)
+         
+         if (consumer == null)
          {
-            tx.rollback();
+            throw new IllegalStateException("Cannot find consumer with id " + rec.consumerId);
+         }
+         
+         if (expired)
+         {
+            //Send to expiry queue
             
-            throw t;
-         }         
+            this.moveInTransaction(rec.del, consumer.getExpiryQueue());
+         }
+         else
+         {
+            //Send to DLQ
+            
+            this.moveInTransaction(rec.del, consumer.getDLQ());
+         }
+      }      
+      
+      return rec.del;
+   }      
+   
+   private void moveInTransaction(Delivery del, Queue queue) throws Throwable
+   {
+      Transaction tx = tr.createTransaction();
+      
+      try
+      {               
+         if (queue != null)
+         {                               
+            //Need to reset expiration and delivery count
+            del.getReference().setExpiration(0);
+            del.getReference().getMessage().setExpiration(0);
+            del.getReference().setDeliveryCount(0);
+            
+            queue.handle(null, del.getReference(), tx);
+            
+            del.acknowledge(tx);           
+         }
+         else
+         {
+            log.warn("Cannot move to destination since destination has not been deployed! The message will be removed");
+            
+            del.acknowledge(tx);
+         }             
+         
+         tx.commit();
+         
+         if (queue != null)
+         {
+            queue.deliver(false);
+         }
       }
-      else
-      {                                                   
-         rec.del.getReference().setDeliveryCount(cancel.getDeliveryCount());
+      catch (Throwable t)
+      {
+         tx.rollback();
          
-         rec.del.cancel();
+         throw t;
+      } 
+   }
+   
+   
+   private void acknowledgeDeliveryInternal(Ack ack) throws Throwable
+   {
+      if (trace) { log.trace(this + " acknowledging delivery " + ack.getDeliveryId()); }
+      
+      DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryId()));
+      
+      if (rec == null)
+      {
+         throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
       }
       
-      return rec.del;
-   }
+      rec.del.acknowledge(null);    
+   } 
+   
+   
 
    private ConsumerDelegate failoverConsumer(JBossDestination jmsDestination,
                                              String selectorString,
@@ -1023,10 +1085,22 @@
       int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
       int prefetchSize = connectionEndpoint.getPrefetchSize();
       
+      ManagedDestination dest = 
+         sp.getDestinationManager().getDestination(jmsDestination.getName(), jmsDestination.isQueue());
+      
+      if (dest == null)
+      {
+         throw new IllegalStateException("Cannot find managed destination for dest: " + jmsDestination);
+      }
+      
+      Queue dlqToUse = dest.getDLQ() == null ? defaultDLQ : dest.getDLQ();
+      
+      Queue expiryQueueToUse = dest.getExpiryQueue() == null ? defaultExpiryQueue : dest.getExpiryQueue();
+            
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(consumerID, binding.getQueue(),
                                     binding.getQueue().getName(), this, selectorString, noLocal,
-                                    jmsDestination);
+                                    jmsDestination, dlqToUse, expiryQueueToUse);
       
       JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
 
@@ -1295,10 +1369,14 @@
       
       int prefetchSize = connectionEndpoint.getPrefetchSize();
       
+      Queue dlqToUse = mDest.getDLQ() == null ? defaultDLQ : mDest.getDLQ();
+      
+      Queue expiryQueueToUse = mDest.getExpiryQueue() == null ? defaultExpiryQueue : mDest.getExpiryQueue();
+      
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
                   binding.getQueue().getName(), this, selectorString, noLocal,
-                  jmsDestination);
+                  jmsDestination, dlqToUse, expiryQueueToUse);
       
       JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
       
@@ -1342,7 +1420,7 @@
     * In such a case we might otherwise end up with the consumer closing but not all its deliveries being
     * cancelled, which would mean they wouldn't be cancelled until the session is closed which is too late
     */
-   private class DeliveryRecord
+   private static class DeliveryRecord
    {
       Delivery del;
       

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -285,6 +285,10 @@
                   
                   dos.writeInt(cancel.getDeliveryCount());
                   
+                  dos.writeBoolean(cancel.isExpired());
+                  
+                  dos.writeBoolean(cancel.isReachedMaxDeliveryAttempts());
+                  
                   dos.flush();
    
                   if (trace) { log.trace("wrote cancelDelivery()"); }
@@ -308,6 +312,10 @@
                      dos.writeLong(cancel.getDeliveryId());
                      
                      dos.writeInt(cancel.getDeliveryCount());
+                     
+                     dos.writeBoolean(cancel.isExpired());
+                     
+                     dos.writeBoolean(cancel.isReachedMaxDeliveryAttempts());
                   }
    
                   dos.flush();
@@ -723,7 +731,11 @@
                
                int deliveryCount = dis.readInt();
                
-               Object[] args = new Object[] {new DefaultCancel(deliveryId, deliveryCount)};
+               boolean expired = dis.readBoolean();
+               
+               boolean reachedMaxDeliveries = dis.readBoolean();
+               
+               Object[] args = new Object[] {new DefaultCancel(deliveryId, deliveryCount, expired, reachedMaxDeliveries)};
    
                mi.setArguments(args);
    
@@ -749,7 +761,11 @@
                   
                   int deliveryCount = dis.readInt();
                   
-                  DefaultCancel cancel = new DefaultCancel(deliveryId, deliveryCount);
+                  boolean expired = dis.readBoolean();
+                  
+                  boolean reachedMaxDeliveries = dis.readBoolean();
+                  
+                  DefaultCancel cancel = new DefaultCancel(deliveryId, deliveryCount, expired, reachedMaxDeliveries);
                                     
                   acks.add(cancel);
                }

Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -48,7 +48,6 @@
    
    private static final Logger log = Logger.getLogger(ClientTransaction.class);
    
-
    public final static byte TX_OPEN = 0;
    
    public final static byte TX_ENDED = 1;

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -519,54 +519,45 @@
             {
                ref = nextReference(iter);               
             }
+            
             if (ref != null)
             {
-               // Check if message is expired (we also do this on the clientside) If so ack it from the channel
-               if (ref.isExpired())
+               // Attempt to push the ref to a receiver
+               
+               if (trace) { log.trace(this + " pushing " + ref); }                                  
+
+               Delivery del = router.handle(this, ref, null);
+
+               receiversReady = del != null;
+               
+               if (del == null)
                {
-                  expireRef(ref, iter);
+                  // No receiver, broken receiver or full receiver so we stop delivering
+                  if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
+
+                  break;
                }
-               else
+               else if (!del.isSelectorAccepted())
                {
-                  // Reference is not expired
-
-                  // Attempt to push the ref to a receiver
+                  // No receiver accepted the message because no selectors matched, so we create
+                  // an iterator (if we haven't already created it) to iterate through the refs
+                  // in the channel. No delivery was really performed
                   
-                  if (trace) { log.trace(this + " pushing " + ref); }                                  
-
-                  Delivery del = router.handle(this, ref, null);
-
-                  receiversReady = del != null;
-                  
-                  if (del == null)
+                  if (iter == null)
                   {
-                     // No receiver, broken receiver or full receiver so we stop delivering
-                     if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
-
-                     break;
-                  }
-                  else if (!del.isSelectorAccepted())
-                  {
-                     // No receiver accepted the message because no selectors matched, so we create
-                     // an iterator (if we haven't already created it) to iterate through the refs
-                     // in the channel. No delivery was really performed
-                     
-                     if (iter == null)
-                     {
-                        iter = messageRefs.iterator();
-                     }                     
-                  }
-                  else
-                  {
-                     if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-                     
-                     // Receiver accepted the reference
-                     
-                     removeReference(iter);
-                     
-                     deliveringCount.increment();                     
-                  }
+                     iter = messageRefs.iterator();
+                  }                     
                }
+               else
+               {
+                  if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+                  
+                  // Receiver accepted the reference
+                  
+                  removeReference(iter);
+                  
+                  deliveringCount.increment();                     
+               }               
             }
             else
             {
@@ -783,28 +774,6 @@
    
    // Private -------------------------------------------------------
    
-   private void expireRef(MessageReference ref, ListIterator iter) throws Exception
-   {
-      if (trace) { log.trace("Message reference: " + ref + " has expired"); }
-
-      // remove and acknowledge it
-      synchronized (refLock)
-      {
-         if (iter == null)
-         {
-            removeFirstInMemory();
-         }
-         else
-         {
-            iter.remove();
-         }
-      }
-
-      Delivery delivery = new SimpleDelivery(this, ref, true);
-
-      acknowledgeInternal(delivery, null, true, false);
-   }
-   
    private void removeReference(ListIterator iter) throws Exception
    {
       synchronized (refLock)

Modified: trunk/src/main/org/jboss/messaging/core/Routable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Routable.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/messaging/core/Routable.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -72,6 +72,8 @@
    long getExpiration();
 
    boolean isExpired();
+   
+   void setExpiration(long expiration);
 
    /**
     * @return the time (in GMT milliseconds) when this routable was delivered to the provider.

Modified: trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -147,6 +147,11 @@
    {
       return expiration;
    }
+   
+   public void setExpiration(long expiration)
+   {
+      this.expiration = expiration;
+   }
 
    public long getTimestamp()
    {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -31,6 +31,7 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.management.ObjectName;
 import javax.naming.InitialContext;
 import javax.naming.NameNotFoundException;
 
@@ -72,13 +73,30 @@
    {
       if (ServerManagement.isRemote())
       {
-         //This test can only run in local mode
          return;
       }
-
+      
       ServerManagement.deployQueue("DLQ");
+      
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+      
+      ObjectName dlqObjectName = (ObjectName)ServerManagement.getAttribute(serverPeerObjectName, "DefaultDLQ");
+      
+      assertNotNull(dlqObjectName);
+            
+      String name = (String)ServerManagement.getAttribute(dlqObjectName, "Name");
+      
+      assertNotNull(name);
+      
+      assertEquals("DLQ", name);
 
-      org.jboss.messaging.core.Queue dlq = ServerManagement.getServer().getServerPeer().getDLQ();
+      String jndiName = (String)ServerManagement.getAttribute(dlqObjectName, "JNDIName");
+      
+      assertNotNull(jndiName);
+      
+      assertEquals("/queue/DLQ", jndiName);
+      
+      org.jboss.messaging.core.Queue dlq = ServerManagement.getServer().getServerPeer().getDefaultDLQInstance();
 
       assertNotNull(dlq);
 
@@ -98,9 +116,8 @@
       {
          if (ic != null) ic.close();
 
-         log.info("undeploying dlq");
          ServerManagement.undeployQueue("DLQ");
-         log.info("undeployed dlq");
+
       }
    }
 
@@ -108,12 +125,11 @@
    {
       if (ServerManagement.isRemote())
       {
-         //This test can only run in local mode
          return;
       }
+      
+      org.jboss.messaging.core.Queue dlq = ServerManagement.getServer().getServerPeer().getDefaultDLQInstance();
 
-      org.jboss.messaging.core.Queue dlq = ServerManagement.getServer().getServerPeer().getDLQ();
-
       assertNull(dlq);
 
       InitialContext ic = null;
@@ -138,272 +154,517 @@
          if (ic != null) ic.close();
       }
    }
-//
-//   public void testSendToDLQWithMessageListenerPersistent() throws Exception
-//   {
-//      sendToDLQWithMessageListener(true);
-//   }
-//
-//   public void testSendToDLQWithMessageListenerNonPersistent() throws Exception
-//   {
-//      sendToDLQWithMessageListener(false);
-//   }
-//
-//   public void testSendToDLQWithReceivePersistent() throws Exception
-//   {
-//      sendToDLQWithReceive(true);
-//   }
-//
-//   public void testSendToDLQWithReceiveNonPersistent() throws Exception
-//   {
-//      sendToDLQWithReceive(false);
-//   }
-//
-//   public void testSendToDLQWithReceivePartialPersistent() throws Exception
-//   {
-//      sendToDLQWithReceivePartial(true);
-//   }
-//
-//   public void testSendToDLQWithReceivePartialNonPersistent() throws Exception
-//   {
-//      sendToDLQWithReceivePartial(false);
-//   }
-//
-//   public void sendToDLQWithMessageListener(boolean persistent) throws Exception
-//   {
-//      Connection conn = null;
-//
-//      ServerManagement.deployQueue("DLQ");
-//
-//      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-//
-//      try
-//      {
-//         conn = cf.createConnection();
-//
-//         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-//         MessageProducer prod = sess.createProducer(queue);
-//
-//         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-//
-//         for (int i = 0; i < 10; i++)
-//         {
-//            TextMessage tm = sess.createTextMessage("Message:" + i);
-//
-//            prod.send(tm);
-//         }
-//
-//         MessageConsumer cons = sess.createConsumer(queue);
-//
-//         cons.setMessageListener(new FailingMessageListener());
-//
-//         conn.start();
-//
-//         Thread.sleep(4000);
-//
-//         cons.setMessageListener(null);
-//
-//         Message m = cons.receive(1000);
-//
-//         assertNull(m);
-//
-//         //Message should all be in the dlq - let's check
-//
-//         MessageConsumer cons2 = sess.createConsumer(dlq);
-//
-//         for (int i = 0; i < 10; i++)
-//         {
-//            TextMessage tm = (TextMessage)cons2.receive(1000);
-//
-//            assertNotNull(tm);
-//
-//            assertEquals("Message:" + i, tm.getText());
-//         }
-//
-//      }
-//      finally
-//      {
-//         ServerManagement.undeployQueue("DLQ");
-//
-//         if (conn != null) conn.close();
-//      }
-//   }
-//
-//   public void sendToDLQWithReceive(boolean persistent) throws Exception
-//   {
-//      Connection conn = null;
-//
-//      ServerManagement.deployQueue("DLQ");
-//
-//      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-//
-//      try
-//      {
-//         conn = cf.createConnection();
-//
-//         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-//         MessageProducer prod = sess.createProducer(queue);
-//
-//         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-//
-//         for (int i = 0; i < 10; i++)
-//         {
-//            TextMessage tm = sess.createTextMessage("Message:" + i);
-//
-//            prod.send(tm);
-//         }
-//
-//         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-//         MessageConsumer cons = sess2.createConsumer(queue);
-//
-//         conn.start();
-//
-//         for (int i = 0; i < 10; i++)  // retries - default is 10
-//         {
-//            for (int j = 0; j < 10; j++)
-//            {
-//               TextMessage tm = (TextMessage)cons.receive(1000);
-//
-//               assertNotNull(tm);
-//
-//               assertEquals("Message:" + j, tm.getText());
-//            }
-//
-//            //rollback should cause redelivery
-//            sess2.rollback();
-//         }
-//
-//         cons.close();
-//
-//         MessageConsumer cons2 = sess2.createConsumer(queue);
-//
-//         Message m = cons2.receive(1000);
-//
-//         assertNull(m);
-//
-//         //Message should all be in the dlq - let's check
-//
-//         MessageConsumer cons3 = sess.createConsumer(dlq);
-//
-//         for (int i = 0; i < 10; i++)
-//         {
-//            TextMessage tm = (TextMessage)cons3.receive(1000);
-//
-//            assertNotNull(tm);
-//
-//            assertEquals("Message:" + i, tm.getText());
-//         }
-//
-//      }
-//      finally
-//      {
-//         ServerManagement.undeployQueue("DLQ");
-//
-//         if (conn != null) conn.close();
-//      }
-//   }
-//
-//   public void sendToDLQWithReceivePartial(boolean persistent) throws Exception
-//   {
-//      Connection conn = null;
-//
-//      ServerManagement.deployQueue("DLQ");
-//
-//      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-//
-//      try
-//      {
-//         conn = cf.createConnection();
-//
-//         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-//         MessageProducer prod = sess.createProducer(queue);
-//
-//         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-//
-//         for (int i = 0; i < 10; i++)
-//         {
-//            TextMessage tm = sess.createTextMessage("Message:" + i);
-//
-//            prod.send(tm);
-//         }
-//
-//         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-//         MessageConsumer cons = sess2.createConsumer(queue);
-//
-//         conn.start();
-//
-//         for (int i = 0; i < 5; i++)  // retries - default is 10
-//         {
-//            for (int j = 0; j < 10; j++)
-//            {
-//               TextMessage tm = (TextMessage)cons.receive(1000);
-//
-//               assertNotNull(tm);
-//
-//               assertEquals("Message:" + j, tm.getText());
-//            }
-//
-//            //rollback should cause redelivery
-//            sess2.rollback();
-//         }
-//
-//         //They should now be cancelled back to the server
-//         cons.close();
-//
-//         cons = sess2.createConsumer(queue);
-//
-//         for (int i = 0; i < 5; i++)  // retries - default is 10
-//         {
-//            for (int j = 0; j < 10; j++)
-//            {
-//               TextMessage tm = (TextMessage)cons.receive(1000);
-//
-//               assertNotNull(tm);
-//
-//               assertEquals("Message:" + j, tm.getText());
-//            }
-//
-//            //rollback should cause redelivery
-//            sess2.rollback();
-//         }
-//
-//         cons.close();
-//
-//         //Now they should be in DLQ
-//
-//         MessageConsumer cons2 = sess2.createConsumer(queue);
-//
-//         Message m = cons2.receive(1000);
-//
-//         assertNull(m);
-//
-//         //Message should all be in the dlq - let's check
-//
-//         MessageConsumer cons3 = sess.createConsumer(dlq);
-//
-//         for (int i = 0; i < 10; i++)
-//         {
-//            TextMessage tm = (TextMessage)cons3.receive(1000);
-//
-//            assertNotNull(tm);
-//
-//            assertEquals("Message:" + i, tm.getText());
-//         }
-//
-//      }
-//      finally
-//      {
-//         ServerManagement.undeployQueue("DLQ");
-//
-//         if (conn != null) conn.close();
-//      }
-//   }
+   
+   public void testDefaultAndOverrideDLQ() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         return;
+      }
+      
+      final int NUM_MESSAGES = 5;
+      
+      final int MAX_DELIVERIES = 8;
+      
+      ServerManagement.deployQueue("DefaultDLQ");
+      
+      ServerManagement.deployQueue("OverrideDLQ");
+      
+      ServerManagement.deployQueue("TestQueue");
+      
+      String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=DefaultDLQ";
+      
+      String overrideDLQObjectName = "jboss.messaging.destination:service=Queue,name=OverrideDLQ";
+      
+      String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=TestQueue";
+      
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+      
+      ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+            
+      ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+      
+      ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+      
+      Queue testQueue = (Queue)ic.lookup("/queue/TestQueue");
+      
+      Queue defaultDLQ = (Queue)ic.lookup("/queue/DefaultDLQ");
+      
+      Queue overrideDLQ = (Queue)ic.lookup("/queue/OverrideDLQ");
+      
+      drainDestination(cf, testQueue);
+            
+      drainDestination(cf, defaultDLQ);
+            
+      drainDestination(cf, overrideDLQ);
+            
+      Connection conn = null;
+      
+      try
+      {      
+         conn = cf.createConnection();
+         
+         {         
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+   
+            MessageProducer prod = sess.createProducer(testQueue);
+   
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = sess.createTextMessage("Message:" + i);
+   
+               prod.send(tm);
+            }
+   
+            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            
+            MessageConsumer cons = sess2.createConsumer(testQueue);
+            
+            conn.start();
+   
+            for (int i = 0; i < MAX_DELIVERIES; i++) 
+            {
+               for (int j = 0; j < NUM_MESSAGES; j++)
+               {
+                  TextMessage tm = (TextMessage)cons.receive(1000);
+   
+                  assertNotNull(tm);
+   
+                  assertEquals("Message:" + j, tm.getText());
+               }
+   
+               sess2.recover();
+            }
+            
+            //At this point all the messages have been delivered exactly MAX_DELIVERIES times 
+            
+            Message m = cons.receive(1000);
+            
+            assertNull(m);
+            
+            //Now should be in default dlq
+            
+            MessageConsumer cons3 = sess.createConsumer(defaultDLQ);
+            
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = (TextMessage)cons3.receive(1000);
+   
+               assertNotNull(tm);
+   
+               assertEquals("Message:" + i, tm.getText());
+            }
+            
+            conn.close();
+         }
+         
+         
+         {
+            //Now try with overriding the default dlq
+            
+            conn = cf.createConnection();
+            
+            log.info("I am setting dlq with " + overrideDLQObjectName);
+            ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", overrideDLQObjectName);
+            
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+   
+            MessageProducer prod = sess.createProducer(testQueue);
+   
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = sess.createTextMessage("Message:" + i);
+   
+               prod.send(tm);
+            }
+   
+            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            
+            MessageConsumer cons = sess2.createConsumer(testQueue);
+            
+            conn.start();
+   
+            for (int i = 0; i < MAX_DELIVERIES; i++) 
+            {
+               for (int j = 0; j < NUM_MESSAGES; j++)
+               {
+                  TextMessage tm = (TextMessage)cons.receive(1000);
+   
+                  assertNotNull(tm);
+   
+                  assertEquals("Message:" + j, tm.getText());
+               }
+   
+               sess2.recover();
+            }
+            
+            //At this point all the messages have been delivered exactly MAX_DELIVERIES times 
+            
+            Message m = cons.receive(1000);
+            
+            assertNull(m);
+            
+            //Now should be in override dlq
+            
+            MessageConsumer cons3 = sess.createConsumer(overrideDLQ);
+            
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = (TextMessage)cons3.receive(1000);
+   
+               assertNotNull(tm);
+   
+               assertEquals("Message:" + i, tm.getText());
+            }
+         }
+      }
+      finally
+      {
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=DLQ");
+                  
+         ServerManagement.undeployQueue("DefaultDLQ");
+         
+         ServerManagement.undeployQueue("OverrideDLQ");
+         
+         ServerManagement.undeployQueue("TestQueue");
+         
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+            
 
+   public void testWithMessageListenerPersistent() throws Exception
+   {
+      testWithMessageListener(true);
+   }
+
+   public void testWithMessageListenerNonPersistent() throws Exception
+   {
+      testWithMessageListener(false);
+   }
+
+   public void testWithReceiveClientAckPersistent() throws Exception
+   {
+      this.testWithReceiveClientAck(true);
+   }
+
+   public void testWithReceiveClientAckNonPersistent() throws Exception
+   {
+      testWithReceiveClientAck(false);
+   }
+   
+   public void testWithReceiveTransactionalPersistent() throws Exception
+   {
+      this.testWithReceiveTransactional(true);
+   }
+
+   public void testWithReceiveTransactionalNonPersistent() throws Exception
+   {
+      testWithReceiveTransactional(false);
+   }
+
+   public void testWithMessageListener(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      ServerManagement.deployQueue("DLQ");
+
+      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+      
+      drainDestination(cf, dlq);
+      
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+      
+      final int MAX_DELIVERIES = 16;
+      
+      final int NUM_MESSAGES = 5;
+      
+      ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+      
+      int maxRedeliveryAttempts =
+         ((Integer)ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+      
+      assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+      
+      try
+      {
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            prod.send(tm);
+         }
+
+         MessageConsumer cons = sess.createConsumer(queue);
+         
+         FailingMessageListener listener  = new FailingMessageListener();
+
+         cons.setMessageListener(listener);
+
+         conn.start();
+
+         Thread.sleep(4000);
+
+         cons.setMessageListener(null);
+         
+         assertEquals(MAX_DELIVERIES * NUM_MESSAGES, listener.deliveryCount);
+
+         Message m = cons.receive(1000);
+
+         assertNull(m);
+
+         //Messages should all be in the dlq - let's check
+
+         MessageConsumer cons2 = sess.createConsumer(dlq);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+      }
+      finally
+      {
+         ServerManagement.undeployQueue("DLQ");
+
+         if (conn != null) conn.close();
+      }
+   }
+   
+
+   public void testWithReceiveClientAck(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      ServerManagement.deployQueue("DLQ");
+
+      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+      
+      drainDestination(cf, dlq);
+      
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+      
+      final int MAX_DELIVERIES = 16;
+      
+      final int NUM_MESSAGES = 5;      
+         
+      ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+      
+      int maxRedeliveryAttempts =
+         ((Integer)ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+      
+      assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+      try
+      {
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+    
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            prod.send(tm);
+         }
+
+         Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         MessageConsumer cons = sess2.createConsumer(queue);
+
+         conn.start();
+
+         for (int i = 0; i < MAX_DELIVERIES; i++) 
+         {
+            for (int j = 0; j < NUM_MESSAGES; j++)
+            {
+               TextMessage tm = (TextMessage)cons.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("Message:" + j, tm.getText());
+            }
+
+            sess2.recover();
+         }
+         
+         //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+         //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+         
+         MessageConsumer cons3 = sess.createConsumer(dlq);
+         
+         Message m = cons3.receive(1000);
+         
+         assertNull(m);
+         
+         //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
+         //delivery attempts
+         
+         m = cons.receive(1000);
+         
+         assertNull(m);
+         
+         //All the messages should now be in the DLQ
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+         //No more should be available
+         
+         m = cons.receive(1000);
+         
+         assertNull(m);
+         
+         cons.close();
+
+         MessageConsumer cons2 = sess2.createConsumer(queue);
+
+         m = cons2.receive(1000);
+
+         assertNull(m);
+      }
+      finally
+      {
+         ServerManagement.undeployQueue("DLQ");
+
+         if (conn != null) conn.close();
+      }
+   }
+   
+   public void testWithReceiveTransactional(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      ServerManagement.deployQueue("DLQ");
+
+      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+      
+      drainDestination(cf, dlq);
+      
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+      
+      final int MAX_DELIVERIES = 16;
+      
+      final int NUM_MESSAGES = 5;      
+        
+      ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+      
+      int maxRedeliveryAttempts =
+         ((Integer)ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+      
+      assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+      try
+      {
+         conn = cf.createConnection();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+    
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            prod.send(tm);
+         }
+
+         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageConsumer cons = sess2.createConsumer(queue);
+
+         conn.start();
+
+         for (int i = 0; i < MAX_DELIVERIES; i++) 
+         {
+            for (int j = 0; j < NUM_MESSAGES; j++)
+            {
+               TextMessage tm = (TextMessage)cons.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("Message:" + j, tm.getText());
+            }
+
+            sess2.rollback();
+         }
+         
+         //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+         //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+         
+         MessageConsumer cons3 = sess.createConsumer(dlq);
+         
+         Message m = cons3.receive(1000);
+         
+         assertNull(m);
+         
+         //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
+         //delivery attempts
+         m = cons.receive(1000);
+         
+         assertNull(m);
+         
+         //All the messages should now be in the DLQ
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+         //No more should be available
+         
+         m = cons.receive(1000);
+         
+         assertNull(m);
+         
+         cons.close();
+
+         MessageConsumer cons2 = sess2.createConsumer(queue);
+
+         m = cons2.receive(1000);
+
+         assertNull(m);
+      }
+      finally
+      {
+         ServerManagement.undeployQueue("DLQ");
+
+         if (conn != null) conn.close();
+      }
+   }
+
+      
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -439,9 +700,12 @@
    
    class FailingMessageListener implements MessageListener
    {
+      volatile int deliveryCount;
 
       public void onMessage(Message msg)
       {
+         deliveryCount++;
+         
          throw new RuntimeException("Your mum!");
       }
       

Added: trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -0,0 +1,631 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * 
+ * An ExpiryQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ExpiryQueueTest extends MessagingTestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected InitialContext ic;
+   protected ConnectionFactory cf;
+   protected Queue queue;
+   protected Topic topic;
+
+   // Constructors --------------------------------------------------
+
+   public ExpiryQueueTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   public void testExpiryQueueAlreadyDeployed() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         return;
+      }
+      ServerManagement.deployQueue("ExpiryQueue");
+      
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+      
+      ObjectName expiryQueueObjectName = (ObjectName)ServerManagement.getAttribute(serverPeerObjectName, "DefaultExpiryQueue");
+      
+      assertNotNull(expiryQueueObjectName);
+            
+      String name = (String)ServerManagement.getAttribute(expiryQueueObjectName, "Name");
+      
+      assertNotNull(name);
+      
+      assertEquals("ExpiryQueue", name);
+
+      String jndiName = (String)ServerManagement.getAttribute(expiryQueueObjectName, "JNDIName");
+      
+      assertNotNull(jndiName);
+      
+      assertEquals("/queue/ExpiryQueue", jndiName);
+      
+      org.jboss.messaging.core.Queue expiryQueue = ServerManagement.getServer().getServerPeer().getDefaultExpiryQueueInstance();
+
+      assertNotNull(expiryQueue);
+
+      InitialContext ic = null;
+
+      try
+      {
+         ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+         JBossQueue q = (JBossQueue)ic.lookup("/queue/ExpiryQueue");
+
+         assertNotNull(q);
+
+         assertEquals("ExpiryQueue", q.getName());
+      }
+      finally
+      {
+         if (ic != null) ic.close();
+
+         ServerManagement.undeployQueue("ExpiryQueue");
+
+      }
+   }
+
+   public void testExpiryQueueNotAlreadyDeployed() throws Exception
+   {
+      if (ServerManagement.isRemote())
+      {
+         return;
+      }
+      
+      org.jboss.messaging.core.Queue expiryQueue = ServerManagement.getServer().getServerPeer().getDefaultExpiryQueueInstance();
+
+      assertNull(expiryQueue);
+
+      InitialContext ic = null;
+
+      try
+      {
+         ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+         try
+         {
+            ic.lookup("/queue/ExpiryQueue");
+
+            fail();
+         }
+         catch (NameNotFoundException e)
+         {
+            //Ok
+         }
+      }
+      finally
+      {
+         if (ic != null) ic.close();
+      }
+   }
+   
+   public void testDefaultAndOverrideExpiryQueue() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+      //Expired messages should go to the server's default expiry queue unless the queue overrides it
+      ServerManagement.deployQueue("DefaultExpiry");
+      
+      ServerManagement.deployQueue("OverrideExpiry");
+      
+      ServerManagement.deployQueue("TestQueue");
+      
+      String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=DefaultExpiry";
+      
+      String overrideExpiryObjectName = "jboss.messaging.destination:service=Queue,name=OverrideExpiry";
+      
+      String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=TestQueue";
+      
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+          
+      ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+      //Empty ExpiryQueue on the test queue - presumably means no override, so the default applies
+      ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", "");
+      
+      Queue testQueue = (Queue)ic.lookup("/queue/TestQueue");
+      
+      Queue defaultExpiry = (Queue)ic.lookup("/queue/DefaultExpiry");
+      
+      Queue overrideExpiry = (Queue)ic.lookup("/queue/OverrideExpiry");
+      
+      drainDestination(cf, testQueue);
+            
+      drainDestination(cf, defaultExpiry);
+            
+      drainDestination(cf, overrideExpiry);
+            
+      Connection conn = null;
+      
+      try
+      {      
+         conn = cf.createConnection();
+         //Part 1: no override set on the test queue - expect the default expiry queue to be used
+         {         
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+   
+            MessageProducer prod = sess.createProducer(testQueue);
+            
+            conn.start();
+   
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = sess.createTextMessage("Message:" + i);
+   
+               //Send messages with time to live of 2000 enough time to get to client consumer - so 
+               //they won't be expired on the server side
+               prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
+            }
+            
+            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                        
+            MessageConsumer cons = sess2.createConsumer(testQueue);
+            
+            //The messages should now be sitting in the consumer buffer
+            
+            //Now give them enough time to expire
+            
+            Thread.sleep(2500);
+            
+            //Now try and receive
+            
+            Message m = cons.receive(1000);
+
+            assertNull(m);
+
+            //Messages should all be in the default expiry queue - let's check
+                        
+            MessageConsumer cons3 = sess.createConsumer(defaultExpiry);
+            
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = (TextMessage)cons3.receive(1000);
+   
+               assertNotNull(tm);
+   
+               assertEquals("Message:" + i, tm.getText());
+            }
+            
+            conn.close();
+         }
+         
+         
+         //Part 2: now try with overriding the default expiry queue
+         {         
+            ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", overrideExpiryObjectName);
+            
+            conn = cf.createConnection();
+            
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+   
+            MessageProducer prod = sess.createProducer(testQueue);
+            
+            conn.start();
+   
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = sess.createTextMessage("Message:" + i);
+   
+               //Send messages with time to live of 2000 enough time to get to client consumer - so 
+               //they won't be expired on the server side
+               prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
+            }
+            
+            Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                        
+            MessageConsumer cons = sess2.createConsumer(testQueue);
+            
+            //The messages should now be sitting in the consumer buffer
+            
+            //Now give them enough time to expire
+            
+            Thread.sleep(2500);
+            
+            //Now try and receive
+            
+            Message m = cons.receive(1000);
+
+            assertNull(m);
+
+            //Messages should all be in the override expiry queue (not the default one) - let's check
+                        
+            MessageConsumer cons3 = sess.createConsumer(overrideExpiry);
+            
+            for (int i = 0; i < NUM_MESSAGES; i++)
+            {
+               TextMessage tm = (TextMessage)cons3.receive(1000);
+   
+               assertNotNull(tm);
+   
+               assertEquals("Message:" + i, tm.getText());
+            }
+         }
+      }
+      finally
+      {
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
+         //Undeploy exactly the queues this test deployed
+         ServerManagement.undeployQueue("DefaultExpiry");
+         
+         ServerManagement.undeployQueue("OverrideExpiry");
+         
+         ServerManagement.undeployQueue("TestQueue");
+         
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   public void testExpireSameMessagesMultiple() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+      //Same messages held by several durable subscriptions on one topic are left to expire
+      ServerManagement.deployQueue("ExpiryQueue");
+      
+      String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=ExpiryQueue";
+         
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+          
+      ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+       
+      Queue defaultExpiry = (Queue)ic.lookup("/queue/ExpiryQueue");
+      
+      drainDestination(cf, defaultExpiry);
+                  
+      Connection conn = null;
+      
+      try
+      {      
+         conn = cf.createConnection();
+         
+         conn.setClientID("wib1");
+                        
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+   
+         MessageProducer prod = sess.createProducer(topic);
+            
+         conn.start();
+         
+         //Create 3 durable subscriptions
+         
+         MessageConsumer sub1 = sess.createDurableSubscriber(topic, "sub1");
+         
+         MessageConsumer sub2 = sess.createDurableSubscriber(topic, "sub2");
+         
+         MessageConsumer sub3 = sess.createDurableSubscriber(topic, "sub3");
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            //Send messages with time to live of 3000 enough time to get to client consumer - so 
+            //they won't be expired on the server side
+            prod.send(tm, DeliveryMode.PERSISTENT, 4, 3000);
+         }
+         
+         //Now sleep. This will give them enough time to expire
+         
+         Thread.sleep(3500);
+         
+         //Now try and consume from each - this should force the message to the expiry queue
+         
+         Message m = sub1.receive(500);
+         assertNull(m);
+         
+         m = sub2.receive(500);
+         assertNull(m);
+         
+         m = sub3.receive(500);
+         assertNull(m);
+         
+         //Now the messages should all be in the expiry queue
+         
+         MessageConsumer cons2 = sess.createConsumer(defaultExpiry);
+         //NOTE(review): this loop only drains and logs the expiry queue - nothing is asserted about its contents
+         while (true)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(500);
+            
+            if (tm == null)
+            {
+               break;
+            }
+            
+            log.info("Got message: " + tm.getText());
+         }
+            
+      }
+      finally
+      {        
+         ServerManagement.undeployQueue("ExpiryQueue");
+         
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   public void testWithMessageListenerPersistent() throws Exception
+   {
+      testWithMessageListener(true);
+   }
+
+   public void testWithMessageListenerNonPersistent() throws Exception
+   {
+      testWithMessageListener(false);
+   }
+
+   public void testWithReceivePersistent() throws Exception
+   {
+      this.testWithReceive(true);
+   }
+
+   public void testWithReceiveNonPersistent() throws Exception
+   {
+      testWithReceive(false);
+   }   
+
+   public void testWithMessageListener(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      ServerManagement.deployQueue("ExpiryQueue");
+
+      Queue expiryQueue = (Queue)ic.lookup("/queue/ExpiryQueue");
+      
+      drainDestination(cf, expiryQueue);
+       
+      final int NUM_MESSAGES = 5;
+
+      try
+      {
+         conn = cf.createConnection();
+         
+         conn.start();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue);
+
+         int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+                        
+            //Send messages with time to live of 2000 enough time to get to client consumer - so 
+            //they won't be expired on the server side
+            prod.send(tm, deliveryMode, 4, 2000);
+         }
+
+         MessageConsumer cons = sess.createConsumer(queue);
+         
+         //The messages should now be sitting in the consumer buffer
+         
+         //Now give them enough time to expire
+         
+         Thread.sleep(2500);
+         
+         //Now set a listener
+         
+         FailingMessageListener listener  = new FailingMessageListener();
+
+         cons.setMessageListener(listener);
+         
+         Thread.sleep(1000);
+
+         cons.setMessageListener(null);
+         
+         //No messages should have been received
+         assertEquals(0, listener.deliveryCount);
+                  
+         //Shouldn't be able to receive any more
+         
+         Message m = cons.receive(1000);
+
+         assertNull(m);
+
+         //Message should all be in the expiry queue - let's check
+         
+         MessageConsumer cons2 = sess.createConsumer(expiryQueue);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+      }
+      finally
+      {
+         ServerManagement.undeployQueue("ExpiryQueue");
+
+         if (conn != null) conn.close();
+      }
+   }
+   
+   public void testWithReceive(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+
+      ServerManagement.deployQueue("ExpiryQueue");
+
+      Queue expiryQueue = (Queue)ic.lookup("/queue/ExpiryQueue");
+      
+      drainDestination(cf, expiryQueue);
+       
+      final int NUM_MESSAGES = 5;
+
+      try
+      {
+         conn = cf.createConnection();
+         
+         conn.start();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue);
+
+         int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+                        
+            //Send messages with time to live of 2000 enough time to get to client consumer - so 
+            //they won't be expired on the server side
+            prod.send(tm, deliveryMode, 4, 2000);
+         }
+
+         MessageConsumer cons = sess.createConsumer(queue);
+         
+         //The messages should now be sitting in the consumer buffer
+         
+         //Now give them enough time to expire
+         
+         Thread.sleep(2500);
+         
+         //Now try and receive
+         
+         Message m = cons.receive(1000);
+
+         assertNull(m);
+
+         //Message should all be in the expiry queue - let's check
+         
+         MessageConsumer cons2 = sess.createConsumer(expiryQueue);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+
+      }
+      finally
+      {
+         ServerManagement.undeployQueue("ExpiryQueue");
+
+         if (conn != null) conn.close();
+      }
+   }
+
+      
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      ServerManagement.start("all");
+
+      ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+      cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+      ServerManagement.deployQueue("Queue");
+      
+      ServerManagement.deployTopic("Topic");
+
+      queue = (Queue)ic.lookup("/queue/Queue");
+      
+      topic = (Topic)ic.lookup("/topic/Topic");
+
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+
+      ServerManagement.undeployQueue("Queue");
+      
+      ServerManagement.undeployTopic("Topic");
+
+      if (ic != null) ic.close();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   
+   class FailingMessageListener implements MessageListener
+   {
+      volatile int deliveryCount;
+
+      public void onMessage(Message msg)
+      {
+         deliveryCount++;
+         
+         throw new RuntimeException("Your mum!");
+      }
+      
+   }
+
+}

Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -3055,6 +3055,7 @@
       Session sess1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
       MessageConsumer cons1 = sess1.createConsumer(queue);
       TextMessage tm2 = (TextMessage)cons1.receive(3000);
+      
       assertNotNull(tm2);
       assertEquals("testRedeliveredDifferentSessions", tm2.getText());
 
@@ -3064,6 +3065,7 @@
       Session sess2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
       MessageConsumer cons2 = sess2.createConsumer(queue);
       TextMessage tm3 = (TextMessage)cons2.receive(3000);
+      
       assertNotNull(tm3);
       assertEquals("testRedeliveredDifferentSessions", tm3.getText());
       

Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-12-31 09:49:46 UTC (rev 1879)
@@ -486,7 +486,7 @@
          
          int deliveryCount = 12;
          
-         Cancel cancel = new DefaultCancel(deliveryID, deliveryCount);
+         Cancel cancel = new DefaultCancel(deliveryID, deliveryCount, true, true);
          
          Object[] args = new Object[] { cancel };
          
@@ -530,9 +530,17 @@
          //Then delivery count
          int count = dis.readInt();
          
+         boolean expired = dis.readBoolean();
+         
+         boolean reachedMaxDeliveries = dis.readBoolean();
+         
          assertEquals(deliveryID, l);
          
          assertEquals(deliveryCount, count);
+         
+         assertEquals(expired, true);
+         
+         assertEquals(reachedMaxDeliveries, true);
 
          //Now eos
          try
@@ -567,6 +575,10 @@
          
          assertEquals(deliveryCount, l2.getDeliveryCount());
          
+         assertEquals(expired, l2.isExpired());
+         
+         assertEquals(reachedMaxDeliveries, l2.isReachedMaxDeliveryAttempts());
+         
       }
       
       public void testCancelDeliveries() throws Exception
@@ -577,8 +589,8 @@
          
          List cancels = new ArrayList();
          
-         DefaultCancel cancel1 = new DefaultCancel(65654, 43);
-         DefaultCancel cancel2 = new DefaultCancel(65765, 2);
+         DefaultCancel cancel1 = new DefaultCancel(65654, 43, true, false);
+         DefaultCancel cancel2 = new DefaultCancel(65765, 2, false, true);
          cancels.add(cancel1);
          cancels.add(cancel2);
          
@@ -629,19 +641,31 @@
          //then the AckInfos
          long deliveryId = dis.readLong();
          int deliveryCount = dis.readInt();
-         DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount);
+         boolean expired = dis.readBoolean();
+         boolean reachedMaxDeliveries = dis.readBoolean();
+         DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount, expired, reachedMaxDeliveries);
          
          deliveryId = dis.readLong();
          deliveryCount = dis.readInt();
-         DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount);
+         expired = dis.readBoolean();
+         reachedMaxDeliveries = dis.readBoolean();
+         DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount, expired, reachedMaxDeliveries);
 
          assertEquals(cancel1.getDeliveryCount(), rcancel1.getDeliveryCount());
          
          assertEquals(cancel1.getDeliveryId(), cancel1.getDeliveryId());
          
+         assertEquals(cancel1.isExpired(), rcancel1.isExpired());
+         //Compare the sent cancel with the one rebuilt from the stream (rcancel1), not with itself
+         assertEquals(cancel1.isReachedMaxDeliveryAttempts(), rcancel1.isReachedMaxDeliveryAttempts());
+         
          assertEquals(cancel2.getDeliveryCount(), rcancel2.getDeliveryCount());
          
          assertEquals(cancel2.getDeliveryId(), cancel2.getDeliveryId());
+         //Compare the sent cancel with the one rebuilt from the stream (rcancel2), not with itself
+         assertEquals(cancel2.isExpired(), rcancel2.isExpired());
+         
+         assertEquals(cancel2.isReachedMaxDeliveryAttempts(), rcancel2.isReachedMaxDeliveryAttempts());
                    
          //should be eos
                 




More information about the jboss-cvs-commits mailing list