[jboss-cvs] JBoss Messaging SVN: r1879 - in trunk: src/etc src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/jms/server src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/message tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Dec 31 04:50:12 EST 2006
Author: timfox
Date: 2006-12-31 04:49:46 -0500 (Sun, 31 Dec 2006)
New Revision: 1879
Added:
trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
Removed:
trunk/src/main/org/jboss/jms/client/container/ReceiverAspect.java
trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java
Modified:
trunk/src/etc/aop-messaging-client.xml
trunk/src/etc/server/default/deploy/destinations-service.xml
trunk/src/etc/server/default/deploy/messaging-service.xml
trunk/src/etc/xmdesc/Queue-xmbean.xml
trunk/src/etc/xmdesc/ServerPeer-xmbean.xml
trunk/src/etc/xmdesc/Topic-xmbean.xml
trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java
trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java
trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/Routable.java
trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java
trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java
trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
Expiry Queue functionality and now both expiry queue and dlq can be specified on a per queue basis
Modified: trunk/src/etc/aop-messaging-client.xml
===================================================================
--- trunk/src/etc/aop-messaging-client.xml 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/aop-messaging-client.xml 2006-12-31 09:49:46 UTC (rev 1879)
@@ -10,11 +10,9 @@
<interceptor class="org.jboss.jms.client.container.ConcurrencyInterceptor" scope="PER_INSTANCE"/>
<aspect class="org.jboss.jms.client.container.StateCreationAspect" scope="PER_VM"/>
- <aspect class="org.jboss.jms.client.container.ReceiverAspect" scope="PER_VM"/>
<aspect class="org.jboss.jms.client.container.ConsumerAspect" scope="PER_VM"/>
<aspect class="org.jboss.jms.client.container.ProducerAspect" scope="PER_INSTANCE"/>
<aspect class="org.jboss.jms.client.container.SessionAspect" scope="PER_VM"/>
- <aspect class="org.jboss.jms.client.container.TransactionAspect" scope="PER_VM"/>
<aspect class="org.jboss.jms.client.container.AsfAspect" scope="PER_INSTANCE"/>
<aspect class="org.jboss.jms.client.container.BrowserAspect" scope="PER_INSTANCE"/>
<aspect class="org.jboss.jms.client.container.ConnectionAspect" scope="PER_INSTANCE"/>
@@ -141,20 +139,14 @@
<advice name="handleClose" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->commit())">
- <advice name="handleCommit" aspect="org.jboss.jms.client.container.TransactionAspect"/>
+ <advice name="handleCommit" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->rollback())">
- <advice name="handleRollback" aspect="org.jboss.jms.client.container.TransactionAspect"/>
+ <advice name="handleRollback" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->preDeliver(..))">
- <advice name="handlePreDeliver" aspect="org.jboss.jms.client.container.TransactionAspect"/>
- </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->send(..))">
- <advice name="handleSend" aspect="org.jboss.jms.client.container.TransactionAspect"/>
+ <advice name="handleSend" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->close())">
- <advice name="handleClose" aspect="org.jboss.jms.client.container.TransactionAspect"/>
- </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->createConsumerDelegate(..))">
<advice name="handleCreateConsumerDelegate" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
@@ -181,16 +173,16 @@
<interceptor-ref name="org.jboss.jms.client.container.ClosedInterceptor"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->getMessageListener())">
- <advice name="handleGetMessageListener" aspect="org.jboss.jms.client.container.ReceiverAspect"/>
+ <advice name="handleGetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->setMessageListener(..))">
- <advice name="handleSetMessageListener" aspect="org.jboss.jms.client.container.ReceiverAspect"/>
+ <advice name="handleSetMessageListener" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receive(..))">
- <advice name="handleReceive" aspect="org.jboss.jms.client.container.ReceiverAspect"/>
+ <advice name="handleReceive" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->receiveNoWait())">
- <advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ReceiverAspect"/>
+ <advice name="handleReceiveNoWait" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
</bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientConsumerDelegate->closing())">
<advice name="handleClosing" aspect="org.jboss.jms.client.container.ConsumerAspect"/>
Modified: trunk/src/etc/server/default/deploy/destinations-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/destinations-service.xml 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/server/default/deploy/destinations-service.xml 2006-12-31 09:49:46 UTC (rev 1879)
@@ -9,7 +9,7 @@
<server>
<!--
- The Dead Letter Queue. This destination is a dependency of an EJB MDB container.
+ The Default Dead Letter Queue. This destination is a dependency of an EJB MDB container.
-->
<mbean code="org.jboss.jms.server.destination.QueueService"
@@ -18,7 +18,18 @@
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.messaging:service=PostOffice</depends>
</mbean>
+
+ <!--
+ The Default Expiry Queue.
+ -->
+ <mbean code="org.jboss.jms.server.destination.QueueService"
+ name="jboss.messaging.destination:service=Queue,name=ExpiryQueue"
+ xmbean-dd="xmdesc/Queue-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=PostOffice</depends>
+ </mbean>
+
<!--
Example destinations.
-->
@@ -112,6 +123,41 @@
<depends>jboss.messaging:service=PostOffice</depends>
</mbean>
+ <!-- It's possible for indiviual queues and topics to use a specific queue for
+ an expiry or DLQ -->
+
+ <mbean code="org.jboss.jms.server.destination.QueueService"
+ name="jboss.messaging.destination:service=Queue,name=PrivateDLQ"
+ xmbean-dd="xmdesc/Queue-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=PostOffice</depends>
+ </mbean>
+
+ <mbean code="org.jboss.jms.server.destination.QueueService"
+ name="jboss.messaging.destination:service=Queue,name=PrivateExpiryQueue"
+ xmbean-dd="xmdesc/Queue-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=PostOffice</depends>
+ </mbean>
+
+ <mbean code="org.jboss.jms.server.destination.QueueService"
+ name="jboss.messaging.destination:service=Queue,name=QueueWithOwnDLQAndExpiryQueue"
+ xmbean-dd="xmdesc/Queue-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=PostOffice</depends>
+ <attribute name="DLQ">jboss.messaging.destination:service=Queue,name=PrivateDLQ</attribute>
+ <attribute name="ExpiryQueue">jboss.messaging.destination:service=Queue,name=PrivateExpiryQueue</attribute>
+ </mbean>
+
+ <mbean code="org.jboss.jms.server.destination.TopicService"
+ name="jboss.messaging.destination:service=Topic,name=TopicWithOwnDLQAndExpiryQueue"
+ xmbean-dd="xmdesc/Topic-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ <depends>jboss.messaging:service=PostOffice</depends>
+ <attribute name="DLQ">jboss.messaging.destination:service=Queue,name=PrivateDLQ</attribute>
+ <attribute name="ExpiryQueue">jboss.messaging.destination:service=Queue,name=PrivateExpiryQueue</attribute>
+ </mbean>
+
<!--
Example clustered destinations.
-->
Modified: trunk/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/messaging-service.xml 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/server/default/deploy/messaging-service.xml 2006-12-31 09:49:46 UTC (rev 1879)
@@ -22,20 +22,29 @@
</constructor>
<depends optional-attribute-name="PersistenceManager">jboss.messaging:service=PersistenceManager</depends>
+
<depends optional-attribute-name="JMSUserManager">jboss.messaging:service=JMSUserManager</depends>
+
<depends>jboss.messaging:service=Connector,transport=socket</depends>
<attribute name="PostOffice">jboss.messaging:service=PostOffice</attribute>
<attribute name="SecurityDomain">java:/jaas/messaging</attribute>
+
<attribute name="DefaultSecurityConfig">
<security>
<role name="guest" read="true" write="true" create="true"/>
</security>
</attribute>
- <attribute name="MaxDeliveryAttempts">10</attribute>
- <attribute name="DLQName">DLQ</attribute>
+
+ <attribute name="DefaultDLQ">jboss.messaging.destination:service=Queue,name=DLQ</attribute>
+
+ <attribute name="DefaultMaxDeliveryAttempts">10</attribute>
+
+ <attribute name="DefaultExpiryQueue">jboss.messaging.destination:service=Queue,name=ExpiryQueue</attribute>
+
<attribute name="FailoverStartTimeout">3000</attribute>
+
<attribute name="FailoverCompleteTimeout">12000</attribute>
</mbean>
Modified: trunk/src/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Queue-xmbean.xml 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/xmdesc/Queue-xmbean.xml 2006-12-31 09:49:46 UTC (rev 1879)
@@ -59,6 +59,18 @@
<name>ServerPeer</name>
<type>javax.management.ObjectName</type>
</attribute>
+
+ <attribute access="read-write" getMethod="getDLQ" setMethod="setDLQ">
+ <description>The DLQ for this queue, overrides the default DLQ on the server peer</description>
+ <name>DLQ</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getExpiryQueue" setMethod="setExpiryQueue">
+ <description>The expiry queue for this queue, overrides the default expiry queue on the server peer</description>
+ <name>ExpiryQueue</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
<attribute access="read-write" getMethod="getSecurityConfig" setMethod="setSecurityConfig">
<description>The destination's security configuration</description>
@@ -102,6 +114,14 @@
<type>boolean</type>
</attribute>
+ <!-- instance access -->
+
+ <attribute access="read-only" getMethod="getInstance">
+ <description>The instance to be accessed by select plugins via a typed hard reference</description>
+ <name>Instance</name>
+ <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
+ </attribute>
+
<!-- Managed operations -->
<operation>
Modified: trunk/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ServerPeer-xmbean.xml 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/xmdesc/ServerPeer-xmbean.xml 2006-12-31 09:49:46 UTC (rev 1879)
@@ -46,6 +46,18 @@
<type>javax.management.ObjectName</type>
</attribute>
+ <attribute access="read-write" getMethod="getDefaultDLQ" setMethod="setDefaultDLQ">
+ <description>The ObjectName of the default DLQ - used when an individual queue does not specify a DLQ</description>
+ <name>DefaultDLQ</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getDefaultExpiryQueue" setMethod="setDefaultExpiryQueue">
+ <description>The ObjectName of the default expiry queue - used when an individual queue does not specify an expiry queue</description>
+ <name>DefaultExpiryQueue</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
+
<!-- instance access -->
<attribute access="read-only" getMethod="getInstance">
@@ -139,19 +151,13 @@
<name>QueuedExecutorPoolSize</name>
<type>int</type>
</attribute>
-
- <attribute access="read-write" getMethod="getMaxDeliveryAttempts" setMethod="setMaxDeliveryAttempts">
- <description>The maximum delivery attempts for destinations</description>
- <name>MaxDeliveryAttempts</name>
+
+ <attribute access="read-write" getMethod="getDefaultMaxDeliveryAttempts" setMethod="setDefaultMaxDeliveryAttempts">
+ <description>The default maximum delivery attempts for destinations, unless overridden on the destination</description>
+ <name>DefaultMaxDeliveryAttempts</name>
<type>int</type>
</attribute>
- <attribute access="read-write" getMethod="getDLQName" setMethod="setDLQName">
- <description>The JNDI name of the DLQ</description>
- <name>DLQName</name>
- <type>java.lang.String</type>
- </attribute>
-
<attribute access="read-write" getMethod="getFailoverStartTimeout" setMethod="setFailoverStartTimeout">
<description>The maximum amount of time to wait for failover to begin</description>
<name>FailoverStartTimeout</name>
Modified: trunk/src/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Topic-xmbean.xml 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/etc/xmdesc/Topic-xmbean.xml 2006-12-31 09:49:46 UTC (rev 1879)
@@ -57,6 +57,18 @@
<name>ServerPeer</name>
<type>javax.management.ObjectName</type>
</attribute>
+
+ <attribute access="read-write" getMethod="getDLQ" setMethod="setDLQ">
+ <description>The DLQ for this queue, overrides the default DLQ on the server peer</description>
+ <name>DLQ</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getExpiryQueue" setMethod="setExpiryQueue">
+ <description>The expiry queue for this queue, overrides the default expiry queue on the server peer</description>
+ <name>ExpiryQueue</name>
+ <type>javax.management.ObjectName</type>
+ </attribute>
<attribute access="read-write" getMethod="getSecurityConfig" setMethod="setSecurityConfig">
<description>The destination's security configuration</description>
@@ -93,7 +105,15 @@
<name>Clustered</name>
<type>boolean</type>
</attribute>
+
+ <!-- instance access -->
+ <attribute access="read-only" getMethod="getInstance">
+ <description>The instance to be accessed by select plugins via a typed hard reference</description>
+ <name>Instance</name>
+ <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
+ </attribute>
+
<!-- Managed operations -->
<operation>
Modified: trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.client.container;
+import javax.jms.MessageListener;
+
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
import org.jboss.jms.client.delegate.DelegateSupport;
@@ -31,6 +33,8 @@
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
+import org.jboss.logging.Logger;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -50,6 +54,8 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(ConsumerAspect.class);
+
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -133,12 +139,42 @@
//in the right order
del.cancelInflightMessages(lastDeliveryId);
- //And then we cancel any messages still in the message callback handler buffer
+ //And then we cancel any messages still in the message callback handler buffer
consumerState.getMessageCallbackHandler().cancelBuffer();
return res;
}
+ public Object handleReceive(Invocation invocation) throws Throwable
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+ Object[] args = mi.getArguments();
+ long timeout = args == null ? 0 : ((Long)args[0]).longValue();
+
+ return getMessageCallbackHandler(invocation).receive(timeout);
+ }
+
+ public Object handleReceiveNoWait(Invocation invocation) throws Throwable
+ {
+ return getMessageCallbackHandler(invocation).receive(-1);
+ }
+
+ public Object handleSetMessageListener(Invocation invocation) throws Throwable
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+ Object[] args = mi.getArguments();
+ MessageListener l = (MessageListener)args[0];
+
+ getMessageCallbackHandler(invocation).setMessageListener(l);
+
+ return null;
+ }
+
+ public MessageListener handleGetMessageListener(Invocation invocation) throws Throwable
+ {
+ return getMessageCallbackHandler(invocation).getMessageListener();
+ }
+
public Object handleGetDestination(Invocation invocation) throws Throwable
{
return getState(invocation).getDestination();
@@ -165,5 +201,11 @@
return (ConsumerState)((DelegateSupport)inv.getTargetObject()).getState();
}
+ private MessageCallbackHandler getMessageCallbackHandler(Invocation inv)
+ {
+ ConsumerState state = (ConsumerState)((DelegateSupport)inv.getTargetObject()).getState();
+ return state.getMessageCallbackHandler();
+ }
+
// Inner classes -------------------------------------------------
}
Deleted: trunk/src/main/org/jboss/jms/client/container/ReceiverAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ReceiverAspect.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/container/ReceiverAspect.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -1,101 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.container;
-
-import javax.jms.MessageListener;
-
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.remoting.MessageCallbackHandler;
-import org.jboss.jms.client.state.ConsumerState;
-
-/**
- *
- * This aspect handles receive functionality for a message consumer
- *
- * This aspect is PER_VM.
- *
- *
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- */
-public class ReceiverAspect
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public Object handleReceive(Invocation invocation) throws Throwable
- {
- MethodInvocation mi = (MethodInvocation)invocation;
- Object[] args = mi.getArguments();
- long timeout = args == null ? 0 : ((Long)args[0]).longValue();
-
- return getMessageCallbackHandler(invocation).receive(timeout);
- }
-
- public Object handleReceiveNoWait(Invocation invocation) throws Throwable
- {
- return getMessageCallbackHandler(invocation).receive(-1);
- }
-
- public Object handleSetMessageListener(Invocation invocation) throws Throwable
- {
- MethodInvocation mi = (MethodInvocation)invocation;
- Object[] args = mi.getArguments();
- MessageListener l = (MessageListener)args[0];
-
- getMessageCallbackHandler(invocation).setMessageListener(l);
-
- return null;
- }
-
- public MessageListener handleGetMessageListener(Invocation invocation) throws Throwable
- {
- return getMessageCallbackHandler(invocation).getMessageListener();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private MessageCallbackHandler getMessageCallbackHandler(Invocation inv)
- {
- ConsumerState state = (ConsumerState)((DelegateSupport)inv.getTargetObject()).getState();
- return state.getMessageCallbackHandler();
- }
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -26,18 +26,24 @@
import java.util.List;
import javax.jms.IllegalStateException;
+import javax.jms.Message;
import javax.jms.Session;
+import javax.jms.TransactionInProgressException;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.MessageCallbackHandler;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.SessionState;
+import org.jboss.jms.delegate.ConnectionDelegate;
import org.jboss.jms.delegate.SessionDelegate;
+import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.server.endpoint.DefaultCancel;
import org.jboss.jms.server.endpoint.DeliveryInfo;
+import org.jboss.jms.tx.LocalTx;
import org.jboss.jms.tx.ResourceManager;
import org.jboss.logging.Logger;
@@ -109,7 +115,7 @@
ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
(state.isXA() && state.getCurrentTxId() == null))
{
- //Acknowledge any outstanding auto ack
+ //Acknowledge or cancel any outstanding auto ack
DeliveryInfo remainingAutoAck = state.getAutoAckInfo();
@@ -117,8 +123,8 @@
{
if (trace) { log.trace(this + " handleClosing(). Found remaining auto ack. Will ack it " + remainingAutoAck.getDeliveryId()); }
- ackDelivery(del, remainingAutoAck);
-
+ ackDelivery(del, remainingAutoAck);
+
if (trace) { log.trace(this + " acked it"); }
state.setAutoAckInfo(null);
@@ -139,34 +145,46 @@
{
DeliveryInfo ack = (DeliveryInfo)i.next();
- DefaultCancel cancel = new DefaultCancel(ack.getMessageProxy().getDeliveryId(), ack.getMessageProxy().getDeliveryCount());
+ DefaultCancel cancel = new DefaultCancel(ack.getMessageProxy().getDeliveryId(),
+ ack.getMessageProxy().getDeliveryCount(),
+ false, false);
+
cancels.add(cancel);
- }
+ }
if (!cancels.isEmpty())
- {
- del.cancelDeliveries(cancels);
+ {
+ del.cancelDeliveries(cancels);
}
state.getClientAckList().clear();
}
-
-
+
//TODO - we should also cancel any deliveries remaining in any transaction for the session
//so the delivery count gets updated to the server, and not rely on the server side close
//cancelling them
return invocation.invokeNext();
}
-
-
+
public Object handleClose(Invocation invocation) throws Throwable
{
Object res = invocation.invokeNext();
+
+ SessionState state = getState(invocation);
+ ConnectionState connState = (ConnectionState)state.getParent();
+
+ Object xid = state.getCurrentTxId();
+
+ if (xid != null)
+ {
+ //Remove transaction from the resource manager
+ connState.getResourceManager().removeTx(xid);
+ }
+
// We must explicitly shutdown the executor
- SessionState state = getState(invocation);
state.getExecutor().shutdownNow();
return res;
@@ -194,10 +212,7 @@
throw new IllegalStateException("CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
}
- state.getClientAckList().add(info);
-
- //We can return immediately
- return null;
+ state.getClientAckList().add(info);
}
else if (ackMode == Session.AUTO_ACKNOWLEDGE ||
ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
@@ -206,36 +221,39 @@
//We collect the single acknowledgement in the state.
//Currently DUPS_OK is treated the same as AUTO_ACKNOWLDGE
//Also XA sessions not enlisted in a global tx are treated as AUTO_ACKNOWLEDGE
-
-
+
if (trace) { log.trace(this + " delivery id: " + info.getDeliveryId() + " added to client ack member"); }
state.setAutoAckInfo(info);
-
- //We can return immediately
- return null;
}
-
- //Transactional - need to carry on down the stack
- return invocation.invokeNext();
- }
+ else
+ {
+ Object txID = state.getCurrentTxId();
- /* Used for client acknowledge */
- public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
- {
- MethodInvocation mi = (MethodInvocation)invocation;
- SessionState state = getState(invocation);
- SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
- if (!state.getClientAckList().isEmpty())
- {
- //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
- //on this session (rather than the connection consumer session)
- del.acknowledgeDeliveries(state.getClientAckList());
+ if (txID != null)
+ {
+ // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
+ // XA session that has not been enrolled in a global transaction behaves as a
+ // non-transacted session.
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+
+ if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
+
+ //If the ack is for a delivery that came through via a connection consumer then we
+ //use the connectionConsumer session as the session id, otherwise we use this sessions'
+ //session id
+
+ ClientSessionDelegate connectionConsumerDelegate =
+ (ClientSessionDelegate)info.getConnectionConsumerSession();
+
+ int sessionId = connectionConsumerDelegate != null ?
+ connectionConsumerDelegate.getID() : state.getSessionId();
+
+ connState.getResourceManager().addAck(txID, sessionId, info);
+ }
+ }
- state.getClientAckList().clear();
- }
-
return null;
}
@@ -246,13 +264,6 @@
int ackMode = state.getAcknowledgeMode();
- boolean cancel = ((Boolean)mi.getArguments()[0]).booleanValue();
-
- if (cancel && ackMode != Session.AUTO_ACKNOWLEDGE && ackMode != Session.DUPS_OK_ACKNOWLEDGE)
- {
- throw new IllegalStateException("Ack mode must be AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE");
- }
-
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
(state.isXA() && state.getCurrentTxId() == null))
@@ -263,6 +274,10 @@
SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
+ //It is possible that session.recover() is called inside a message listener onMessage
+ //method - i.e. between the invocations of preDeliver and postDeliver.
+ //In this case we don't want to acknowledge the last delivered messages - since it
+ //will be redelivered
if (!state.isRecoverCalled())
{
DeliveryInfo deliveryInfo = state.getAutoAckInfo();
@@ -274,14 +289,7 @@
if (trace) { log.trace(this + " auto acking delivery " + deliveryInfo.getDeliveryId()); }
- if (cancel)
- {
- cancelDelivery(sd, deliveryInfo);
- }
- else
- {
- ackDelivery(sd, deliveryInfo);
- }
+ ackDelivery(sd, deliveryInfo);
state.setAutoAckInfo(null);
}
@@ -295,7 +303,26 @@
return null;
}
-
+
+ /* Used for client acknowledge */
+ public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+ SessionState state = getState(invocation);
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+ if (!state.getClientAckList().isEmpty())
+ {
+ //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
+ //on this session (rather than the connection consumer session)
+ del.acknowledgeDeliveries(state.getClientAckList());
+
+ state.getClientAckList().clear();
+ }
+
+ return null;
+ }
+
/*
* Called when session.recover is called
*/
@@ -325,8 +352,12 @@
}
else
{
+ //auto_ack or dups_ok
+
DeliveryInfo info = state.getAutoAckInfo();
+ //Don't recover if it's already to cancel
+
if (info != null)
{
List redels = new ArrayList();
@@ -377,18 +408,17 @@
public Object handleRedeliver(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
- SessionState state = getState(invocation);
+ SessionState state = getState(invocation);
- // We put the messages back in the front of their appropriate consumer buffers and set
- // JMSRedelivered to true.
+ // We put the messages back in the front of their appropriate consumer buffers
List toRedeliver = (List)mi.getArguments()[0];
-
+
if (trace) { log.trace(this + " handleRedeliver() called: " + toRedeliver); }
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
- // Need to be recovered in reverse order.
+ // Need to be redelivered in reverse order.
for (int i = toRedeliver.size() - 1; i >= 0; i--)
{
DeliveryInfo info = (DeliveryInfo)toRedeliver.get(i);
@@ -400,11 +430,14 @@
{
// This is ok. The original consumer has closed, so we cancel the message
+ //FIXME - this needs to be done atomically for all cancels
+
cancelDelivery(del, info);
}
else
{
if (trace) { log.trace("Adding proxy back to front of buffer"); }
+
handler.addToFrontOfBuffer(proxy);
}
}
@@ -412,6 +445,100 @@
return null;
}
+ public Object handleCommit(Invocation invocation) throws Throwable
+ {
+ SessionState state = getState(invocation);
+
+ if (!state.isTransacted())
+ {
+ throw new IllegalStateException("Cannot commit a non-transacted session");
+ }
+
+ if (state.isXA())
+ {
+ throw new TransactionInProgressException("Cannot call commit on an XA session");
+ }
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+ ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
+
+ try
+ {
+ connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
+ }
+ finally
+ {
+ //Start new local tx
+ Object xid = connState.getResourceManager().createLocalTx();
+
+ state.setCurrentTxId(xid);
+ }
+
+ //TODO on commit we don't want to ACK any messages that have exceeded the max delivery count OR
+
+ return null;
+ }
+
+ public Object handleRollback(Invocation invocation) throws Throwable
+ {
+ SessionState state = getState(invocation);
+
+ if (!state.isTransacted())
+ {
+ throw new IllegalStateException("Cannot rollback a non-transacted session");
+ }
+
+ if (state.isXA())
+ {
+ throw new TransactionInProgressException("Cannot call rollback on an XA session");
+ }
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+ ResourceManager rm = connState.getResourceManager();
+ ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
+
+ try
+ {
+ rm.rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
+ }
+ finally
+ {
+ // start new local tx
+ Object xid = rm.createLocalTx();
+ state.setCurrentTxId(xid);
+ }
+
+ return null;
+ }
+
+ public Object handleSend(Invocation invocation) throws Throwable
+ {
+ SessionState state = getState(invocation);
+ Object txID = state.getCurrentTxId();
+
+ if (txID != null)
+ {
+ // the session is non-XA and transacted, or XA and enrolled in a global transaction, so
+ // we add the message to a transaction instead of sending it now. An XA session that has
+ // not been enrolled in a global transaction behaves as a non-transacted session.
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+ MethodInvocation mi = (MethodInvocation)invocation;
+ Message m = (Message)mi.getArguments()[0];
+
+ if (trace) { log.trace("sending message " + m + " transactionally, queueing on resource manager txID=" + txID + " sessionID= " + state.getSessionId()); }
+
+ connState.getResourceManager().addMessage(txID, state.getSessionId(), (JBossMessage)m);
+
+ // ... and we don't invoke any further interceptors in the stack
+ return null;
+ }
+
+ if (trace) { log.trace("sending message NON-transactionally"); }
+
+ return invocation.invokeNext();
+ }
+
public Object handleGetXAResource(Invocation invocation) throws Throwable
{
return getState(invocation).getXAResource();
@@ -462,7 +589,8 @@
SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
- sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(), delivery.getMessageProxy().getDeliveryCount()));
+ sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(),
+ delivery.getMessageProxy().getDeliveryCount(), false, false));
}
// Inner Classes -------------------------------------------------
Deleted: trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -1,229 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.client.container;
-
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import javax.jms.TransactionInProgressException;
-
-import org.jboss.aop.joinpoint.Invocation;
-import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.jms.client.delegate.ClientSessionDelegate;
-import org.jboss.jms.client.delegate.DelegateSupport;
-import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.client.state.HierarchicalState;
-import org.jboss.jms.client.state.SessionState;
-import org.jboss.jms.delegate.ConnectionDelegate;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.server.endpoint.DeliveryInfo;
-import org.jboss.jms.tx.LocalTx;
-import org.jboss.jms.tx.ResourceManager;
-import org.jboss.logging.Logger;
-
-/**
- * This aspect handles transaction related logic
- *
- * This aspect is PER_VM.
- *
- * @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
- * @author <a href="mailto:ovidiu at jboss.com>Ovidiu Feodorov</a>
- *
- * $Id$
- */
-public class TransactionAspect
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(TransactionAspect.class);
-
- // Attributes ----------------------------------------------------
-
- private boolean trace = log.isTraceEnabled();
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public Object handleClose(Invocation invocation) throws Throwable
- {
- Object res = invocation.invokeNext();
-
- SessionState state = (SessionState)getState(invocation);
-
- ConnectionState connState = (ConnectionState)state.getParent();
-
- Object xid = state.getCurrentTxId();
-
- if (xid != null)
- {
- //Remove transaction from the resource manager
- connState.getResourceManager().removeTx(xid);
- }
-
- return res;
- }
-
- public Object handleCommit(Invocation invocation) throws Throwable
- {
- SessionState state = (SessionState)getState(invocation);
-
- if (!state.isTransacted())
- {
- throw new IllegalStateException("Cannot commit a non-transacted session");
- }
-
- if (state.isXA())
- {
- throw new TransactionInProgressException("Cannot call commit on an XA session");
- }
-
- ConnectionState connState = (ConnectionState)state.getParent();
- ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
- try
- {
- connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
- }
- finally
- {
- //Start new local tx
- Object xid = connState.getResourceManager().createLocalTx();
-
- state.setCurrentTxId(xid);
- }
-
- return null;
- }
-
- public Object handleRollback(Invocation invocation) throws Throwable
- {
- SessionState state = (SessionState)getState(invocation);
-
- if (!state.isTransacted())
- {
- throw new IllegalStateException("Cannot rollback a non-transacted session");
- }
-
- if (state.isXA())
- {
- throw new TransactionInProgressException("Cannot call rollback on an XA session");
- }
-
- ConnectionState connState = (ConnectionState)state.getParent();
- ResourceManager rm = connState.getResourceManager();
- ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
- try
- {
- rm.rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
- }
- finally
- {
- // start new local tx
- Object xid = rm.createLocalTx();
- state.setCurrentTxId(xid);
- }
-
- return null;
- }
-
- public Object handleSend(Invocation invocation) throws Throwable
- {
- SessionState state = (SessionState)getState(invocation);
- Object txID = state.getCurrentTxId();
-
- if (txID != null)
- {
- // the session is non-XA and transacted, or XA and enrolled in a global transaction, so
- // we add the message to a transaction instead of sending it now. An XA session that has
- // not been enrolled in a global transaction behaves as a non-transacted session.
-
- ConnectionState connState = (ConnectionState)state.getParent();
- MethodInvocation mi = (MethodInvocation)invocation;
- Message m = (Message)mi.getArguments()[0];
-
- if (trace) { log.trace("sending message " + m + " transactionally, queueing on resource manager txID=" + txID + " sessionID= " + state.getSessionId()); }
-
- connState.getResourceManager().addMessage(txID, state.getSessionId(), (JBossMessage)m);
-
- // ... and we don't invoke any further interceptors in the stack
- return null;
- }
-
- if (trace) { log.trace("sending message NON-transactionally"); }
-
- return invocation.invokeNext();
- }
-
- public Object handlePreDeliver(Invocation invocation) throws Throwable
- {
- SessionState state = (SessionState)getState(invocation);
- Object txID = state.getCurrentTxId();
-
- if (txID != null)
- {
- // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
- // XA session that has not been enrolled in a global transaction behaves as a
- // non-transacted session.
-
- MethodInvocation mi = (MethodInvocation)invocation;
-
- DeliveryInfo info = (DeliveryInfo)mi.getArguments()[0];
-
- ConnectionState connState = (ConnectionState)state.getParent();
-
- if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
-
- //If the ack is for a delivery that came through via a connection consumer then we
- //use the connectionConsumer session as the session id, otherwise we use this sessions'
- //session id
-
- ClientSessionDelegate connectionConsumerDelegate =
- (ClientSessionDelegate)info.getConnectionConsumerSession();
-
- int sessionId = connectionConsumerDelegate != null ?
- connectionConsumerDelegate.getID() : state.getSessionId();
-
- connState.getResourceManager().addAck(txID, sessionId, info);
- }
-
- return null;
- }
-
- // Protected ------------------------------------------------------
-
- // Package Private ------------------------------------------------
-
- // Private --------------------------------------------------------
-
- private HierarchicalState getState(Invocation inv)
- {
- return ((DelegateSupport)inv.getTargetObject()).getState();
- }
-
- // Inner Classes --------------------------------------------------
-
-}
-
-
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -306,7 +306,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void postDeliver(boolean cancel) throws JMSException
+ public void postDeliver() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -34,11 +34,14 @@
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
+import org.jboss.jms.server.endpoint.Cancel;
import org.jboss.jms.server.endpoint.DefaultCancel;
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Message;
import org.jboss.messaging.util.Future;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
@@ -63,6 +66,67 @@
log = Logger.getLogger(MessageCallbackHandler.class);
trace = log.isTraceEnabled();
}
+
+ //FIXME temporary - until remoting provides true asynch invocations
+ static Executor exec = new QueuedExecutor();
+
+ private static boolean checkExpiredOrReachedMaxdeliveries(MessageProxy proxy, SessionDelegate del,
+ int maxDeliveries) throws JMSException
+ {
+ Message msg = proxy.getMessage();
+
+ boolean expired = msg.isExpired();
+
+ boolean reachedMaxDeliveries = proxy.getDeliveryCount() == maxDeliveries;
+
+ if (expired || reachedMaxDeliveries)
+ {
+ if (trace)
+ {
+ if (expired)
+ {
+ log.trace("Message " + proxy.getMessage() + " has expired - cancelling to server");
+ }
+ else
+ {
+ log.trace("Message " + proxy.getMessage() + " has reached max deliveries - cancelling to server");
+ }
+ }
+ final Cancel cancel = new DefaultCancel(proxy.getDeliveryId(), proxy.getDeliveryCount(),
+ expired, reachedMaxDeliveries);
+
+ //FIXME - this cancel should be sent using remoting true asynch invocations
+ //for now we just send on a different thread to prevent deadlocks
+
+ final SessionDelegate sess = del;
+
+ try
+ {
+
+ exec.execute(new Runnable() { public void run()
+ {
+ try
+ {
+ sess.cancelDelivery(cancel);
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to cancel delivery", e);
+ }
+ }});
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Thread interrupted", e);
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
//This is static so it can be called by the asf layer too
public static void callOnMessage(SessionDelegate sess,
@@ -75,79 +139,58 @@
int maxDeliveries,
SessionDelegate connectionConsumerSession)
throws JMSException
- {
+ {
+ if (checkExpiredOrReachedMaxdeliveries(m, sess, maxDeliveries))
+ {
+ //Message has been cancelled
+ return;
+ }
+
+ DeliveryInfo deliveryInfo = new DeliveryInfo(m, consumerID, channelID, connectionConsumerSession);
+
+ m.incDeliveryCount();
+
// If this is the callback-handler for a connection consumer we don't want to acknowledge or
// add anything to the tx for this session.
if (!isConnectionConsumer)
{
- sess.preDeliver(new DeliveryInfo(m, consumerID, channelID, connectionConsumerSession));
- }
-
- int tries = 0;
+ //We need to call preDeliver, deliver the message then call postDeliver - this is because
+ //it is legal to call session.recover(), or session.rollback() from within the onMessage()
+ //method in which case the last message needs to be delivered so it needs to know about it
+ sess.preDeliver(deliveryInfo);
+ }
- boolean cancel = false;
-
- while (true)
+ try
{
- try
- {
- if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
-
- m.incDeliveryCount();
-
- listener.onMessage(m);
+ if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+
+ listener.onMessage(m);
- if (trace) { log.trace("listener's onMessage() finished"); }
-
- break;
- }
- catch (RuntimeException e)
- {
- long id = m.getMessage().getMessageID();
-
- log.error("RuntimeException was thrown from onMessage, " + id + " will be redelivered", e);
-
- // See JMS 1.1 spec 4.5.2
-
- if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
- {
- // We redeliver a certain number of times
- if (tries < maxDeliveries)
- {
- tries++;
- }
- else
- {
- log.error("Max redeliveries has occurred for message: " + m.getJMSMessageID());
-
- // postdeliver will do a cancel rather than an ack which will cause the mesage
- // to end up in the DLQ
-
- cancel = true;
-
- break;
- }
- }
- else
- {
- // Session is either transacted or CLIENT_ACKNOWLEDGE
- // We just deliver next message
- if (trace) { log.trace("ignoring exception on " + id); }
-
- break;
- }
- }
+ if (trace) { log.trace("listener's onMessage() finished"); }
}
+ catch (RuntimeException e)
+ {
+ long id = m.getMessage().getMessageID();
+ log.error("RuntimeException was thrown from onMessage, " + id + " will be redelivered", e);
+
+ // See JMS 1.1 spec 4.5.2
+
+ if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ sess.recover();
+ }
+ }
+
if (!sess.isClosed())
{
// postDeliver only if the session is not closed
-
+
// If this is the callback-handler for a connection consumer we don't want to acknowledge or
// add anything to the tx for this session
if (!isConnectionConsumer)
{
- sess.postDeliver(cancel);
+ sess.postDeliver();
}
}
}
@@ -278,7 +321,7 @@
// delivered in a session.
if (!buffer.isEmpty())
- {
+ {
// Now we cancel any deliveries that might be waiting in our buffer. This is because
// otherwise the messages wouldn't get cancelled until the corresponding session died.
// So if another consumer in another session tried to consume from the channel before that
@@ -293,7 +336,7 @@
{
MessageProxy mp = (MessageProxy)i.next();
- DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
+ DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount(), false, false);
cancels.add(ack);
}
@@ -409,17 +452,20 @@
if (trace) { log.trace(this + " received " + m + " after being blocked on buffer"); }
- // If message is expired we still call pre and post deliver. This makes sure the
- // message is acknowledged so it gets removed from the queue/subscription.
-
- if (!isConnectionConsumer)
+ boolean ignore = checkExpiredOrReachedMaxdeliveries(m, sessionDelegate, maxDeliveries);
+
+ if (!isConnectionConsumer && !ignore)
{
- sessionDelegate.preDeliver(new DeliveryInfo(m, consumerID, channelID, null));
+ DeliveryInfo info = new DeliveryInfo(m, consumerID, channelID, null);
+
+ m.incDeliveryCount();
+
+ sessionDelegate.preDeliver(info);
- sessionDelegate.postDeliver(false);
+ sessionDelegate.postDeliver();
}
-
- if (!m.getMessage().isExpired())
+
+ if (!ignore)
{
if (trace) { log.trace(this + ": message " + m + " is not expired, pushing it to the caller"); }
@@ -428,7 +474,7 @@
if (trace)
{
- log.trace(this + ": message expired, discarding");
+ log.trace(this + ": message expired or exceeded max deliveries, discarding");
}
// the message expired, so discard the message, adjust timeout and reenter the buffer
@@ -456,8 +502,6 @@
sendChangeRateMessage(1);
}
- m.incDeliveryCount();
-
if (trace) { log.trace(this + " receive() returning " + m); }
return m;
@@ -480,7 +524,7 @@
public void setConsumerId(int consumerId)
{
- this.consumerID=consumerId;
+ this.consumerID = consumerId;
}
public void addToFrontOfBuffer(MessageProxy proxy)
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -72,6 +72,9 @@
// List<DeliveryInfo>
private List clientAckList;
+ // List<DeliveryInfo>
+ //private List clientCancelList;
+
private DeliveryInfo autoAckInfo;
private ConnectionState parent;
@@ -111,6 +114,8 @@
clientAckList = new ArrayList();
+ // clientCancelList = new ArrayList();
+
// TODO could optimise this to use the same map of callbackmanagers (which holds refs
// to callbackhandlers) in the connection, instead of maintaining another map
callbackHandlers = new HashMap();
@@ -135,8 +140,7 @@
{
this.delegate=(SessionDelegate)delegate;
}
-
-
+
/**
* @return List<AckInfo>
*/
@@ -232,9 +236,7 @@
// When failing over a session, we keep the old session's state but there are certain fields
// we need to update
public void copyState(SessionState newState)
- {
- //this.delegate = newState.delegate;
-
+ {
this.sessionId = newState.sessionId;
}
}
Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -68,7 +68,7 @@
void preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
- void postDeliver(boolean cancel) throws JMSException;
+ void postDeliver() throws JMSException;
MessageListener getMessageListener() throws JMSException;
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -30,6 +30,7 @@
import java.util.Set;
import javax.management.Attribute;
+import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -37,6 +38,7 @@
import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
import org.jboss.jms.server.connectormanager.SimpleConnectorManager;
+import org.jboss.jms.server.destination.ManagedQueue;
import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.jms.server.plugin.contract.JMSUserManager;
import org.jboss.jms.server.remoting.JMSServerInvocationHandler;
@@ -106,10 +108,9 @@
private int objectIDSequence = 1;
- private int maxDeliveryAttempts = 10;
+ //The default maximum number of delivery attempts before sending to DLQ - can be overridden on the destination
+ private int defaultMaxDeliveryAttempts = 10;
- private String dlqName;
-
private Object failoverStatusLock;
private long failoverStartTimeout = 3000;
@@ -143,6 +144,12 @@
protected ObjectName jmsUserManagerObjectName;
protected JMSUserManager jmsUserManager;
+
+ protected ObjectName defaultDLQObjectName;
+ protected Queue defaultDLQ;
+
+ protected ObjectName defaultExpiryQueueObjectName;
+ protected Queue defaultExpiryQueue;
//Other stuff
@@ -304,60 +311,81 @@
// JMX Attributes ------------------------------------------------
- public String getDLQName()
+ // Plugins
+
+ public synchronized ObjectName getPersistenceManager()
{
- return dlqName;
+ return persistenceManagerObjectName;
}
- public void setDLQName(String dlqName)
+ public synchronized void setPersistenceManager(ObjectName on)
{
- this.dlqName = dlqName;
+ if (started)
+ {
+ log.warn("Cannot set persistence manager on server peer when server peer is started");
+ return;
+ }
+ persistenceManagerObjectName = on;
}
- public int getMaxDeliveryAttempts()
+ public synchronized ObjectName getPostOffice()
{
- return maxDeliveryAttempts;
+ return postOfficeObjectName;
}
- public void setMaxDeliveryAttempts(int attempts)
+ public synchronized void setPostOffice(ObjectName on)
{
- this.maxDeliveryAttempts = attempts;
+ if (started)
+ {
+ log.warn("Cannot set post office on server peer when server peer is started");
+ return;
+ }
+ postOfficeObjectName = on;
}
- public ObjectName getPersistenceManager()
+ public synchronized ObjectName getJmsUserManager()
{
- return persistenceManagerObjectName;
+ return jmsUserManagerObjectName;
}
- public void setPersistenceManager(ObjectName on)
+ public synchronized void setJMSUserManager(ObjectName on)
{
- persistenceManagerObjectName = on;
+ if (started)
+ {
+ log.warn("Cannot set jms user manager on server peer when server peer is started");
+ return;
+ }
+ jmsUserManagerObjectName = on;
}
-
- public ObjectName getPostOffice()
+
+ public synchronized ObjectName getDefaultDLQ()
{
- return postOfficeObjectName;
+ return defaultDLQObjectName;
}
- public void setPostOffice(ObjectName on)
+ public synchronized void setDefaultDLQ(ObjectName on)
{
- postOfficeObjectName = on;
+ defaultDLQObjectName = on;
}
-
- public ObjectName getJmsUserManager()
+
+ public synchronized ObjectName getDefaultExpiryQueue()
{
- return jmsUserManagerObjectName;
+ return defaultExpiryQueueObjectName;
}
- public void setJMSUserManager(ObjectName on)
+ public synchronized void setDefaultExpiryQueue(ObjectName on)
{
- jmsUserManagerObjectName = on;
+ this.defaultExpiryQueueObjectName = on;
}
+
+ // Instance access
public Object getInstance()
{
return this;
}
+
+ //read only JMX attributes
public String getJMSVersion()
{
@@ -408,8 +436,10 @@
{
return defaultTopicJNDIContext;
}
+
+ //Read - write attributes
- public void setSecurityDomain(String securityDomain) throws Exception
+ public synchronized void setSecurityDomain(String securityDomain) throws Exception
{
try
{
@@ -421,66 +451,66 @@
}
}
- public String getSecurityDomain()
+ public synchronized String getSecurityDomain()
{
return securityStore.getSecurityDomain();
}
- public void setDefaultSecurityConfig(Element conf) throws Exception
+ public synchronized void setDefaultSecurityConfig(Element conf) throws Exception
{
securityStore.setDefaultSecurityConfig(conf);
}
- public Element getDefaultSecurityConfig()
+ public synchronized Element getDefaultSecurityConfig()
{
return securityStore.getDefaultSecurityConfig();
}
-
- public IDManager getMessageIDManager()
+
+ public synchronized int getQueuedExecutorPoolSize()
{
- return messageIDManager;
- }
-
- public IDManager getChannelIDManager()
- {
- return channelIDManager;
- }
-
- public ServerInvocationHandler getInvocationHandler()
- {
- return handler;
- }
-
- public int getQueuedExecutorPoolSize()
- {
return queuedExecutorPoolSize;
}
- public void setQueuedExecutorPoolSize(int poolSize)
+ public synchronized void setQueuedExecutorPoolSize(int poolSize)
{
+ if (started)
+ {
+ log.warn("Cannot set jms queued executor pool size on server peer when server peer is started");
+ return;
+ }
this.queuedExecutorPoolSize = poolSize;
}
- public long getFailoverStartTimeout()
+ public synchronized long getFailoverStartTimeout()
{
return this.failoverStartTimeout;
}
- public void setFailoverStartTimeout(long timeout)
+ public synchronized void setFailoverStartTimeout(long timeout)
{
this.failoverStartTimeout = timeout;
}
- public long getFailoverCompleteTimeout()
+ public synchronized long getFailoverCompleteTimeout()
{
return this.failoverCompleteTimeout;
}
- public void setFailoverCompleteTimeout(long timeout)
+ public synchronized void setFailoverCompleteTimeout(long timeout)
{
this.failoverCompleteTimeout = timeout;
}
+ public synchronized int getDefaultMaxDeliveryAttempts()
+ {
+ return defaultMaxDeliveryAttempts;
+ }
+
+ public synchronized void setDefaultMaxDeliveryAttempts(int attempts)
+ {
+ this.defaultMaxDeliveryAttempts = attempts;
+ }
+
// JMX Operations ------------------------------------------------
public String createQueue(String name, String jndiName) throws Exception
@@ -569,6 +599,21 @@
// Public --------------------------------------------------------
+ public IDManager getMessageIDManager()
+ {
+ return messageIDManager;
+ }
+
+ public IDManager getChannelIDManager()
+ {
+ return channelIDManager;
+ }
+
+ public ServerInvocationHandler getInvocationHandler()
+ {
+ return handler;
+ }
+
public ServerSessionEndpoint getSession(Integer sessionID)
{
return (ServerSessionEndpoint)sessions.get(sessionID);
@@ -587,26 +632,72 @@
}
}
- public Queue getDLQ() throws Exception
+ public synchronized Queue getDefaultDLQInstance() throws Exception
{
- if (dlqName == null)
- {
- //No DLQ name specified so there is no DLQ
- return null;
+ Queue dlq = null;
+
+ if (defaultDLQObjectName != null)
+ {
+ ManagedQueue dest = null;
+
+ try
+ {
+ dest = (ManagedQueue)getServer().
+ getAttribute(defaultDLQObjectName, "Instance");
+ }
+ catch (InstanceNotFoundException e)
+ {
+ //Ok
+ }
+
+ if (dest != null)
+ {
+ PostOffice po = getPostOfficeInstance();
+
+ Binding binding = po.getBindingForQueueName(dest.getName());
+
+ if (binding != null && binding.getQueue().isActive())
+ {
+ dlq = binding.getQueue();
+ }
+ }
}
-
- PostOffice postOffice = getPostOfficeInstance();
-
- Binding binding = postOffice.getBindingForQueueName(dlqName);
- if (binding != null && binding.getQueue().isActive())
+ return dlq;
+ }
+
+ public synchronized Queue getDefaultExpiryQueueInstance() throws Exception
+ {
+ Queue expiryQueue = null;
+
+ if (defaultExpiryQueueObjectName != null)
{
- return binding.getQueue();
+ ManagedQueue dest = null;
+
+ try
+ {
+ dest = (ManagedQueue)getServer().
+ getAttribute(defaultExpiryQueueObjectName, "Instance");
+ }
+ catch (InstanceNotFoundException e)
+ {
+ //Ok
+ }
+
+ if (dest != null)
+ {
+ PostOffice po = getPostOfficeInstance();
+
+ Binding binding = po.getBindingForQueueName(dest.getName());
+
+ if (binding != null && binding.getQueue().isActive())
+ {
+ expiryQueue = binding.getQueue();
+ }
+ }
}
- else
- {
- return null;
- }
+
+ return expiryQueue;
}
public TransactionRepository getTxRepository()
@@ -735,8 +826,8 @@
Replicator replicator = getReplicator();
- long startToWait = failoverStartTimeout;
- long completeToWait = failoverCompleteTimeout;
+ long startToWait = getFailoverStartTimeout();
+ long completeToWait = getFailoverCompleteTimeout();
// Must lock here
synchronized (failoverStatusLock)
Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -6,6 +6,7 @@
*/
package org.jboss.jms.server.destination;
+import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import org.jboss.jms.server.DestinationManager;
@@ -13,10 +14,14 @@
import org.jboss.jms.server.SecurityManager;
import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.util.ExceptionUtil;
+import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.plugin.IDManager;
import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.plugin.contract.ServerPlugin;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.system.ServiceMBeanSupport;
import org.w3c.dom.Element;
@@ -33,7 +38,7 @@
*
* $Id$
*/
-public abstract class DestinationServiceSupport extends ServiceMBeanSupport
+public abstract class DestinationServiceSupport extends ServiceMBeanSupport implements ServerPlugin
{
// Constants -----------------------------------------------------
@@ -43,6 +48,10 @@
private ObjectName serverPeerObjectName;
+ private ObjectName dlqObjectName;
+
+ private ObjectName expiryQueueObjectName;
+
protected boolean started = false;
protected ManagedDestination destination;
@@ -79,6 +88,13 @@
public DestinationServiceSupport()
{
}
+
+ // ServerPlugin implementation ------------------------------------------
+
+ public MessagingComponent getInstance()
+ {
+ return destination;
+ }
// ServiceMBeanSupport overrides -----------------------------------
@@ -167,7 +183,74 @@
{
return serverPeerObjectName;
}
-
+
+ public void setDLQ(ObjectName on)
+ {
+ dlqObjectName = on;
+
+ ManagedQueue dest = null;
+
+ try
+ {
+
+ try
+ {
+ dest = (ManagedQueue)getServer().
+ getAttribute(dlqObjectName, "Instance");
+ }
+ catch (InstanceNotFoundException e)
+ {
+ //Ok
+ }
+
+ Queue dlq = null;
+
+ if (dest != null)
+ {
+ Binding binding = postOffice.getBindingForQueueName(dest.getName());
+
+ if (binding != null && binding.getQueue().isActive())
+ {
+ dlq = binding.getQueue();
+ }
+ }
+
+ destination.setDLQ(dlq);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to set DLQ", e);
+ }
+ }
+
+ public ObjectName getDLQ()
+ {
+ return dlqObjectName;
+ }
+
+ public void setExpiryQueue(ObjectName on)
+ {
+ expiryQueueObjectName = on;
+
+ Queue expiryQueue = null;
+
+ try
+ {
+ expiryQueue = (Queue)server.getAttribute(expiryQueueObjectName, "Instance");
+ }
+ catch (Exception e)
+ {
+ //Ok
+ }
+
+ destination.setExpiryQueue(expiryQueue);
+ }
+
+ public ObjectName getExpiryQueue()
+ {
+ return expiryQueueObjectName;
+ }
+
public void setSecurityConfig(Element securityConfig) throws Exception
{
try
Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.server.destination;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.w3c.dom.Element;
@@ -33,7 +35,7 @@
* $Id$
*
*/
-public abstract class ManagedDestination
+public abstract class ManagedDestination implements MessagingComponent
{
private static final int DEFAULT_FULL_SIZE = 75000;
@@ -62,6 +64,10 @@
protected PostOffice postOffice;
+ protected Queue dlq;
+
+ protected Queue expiryQueue;
+
public ManagedDestination()
{
}
@@ -167,5 +173,35 @@
this.temporary = temporary;
}
+ public Queue getDLQ()
+ {
+ return dlq;
+ }
+
+ public void setDLQ(Queue dlq)
+ {
+ this.dlq = dlq;
+ }
+
+ public Queue getExpiryQueue()
+ {
+ return expiryQueue;
+ }
+
+ public void setExpiryQueue(Queue expiryQueue)
+ {
+ this.expiryQueue = expiryQueue;
+ }
+
public abstract boolean isQueue();
+
+ public void start() throws Exception
+ {
+ //NOOP
+ }
+
+ public void stop() throws Exception
+ {
+ //NOOP
+ }
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -35,4 +35,8 @@
long getDeliveryId();
int getDeliveryCount();
+
+ boolean isExpired();
+
+ boolean isReachedMaxDeliveryAttempts();
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -46,7 +46,11 @@
private long deliveryId;
- private int deliveryCount;
+ private int deliveryCount;
+
+ private boolean expired;
+
+ private boolean reachedMaxDeliveryAttempts;
// Static --------------------------------------------------------
@@ -56,11 +60,15 @@
{
}
- public DefaultCancel(long deliveryId, int deliveryCount)
+ public DefaultCancel(long deliveryId, int deliveryCount, boolean expired, boolean maxDeliveries)
{
this.deliveryId = deliveryId;
this.deliveryCount = deliveryCount;
+
+ this.expired = expired;
+
+ this.reachedMaxDeliveryAttempts = maxDeliveries;
}
// Public --------------------------------------------------------
@@ -74,6 +82,16 @@
{
return deliveryCount;
}
+
+ public boolean isExpired()
+ {
+ return expired;
+ }
+
+ public boolean isReachedMaxDeliveryAttempts()
+ {
+ return reachedMaxDeliveryAttempts;
+ }
// Class YYY overrides -------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -58,6 +58,10 @@
//to the connection consumer's session, otherwise it will be null
private SessionDelegate connectionConsumerSession;
+ // private boolean exceedDeliveryAttempts;
+
+ // private boolean expired;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -95,6 +99,31 @@
{
return connectionConsumerSession;
}
+
+// public void setExceededDeliveryAttempts()
+// {
+// exceedDeliveryAttempts = true;
+// }
+//
+// public void setExpired()
+// {
+// expired = true;
+// }
+//
+// public boolean exceededDeliveryAttempts()
+// {
+// return exceedDeliveryAttempts;
+// }
+//
+// public boolean isExpired()
+// {
+// return expired;
+// }
+//
+// public boolean isToCancel()
+// {
+// return exceedDeliveryAttempts || expired;
+// }
public String toString()
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -29,6 +29,7 @@
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.message.MessageProxy;
import org.jboss.jms.selector.Selector;
+import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.server.remoting.MessagingMarshallable;
import org.jboss.jms.util.ExceptionUtil;
@@ -37,6 +38,7 @@
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.Receiver;
import org.jboss.messaging.core.Routable;
import org.jboss.messaging.core.SimpleDelivery;
@@ -92,6 +94,10 @@
private JBossDestination destination;
+ private Queue dlq;
+
+ private Queue expiryQueue;
+
private boolean started;
//This lock protects starting and stopping
@@ -103,9 +109,10 @@
// Constructors --------------------------------------------------
ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
- ServerSessionEndpoint sessionEndpoint,
- String selector, boolean noLocal, JBossDestination dest)
- throws InvalidSelectorException
+ ServerSessionEndpoint sessionEndpoint,
+ String selector, boolean noLocal, JBossDestination dest,
+ Queue dlq, Queue expiryQueue)
+ throws InvalidSelectorException
{
if (trace) { log.trace("constructing consumer endpoint " + id); }
@@ -125,6 +132,10 @@
this.destination = dest;
+ this.dlq = dlq;
+
+ this.expiryQueue = expiryQueue;
+
//Always start as false - wait for consumer to initiate
this.clientAccepting = false;
@@ -164,6 +175,20 @@
return null;
}
+
+ if (ref.isExpired())
+ {
+ SimpleDelivery delivery = new SimpleDelivery(observer, ref);
+
+ try
+ {
+ sessionEndpoint.expireDelivery(delivery, expiryQueue);
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to expire delivery: " + delivery, t);
+ }
+ }
synchronized (startStopLock)
{
@@ -385,6 +410,16 @@
// Package protected ---------------------------------------------
+ Queue getDLQ()
+ {
+ return dlq;
+ }
+
+ Queue getExpiryQueue()
+ {
+ return expiryQueue;
+ }
+
void localClose() throws Throwable
{
if (trace) { log.trace(this + " grabbed the main lock in close() " + this); }
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -135,7 +135,8 @@
private PostOffice postOffice;
private int nodeId;
private int maxDeliveryAttempts;
- private Queue dlq;
+ private Queue defaultDLQ;
+ private Queue defaultExpiryQueue;
// Map < deliveryId, Delivery>
private Map deliveries;
@@ -166,9 +167,10 @@
consumers = new HashMap();
browsers = new HashMap();
- dlq = sp.getDLQ();
+ defaultDLQ = sp.getDefaultDLQInstance();
+ defaultExpiryQueue = sp.getDefaultExpiryQueueInstance();
tr = sp.getTxRepository();
- maxDeliveryAttempts = sp.getMaxDeliveryAttempts();
+ maxDeliveryAttempts = sp.getDefaultMaxDeliveryAttempts();
deliveries = new ConcurrentHashMap();
@@ -371,7 +373,7 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledgeBatch");
}
}
-
+
public void cancelDelivery(Cancel cancel) throws JMSException
{
if (trace) {log.trace(this + " cancelDelivery " + cancel); }
@@ -386,31 +388,30 @@
catch (Throwable t)
{
throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDelivery");
- }
-
- }
+ }
+ }
public void cancelDeliveries(List cancels) throws JMSException
{
if (trace) {log.trace(this + " cancelDeliveries " + cancels); }
-
+
try
{
// deliveries must be cancelled in reverse order
Set channels = new HashSet();
-
+
for (int i = cancels.size() - 1; i >= 0; i--)
{
Cancel cancel = (Cancel)cancels.get(i);
if (trace) { log.trace(this + " cancelling delivery " + cancel.getDeliveryId()); }
-
+
Delivery del = cancelDeliveryInternal(cancel);
channels.add(del.getObserver());
}
-
+
// need to prompt delivery for all affected channels
promptDelivery(channels);
@@ -431,8 +432,6 @@
throw new IllegalStateException("Recovering deliveries but post office is not clustered!");
}
- ClusteredPostOffice po = (ClusteredPostOffice)postOffice;
-
long maxDeliveryId = 0;
//Sort into different list for each channel
@@ -465,7 +464,7 @@
Long channelId = (Long)entry.getKey();
//Look up channel
- Binding binding = po.getBindingforChannelId(channelId.longValue());
+ Binding binding = postOffice.getBindingforChannelId(channelId.longValue());
if (binding == null)
{
@@ -703,7 +702,23 @@
}
// Package protected ---------------------------------------------
+
+ void expireDelivery(Delivery del, Queue expiryQueue) throws Throwable
+ {
+ if (trace) { log.trace("Reference has expired: " + del.getReference()); }
+ if (expiryQueue != null)
+ {
+ if (trace) { log.trace("Sending to expiry queue"); }
+
+ moveInTransaction(del, expiryQueue);
+ }
+ else
+ {
+ log.warn("No expiry queue has been configured so removing the reference");
+ }
+ }
+
void cancelDeliveriesForConsumerAfterDeliveryId(int consumerId, long lastDeliveryId) throws Throwable
{
//Need to cancel in reverse
@@ -921,20 +936,6 @@
// Private -------------------------------------------------------
- private void acknowledgeDeliveryInternal(Ack ack) throws Throwable
- {
- if (trace) { log.trace(this + " acknowledging delivery " + ack.getDeliveryId()); }
-
- DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryId()));
-
- if (rec == null)
- {
- throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
- }
-
- rec.del.acknowledge(null);
- }
-
private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
{
DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
@@ -943,51 +944,112 @@
{
throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
}
-
- if (cancel.getDeliveryCount() >= maxDeliveryAttempts)
+
+ //Note we check the flag *and* evaluate again, this is because the server and client clocks may
+ //be out of synch and don't want to send back to the client a message it thought it has sent to
+ //the expiry queue
+ boolean expired = cancel.isExpired() || rec.del.getReference().isExpired();
+
+ //Note we check the flag *and* evaluate again, this is because the server value of maxDeliveries
+ //might get changed after the client has sent the cancel - and we don't want to end up cancelling
+ //back to the original queue
+ boolean reachedMaxDeliveryAttempts =
+ cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >= maxDeliveryAttempts;
+
+ if (!expired && !reachedMaxDeliveryAttempts)
{
- //Send to DLQ
+ //Normal cancel back to the queue
- //We do this in a tx so we don't end up with the message in both the original queue
- //and the dlq if it fails half way through
- Transaction tx = tr.createTransaction();
+ rec.del.getReference().setDeliveryCount(cancel.getDeliveryCount());
- try
- {
- if (dlq != null)
- {
- //reset delivery count to zero
- rec.del.getReference().setDeliveryCount(0);
-
- dlq.handle(null, rec.del.getReference(), tx);
-
- rec.del.acknowledge(tx);
- }
- else
- {
- log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-
- rec.del.acknowledge(tx);
- }
-
- tx.commit();
+ rec.del.cancel();
+ }
+ else
+ {
+ ServerConsumerEndpoint consumer = null;
+
+ synchronized (consumers)
+ {
+ consumer = (ServerConsumerEndpoint)consumers.get(new Integer(rec.consumerId));
}
- catch (Throwable t)
+
+ if (consumer == null)
{
- tx.rollback();
+ throw new IllegalStateException("Cannot find consumer with id " + rec.consumerId);
+ }
+
+ if (expired)
+ {
+ //Sent to expiry queue
- throw t;
- }
+ this.moveInTransaction(rec.del, consumer.getExpiryQueue());
+ }
+ else
+ {
+ //Send to DLQ
+
+ this.moveInTransaction(rec.del, consumer.getDLQ());
+ }
+ }
+
+ return rec.del;
+ }
+
+ private void moveInTransaction(Delivery del, Queue queue) throws Throwable
+ {
+ Transaction tx = tr.createTransaction();
+
+ try
+ {
+ if (queue != null)
+ {
+ //Need to reset expiration and delivery account
+ del.getReference().setExpiration(0);
+ del.getReference().getMessage().setExpiration(0);
+ del.getReference().setDeliveryCount(0);
+
+ queue.handle(null, del.getReference(), tx);
+
+ del.acknowledge(tx);
+ }
+ else
+ {
+ log.warn("Cannot move to destination since destination has not been deployed! The message will be removed");
+
+ del.acknowledge(tx);
+ }
+
+ tx.commit();
+
+ if (queue != null)
+ {
+ queue.deliver(false);
+ }
}
- else
- {
- rec.del.getReference().setDeliveryCount(cancel.getDeliveryCount());
+ catch (Throwable t)
+ {
+ tx.rollback();
- rec.del.cancel();
+ throw t;
+ }
+ }
+
+
+ private void acknowledgeDeliveryInternal(Ack ack) throws Throwable
+ {
+ if (trace) { log.trace(this + " acknowledging delivery " + ack.getDeliveryId()); }
+
+ DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryId()));
+
+ if (rec == null)
+ {
+ throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
}
- return rec.del;
- }
+ rec.del.acknowledge(null);
+ }
+
+
private ConsumerDelegate failoverConsumer(JBossDestination jmsDestination,
String selectorString,
@@ -1023,10 +1085,22 @@
int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
int prefetchSize = connectionEndpoint.getPrefetchSize();
+ ManagedDestination dest =
+ sp.getDestinationManager().getDestination(jmsDestination.getName(), jmsDestination.isQueue());
+
+ if (dest == null)
+ {
+ throw new IllegalStateException("Cannot find managed destination for dest: " + jmsDestination);
+ }
+
+ Queue dlqToUse = dest.getDLQ() == null ? defaultDLQ : dest.getDLQ();
+
+ Queue expiryQueueToUse = dest.getExpiryQueue() == null ? defaultExpiryQueue : dest.getExpiryQueue();
+
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, binding.getQueue(),
binding.getQueue().getName(), this, selectorString, noLocal,
- jmsDestination);
+ jmsDestination, dlqToUse, expiryQueueToUse);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
@@ -1295,10 +1369,14 @@
int prefetchSize = connectionEndpoint.getPrefetchSize();
+ Queue dlqToUse = mDest.getDLQ() == null ? defaultDLQ : mDest.getDLQ();
+
+ Queue expiryQueueToUse = mDest.getExpiryQueue() == null ? defaultExpiryQueue : mDest.getExpiryQueue();
+
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
binding.getQueue().getName(), this, selectorString, noLocal,
- jmsDestination);
+ jmsDestination, dlqToUse, expiryQueueToUse);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
@@ -1342,7 +1420,7 @@
* In such a case we might otherwise end up with the consumer closing but not all it's deliveries being
* cancelled, which would mean they wouldn't be cancelled until the session is closed which is too late
*/
- private class DeliveryRecord
+ private static class DeliveryRecord
{
Delivery del;
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -285,6 +285,10 @@
dos.writeInt(cancel.getDeliveryCount());
+ dos.writeBoolean(cancel.isExpired());
+
+ dos.writeBoolean(cancel.isReachedMaxDeliveryAttempts());
+
dos.flush();
if (trace) { log.trace("wrote cancelDelivery()"); }
@@ -308,6 +312,10 @@
dos.writeLong(cancel.getDeliveryId());
dos.writeInt(cancel.getDeliveryCount());
+
+ dos.writeBoolean(cancel.isExpired());
+
+ dos.writeBoolean(cancel.isReachedMaxDeliveryAttempts());
}
dos.flush();
@@ -723,7 +731,11 @@
int deliveryCount = dis.readInt();
- Object[] args = new Object[] {new DefaultCancel(deliveryId, deliveryCount)};
+ boolean expired = dis.readBoolean();
+
+ boolean reachedMaxDeliveries = dis.readBoolean();
+
+ Object[] args = new Object[] {new DefaultCancel(deliveryId, deliveryCount, expired, reachedMaxDeliveries)};
mi.setArguments(args);
@@ -749,7 +761,11 @@
int deliveryCount = dis.readInt();
- DefaultCancel cancel = new DefaultCancel(deliveryId, deliveryCount);
+ boolean expired = dis.readBoolean();
+
+ boolean reachedMaxDeliveries = dis.readBoolean();
+
+ DefaultCancel cancel = new DefaultCancel(deliveryId, deliveryCount, expired, reachedMaxDeliveries);
acks.add(cancel);
}
Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -48,7 +48,6 @@
private static final Logger log = Logger.getLogger(ClientTransaction.class);
-
public final static byte TX_OPEN = 0;
public final static byte TX_ENDED = 1;
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -519,54 +519,45 @@
{
ref = nextReference(iter);
}
+
if (ref != null)
{
- // Check if message is expired (we also do this on the clientside) If so ack it from the channel
- if (ref.isExpired())
+ // Attempt to push the ref to a receiver
+
+ if (trace) { log.trace(this + " pushing " + ref); }
+
+ Delivery del = router.handle(this, ref, null);
+
+ receiversReady = del != null;
+
+ if (del == null)
{
- expireRef(ref, iter);
+ // No receiver, broken receiver or full receiver so we stop delivering
+ if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
+
+ break;
}
- else
+ else if (!del.isSelectorAccepted())
{
- // Reference is not expired
-
- // Attempt to push the ref to a receiver
+ // No receiver accepted the message because no selectors matched, so we create
+ // an iterator (if we haven't already created it) to iterate through the refs
+ // in the channel. No delivery was really performed
- if (trace) { log.trace(this + " pushing " + ref); }
-
- Delivery del = router.handle(this, ref, null);
-
- receiversReady = del != null;
-
- if (del == null)
+ if (iter == null)
{
- // No receiver, broken receiver or full receiver so we stop delivering
- if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
-
- break;
- }
- else if (!del.isSelectorAccepted())
- {
- // No receiver accepted the message because no selectors matched, so we create
- // an iterator (if we haven't already created it) to iterate through the refs
- // in the channel. No delivery was really performed
-
- if (iter == null)
- {
- iter = messageRefs.iterator();
- }
- }
- else
- {
- if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
-
- // Receiver accepted the reference
-
- removeReference(iter);
-
- deliveringCount.increment();
- }
+ iter = messageRefs.iterator();
+ }
}
+ else
+ {
+ if (trace) { log.trace(this + ": " + del + " returned for message:" + ref); }
+
+ // Receiver accepted the reference
+
+ removeReference(iter);
+
+ deliveringCount.increment();
+ }
}
else
{
@@ -783,28 +774,6 @@
// Private -------------------------------------------------------
- private void expireRef(MessageReference ref, ListIterator iter) throws Exception
- {
- if (trace) { log.trace("Message reference: " + ref + " has expired"); }
-
- // remove and acknowledge it
- synchronized (refLock)
- {
- if (iter == null)
- {
- removeFirstInMemory();
- }
- else
- {
- iter.remove();
- }
- }
-
- Delivery delivery = new SimpleDelivery(this, ref, true);
-
- acknowledgeInternal(delivery, null, true, false);
- }
-
private void removeReference(ListIterator iter) throws Exception
{
synchronized (refLock)
Modified: trunk/src/main/org/jboss/messaging/core/Routable.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Routable.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/messaging/core/Routable.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -72,6 +72,8 @@
long getExpiration();
boolean isExpired();
+
+ void setExpiration(long expiration);
/**
* @return the time (in GMT milliseconds) when this routable was delivered to the provider.
Modified: trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/src/main/org/jboss/messaging/core/message/RoutableSupport.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -147,6 +147,11 @@
{
return expiration;
}
+
+ public void setExpiration(long expiration)
+ {
+ this.expiration = expiration;
+ }
public long getTimestamp()
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -31,6 +31,7 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.naming.NameNotFoundException;
@@ -72,13 +73,30 @@
{
if (ServerManagement.isRemote())
{
- //This test can only run in local mode
return;
}
-
+
ServerManagement.deployQueue("DLQ");
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ ObjectName dlqObjectName = (ObjectName)ServerManagement.getAttribute(serverPeerObjectName, "DefaultDLQ");
+
+ assertNotNull(dlqObjectName);
+
+ String name = (String)ServerManagement.getAttribute(dlqObjectName, "Name");
+
+ assertNotNull(name);
+
+ assertEquals("DLQ", name);
- org.jboss.messaging.core.Queue dlq = ServerManagement.getServer().getServerPeer().getDLQ();
+ String jndiName = (String)ServerManagement.getAttribute(dlqObjectName, "JNDIName");
+
+ assertNotNull(jndiName);
+
+ assertEquals("/queue/DLQ", jndiName);
+
+ org.jboss.messaging.core.Queue dlq = ServerManagement.getServer().getServerPeer().getDefaultDLQInstance();
assertNotNull(dlq);
@@ -98,9 +116,8 @@
{
if (ic != null) ic.close();
- log.info("undeploying dlq");
ServerManagement.undeployQueue("DLQ");
- log.info("undeployed dlq");
+
}
}
@@ -108,12 +125,11 @@
{
if (ServerManagement.isRemote())
{
- //This test can only run in local mode
return;
}
+
+ org.jboss.messaging.core.Queue dlq = ServerManagement.getServer().getServerPeer().getDefaultDLQInstance();
- org.jboss.messaging.core.Queue dlq = ServerManagement.getServer().getServerPeer().getDLQ();
-
assertNull(dlq);
InitialContext ic = null;
@@ -138,272 +154,517 @@
if (ic != null) ic.close();
}
}
-//
-// public void testSendToDLQWithMessageListenerPersistent() throws Exception
-// {
-// sendToDLQWithMessageListener(true);
-// }
-//
-// public void testSendToDLQWithMessageListenerNonPersistent() throws Exception
-// {
-// sendToDLQWithMessageListener(false);
-// }
-//
-// public void testSendToDLQWithReceivePersistent() throws Exception
-// {
-// sendToDLQWithReceive(true);
-// }
-//
-// public void testSendToDLQWithReceiveNonPersistent() throws Exception
-// {
-// sendToDLQWithReceive(false);
-// }
-//
-// public void testSendToDLQWithReceivePartialPersistent() throws Exception
-// {
-// sendToDLQWithReceivePartial(true);
-// }
-//
-// public void testSendToDLQWithReceivePartialNonPersistent() throws Exception
-// {
-// sendToDLQWithReceivePartial(false);
-// }
-//
-// public void sendToDLQWithMessageListener(boolean persistent) throws Exception
-// {
-// Connection conn = null;
-//
-// ServerManagement.deployQueue("DLQ");
-//
-// Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue);
-//
-// prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-//
-// for (int i = 0; i < 10; i++)
-// {
-// TextMessage tm = sess.createTextMessage("Message:" + i);
-//
-// prod.send(tm);
-// }
-//
-// MessageConsumer cons = sess.createConsumer(queue);
-//
-// cons.setMessageListener(new FailingMessageListener());
-//
-// conn.start();
-//
-// Thread.sleep(4000);
-//
-// cons.setMessageListener(null);
-//
-// Message m = cons.receive(1000);
-//
-// assertNull(m);
-//
-// //Message should all be in the dlq - let's check
-//
-// MessageConsumer cons2 = sess.createConsumer(dlq);
-//
-// for (int i = 0; i < 10; i++)
-// {
-// TextMessage tm = (TextMessage)cons2.receive(1000);
-//
-// assertNotNull(tm);
-//
-// assertEquals("Message:" + i, tm.getText());
-// }
-//
-// }
-// finally
-// {
-// ServerManagement.undeployQueue("DLQ");
-//
-// if (conn != null) conn.close();
-// }
-// }
-//
-// public void sendToDLQWithReceive(boolean persistent) throws Exception
-// {
-// Connection conn = null;
-//
-// ServerManagement.deployQueue("DLQ");
-//
-// Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue);
-//
-// prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-//
-// for (int i = 0; i < 10; i++)
-// {
-// TextMessage tm = sess.createTextMessage("Message:" + i);
-//
-// prod.send(tm);
-// }
-//
-// Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-// MessageConsumer cons = sess2.createConsumer(queue);
-//
-// conn.start();
-//
-// for (int i = 0; i < 10; i++) // retries - default is 10
-// {
-// for (int j = 0; j < 10; j++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(1000);
-//
-// assertNotNull(tm);
-//
-// assertEquals("Message:" + j, tm.getText());
-// }
-//
-// //rollback should cause redelivery
-// sess2.rollback();
-// }
-//
-// cons.close();
-//
-// MessageConsumer cons2 = sess2.createConsumer(queue);
-//
-// Message m = cons2.receive(1000);
-//
-// assertNull(m);
-//
-// //Message should all be in the dlq - let's check
-//
-// MessageConsumer cons3 = sess.createConsumer(dlq);
-//
-// for (int i = 0; i < 10; i++)
-// {
-// TextMessage tm = (TextMessage)cons3.receive(1000);
-//
-// assertNotNull(tm);
-//
-// assertEquals("Message:" + i, tm.getText());
-// }
-//
-// }
-// finally
-// {
-// ServerManagement.undeployQueue("DLQ");
-//
-// if (conn != null) conn.close();
-// }
-// }
-//
-// public void sendToDLQWithReceivePartial(boolean persistent) throws Exception
-// {
-// Connection conn = null;
-//
-// ServerManagement.deployQueue("DLQ");
-//
-// Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue);
-//
-// prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-//
-// for (int i = 0; i < 10; i++)
-// {
-// TextMessage tm = sess.createTextMessage("Message:" + i);
-//
-// prod.send(tm);
-// }
-//
-// Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-// MessageConsumer cons = sess2.createConsumer(queue);
-//
-// conn.start();
-//
-// for (int i = 0; i < 5; i++) // retries - default is 10
-// {
-// for (int j = 0; j < 10; j++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(1000);
-//
-// assertNotNull(tm);
-//
-// assertEquals("Message:" + j, tm.getText());
-// }
-//
-// //rollback should cause redelivery
-// sess2.rollback();
-// }
-//
-// //They should now be cancelled back to the server
-// cons.close();
-//
-// cons = sess2.createConsumer(queue);
-//
-// for (int i = 0; i < 5; i++) // retries - default is 10
-// {
-// for (int j = 0; j < 10; j++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(1000);
-//
-// assertNotNull(tm);
-//
-// assertEquals("Message:" + j, tm.getText());
-// }
-//
-// //rollback should cause redelivery
-// sess2.rollback();
-// }
-//
-// cons.close();
-//
-// //Now they should be in DLQ
-//
-// MessageConsumer cons2 = sess2.createConsumer(queue);
-//
-// Message m = cons2.receive(1000);
-//
-// assertNull(m);
-//
-// //Message should all be in the dlq - let's check
-//
-// MessageConsumer cons3 = sess.createConsumer(dlq);
-//
-// for (int i = 0; i < 10; i++)
-// {
-// TextMessage tm = (TextMessage)cons3.receive(1000);
-//
-// assertNotNull(tm);
-//
-// assertEquals("Message:" + i, tm.getText());
-// }
-//
-// }
-// finally
-// {
-// ServerManagement.undeployQueue("DLQ");
-//
-// if (conn != null) conn.close();
-// }
-// }
+
+ public void testDefaultAndOverrideDLQ() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ return;
+ }
+
+ final int NUM_MESSAGES = 5;
+
+ final int MAX_DELIVERIES = 8;
+
+ ServerManagement.deployQueue("DefaultDLQ");
+
+ ServerManagement.deployQueue("OverrideDLQ");
+
+ ServerManagement.deployQueue("TestQueue");
+
+ String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=DefaultDLQ";
+
+ String overrideDLQObjectName = "jboss.messaging.destination:service=Queue,name=OverrideDLQ";
+
+ String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=TestQueue";
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+ Queue testQueue = (Queue)ic.lookup("/queue/TestQueue");
+
+ Queue defaultDLQ = (Queue)ic.lookup("/queue/DefaultDLQ");
+
+ Queue overrideDLQ = (Queue)ic.lookup("/queue/OverrideDLQ");
+
+ drainDestination(cf, testQueue);
+
+ drainDestination(cf, defaultDLQ);
+
+ drainDestination(cf, overrideDLQ);
+
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ {
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(testQueue);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(testQueue);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+
+ sess2.recover();
+ }
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Now should be in default dlq
+
+ MessageConsumer cons3 = sess.createConsumer(defaultDLQ);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ conn.close();
+ }
+
+
+ {
+ //Now try with overriding the default dlq
+
+ conn = cf.createConnection();
+
+ log.info("I am setting dlq with " + overrideDLQObjectName);
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", overrideDLQObjectName);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(testQueue);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(testQueue);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+
+ sess2.recover();
+ }
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Now should be in override dlq
+
+ MessageConsumer cons3 = sess.createConsumer(overrideDLQ);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+ }
+ }
+ finally
+ {
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=DLQ");
+
+ ServerManagement.undeployQueue("DefaultDLQ");
+
+ ServerManagement.undeployQueue("OverrideDLQ");
+
+ ServerManagement.undeployQueue("TestQueue");
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testWithMessageListenerPersistent() throws Exception
+ {
+ testWithMessageListener(true);
+ }
+
+ public void testWithMessageListenerNonPersistent() throws Exception
+ {
+ testWithMessageListener(false);
+ }
+
+ public void testWithReceiveClientAckPersistent() throws Exception
+ {
+ this.testWithReceiveClientAck(true);
+ }
+
+ public void testWithReceiveClientAckNonPersistent() throws Exception
+ {
+ testWithReceiveClientAck(false);
+ }
+
+ public void testWithReceiveTransactionalPersistent() throws Exception
+ {
+ this.testWithReceiveTransactional(true);
+ }
+
+ public void testWithReceiveTransactionalNonPersistent() throws Exception
+ {
+ testWithReceiveTransactional(false);
+ }
+
+ public void testWithMessageListener(boolean persistent) throws Exception
+ {
+ Connection conn = null;
+
+ ServerManagement.deployQueue("DLQ");
+
+ Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+
+ drainDestination(cf, dlq);
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ final int MAX_DELIVERIES = 16;
+
+ final int NUM_MESSAGES = 5;
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ int maxRedeliveryAttempts =
+ ((Integer)ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+ assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ FailingMessageListener listener = new FailingMessageListener();
+
+ cons.setMessageListener(listener);
+
+ conn.start();
+
+ Thread.sleep(4000);
+
+ cons.setMessageListener(null);
+
+ assertEquals(MAX_DELIVERIES * NUM_MESSAGES, listener.deliveryCount);
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Message should all be in the dlq - let's check
+
+ MessageConsumer cons2 = sess.createConsumer(dlq);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("DLQ");
+
+ if (conn != null) conn.close();
+ }
+ }
+
+
+ public void testWithReceiveClientAck(boolean persistent) throws Exception
+ {
+ Connection conn = null;
+
+ ServerManagement.deployQueue("DLQ");
+
+ Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+
+ drainDestination(cf, dlq);
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ final int MAX_DELIVERIES = 16;
+
+ final int NUM_MESSAGES = 5;
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ int maxRedeliveryAttempts =
+ ((Integer)ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+ assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(queue);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+
+ sess2.recover();
+ }
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+ //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+
+ MessageConsumer cons3 = sess.createConsumer(dlq);
+
+ Message m = cons3.receive(1000);
+
+ assertNull(m);
+
+ //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
+ //delivery attempts
+
+ m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Now, all the messages should now be in the DLQ
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ //No more should be available
+
+ m = cons.receive(1000);
+
+ assertNull(m);
+
+ cons.close();
+
+ MessageConsumer cons2 = sess2.createConsumer(queue);
+
+ m = cons2.receive(1000);
+
+ assertNull(m);
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("DLQ");
+
+ if (conn != null) conn.close();
+ }
+ }
+
+ public void testWithReceiveTransactional(boolean persistent) throws Exception
+ {
+ Connection conn = null;
+
+ ServerManagement.deployQueue("DLQ");
+
+ Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+
+ drainDestination(cf, dlq);
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ final int MAX_DELIVERIES = 16;
+
+ final int NUM_MESSAGES = 5;
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ int maxRedeliveryAttempts =
+ ((Integer)ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+ assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer cons = sess2.createConsumer(queue);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+
+ sess2.rollback();
+ }
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+ //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+
+ MessageConsumer cons3 = sess.createConsumer(dlq);
+
+ Message m = cons3.receive(1000);
+
+ assertNull(m);
+
+ //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
+ //delivery attempts
+ m = cons.receive(1000);
+
+ assertNull(m);
+
+ //All the messages should now be in the DLQ
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ //No more should be available
+
+ m = cons.receive(1000);
+
+ assertNull(m);
+
+ cons.close();
+
+ MessageConsumer cons2 = sess2.createConsumer(queue);
+
+ m = cons2.receive(1000);
+
+ assertNull(m);
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("DLQ");
+
+ if (conn != null) conn.close();
+ }
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -439,9 +700,12 @@
class FailingMessageListener implements MessageListener
{
+ volatile int deliveryCount;
public void onMessage(Message msg)
{
+ deliveryCount++;
+
throw new RuntimeException("Your mum!");
}
Added: trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -0,0 +1,631 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+
+import org.jboss.jms.destination.JBossQueue;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ *
+ * A ExpiryQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ExpiryQueueTest extends MessagingTestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected InitialContext ic;
+ protected ConnectionFactory cf;
+ protected Queue queue;
+ protected Topic topic;
+
+ // Constructors --------------------------------------------------
+
+ public ExpiryQueueTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void testExpiryQueueAlreadyDeployed() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ return;
+ }
+ ServerManagement.deployQueue("ExpiryQueue");
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ ObjectName expiryQueueObjectName = (ObjectName)ServerManagement.getAttribute(serverPeerObjectName, "DefaultExpiryQueue");
+
+ assertNotNull(expiryQueueObjectName);
+
+ String name = (String)ServerManagement.getAttribute(expiryQueueObjectName, "Name");
+
+ assertNotNull(name);
+
+ assertEquals("ExpiryQueue", name);
+
+ String jndiName = (String)ServerManagement.getAttribute(expiryQueueObjectName, "JNDIName");
+
+ assertNotNull(jndiName);
+
+ assertEquals("/queue/ExpiryQueue", jndiName);
+
+ org.jboss.messaging.core.Queue expiryQueue = ServerManagement.getServer().getServerPeer().getDefaultExpiryQueueInstance();
+
+ assertNotNull(expiryQueue);
+
+ InitialContext ic = null;
+
+ try
+ {
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ JBossQueue q = (JBossQueue)ic.lookup("/queue/ExpiryQueue");
+
+ assertNotNull(q);
+
+ assertEquals("ExpiryQueue", q.getName());
+ }
+ finally
+ {
+ if (ic != null) ic.close();
+
+ ServerManagement.undeployQueue("ExpiryQueue");
+
+ }
+ }
+
+ public void testExpiryQueueNotAlreadyDeployed() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ return;
+ }
+
+ org.jboss.messaging.core.Queue expiryQueue = ServerManagement.getServer().getServerPeer().getDefaultExpiryQueueInstance();
+
+ assertNull(expiryQueue);
+
+ InitialContext ic = null;
+
+ try
+ {
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ try
+ {
+ ic.lookup("/queue/ExpiryQueue");
+
+ fail();
+ }
+ catch (NameNotFoundException e)
+ {
+ //Ok
+ }
+ }
+ finally
+ {
+ if (ic != null) ic.close();
+ }
+ }
+
+ public void testDefaultAndOverrideExpiryQueue() throws Exception
+ {
+ final int NUM_MESSAGES = 5;
+
+ ServerManagement.deployQueue("DefaultExpiry");
+
+ ServerManagement.deployQueue("OverrideExpiry");
+
+ ServerManagement.deployQueue("TestQueue");
+
+ String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=DefaultExpiry";
+
+ String overrideExpiryObjectName = "jboss.messaging.destination:service=Queue,name=OverrideExpiry";
+
+ String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=TestQueue";
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", "");
+
+ Queue testQueue = (Queue)ic.lookup("/queue/TestQueue");
+
+ Queue defaultExpiry = (Queue)ic.lookup("/queue/DefaultExpiry");
+
+ Queue overrideExpiry = (Queue)ic.lookup("/queue/OverrideExpiry");
+
+ drainDestination(cf, testQueue);
+
+ drainDestination(cf, defaultExpiry);
+
+ drainDestination(cf, overrideExpiry);
+
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ {
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(testQueue);
+
+ conn.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ //Send messages with time to live of 2000 enough time to get to client consumer - so
+ //they won't be expired on the server side
+ prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(testQueue);
+
+ //The messages should now be sitting in the consumer buffer
+
+ //Now give them enough time to expire
+
+ Thread.sleep(2500);
+
+ //Now try and receive
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Message should all be in the default expiry queue - let's check
+
+ MessageConsumer cons3 = sess.createConsumer(defaultExpiry);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ conn.close();
+ }
+
+
+ //now try with overriding the default expiry queue
+ {
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", overrideExpiryObjectName);
+
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(testQueue);
+
+ conn.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ //Send messages with time to live of 2000 enough time to get to client consumer - so
+ //they won't be expired on the server side
+ prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(testQueue);
+
+ //The messages should now be sitting in the consumer buffer
+
+ //Now give them enough time to expire
+
+ Thread.sleep(2500);
+
+ //Now try and receive
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Message should all be in the default expiry queue - let's check
+
+ MessageConsumer cons3 = sess.createConsumer(defaultExpiry);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+ }
+ }
+ finally
+ {
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
+
+ ServerManagement.undeployQueue("DefaultDLQ");
+
+ ServerManagement.undeployQueue("OverrideDLQ");
+
+ ServerManagement.undeployQueue("TestQueue");
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testExpireSameMessagesMultiple() throws Exception
+ {
+ final int NUM_MESSAGES = 5;
+
+ ServerManagement.deployQueue("ExpiryQueue");
+
+ String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=ExpiryQueue";
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+
+ Queue defaultExpiry = (Queue)ic.lookup("/queue/ExpiryQueue");
+
+ drainDestination(cf, defaultExpiry);
+
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.setClientID("wib1");
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(topic);
+
+ conn.start();
+
+ //Create 3 durable subscriptions
+
+ MessageConsumer sub1 = sess.createDurableSubscriber(topic, "sub1");
+
+ MessageConsumer sub2 = sess.createDurableSubscriber(topic, "sub2");
+
+ MessageConsumer sub3 = sess.createDurableSubscriber(topic, "sub3");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ //Send messages with time to live of 3000 enough time to get to client consumer - so
+ //they won't be expired on the server side
+ prod.send(tm, DeliveryMode.PERSISTENT, 4, 3000);
+ }
+
+ //Now sleep. This wil give them enough time to expire
+
+ Thread.sleep(3500);
+
+ //Now try and consume from each - this should force the message to the expiry queue
+
+ Message m = sub1.receive(500);
+ assertNull(m);
+
+ m = sub2.receive(500);
+ assertNull(m);
+
+ m = sub3.receive(500);
+ assertNull(m);
+
+ //Now the messages should all be in the expiry queue
+
+ MessageConsumer cons2 = sess.createConsumer(defaultExpiry);
+
+ while (true)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(500);
+
+ if (tm == null)
+ {
+ break;
+ }
+
+ log.info("Got message: " + tm.getText());
+ }
+
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("ExpiryQueue");
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testWithMessageListenerPersistent() throws Exception
+ {
+ testWithMessageListener(true);
+ }
+
+ public void testWithMessageListenerNonPersistent() throws Exception
+ {
+ testWithMessageListener(false);
+ }
+
+ public void testWithReceivePersistent() throws Exception
+ {
+ this.testWithReceive(true);
+ }
+
+ public void testWithReceiveNonPersistent() throws Exception
+ {
+ testWithReceive(false);
+ }
+
+ public void testWithMessageListener(boolean persistent) throws Exception
+ {
+ Connection conn = null;
+
+ ServerManagement.deployQueue("ExpiryQueue");
+
+ Queue expiryQueue = (Queue)ic.lookup("/queue/ExpiryQueue");
+
+ drainDestination(cf, expiryQueue);
+
+ final int NUM_MESSAGES = 5;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ //Send messages with time to live of 2000 enough time to get to client consumer - so
+ //they won't be expired on the server side
+ prod.send(tm, deliveryMode, 4, 2000);
+ }
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ //The messages should now be sitting in the consumer buffer
+
+ //Now give them enough time to expire
+
+ Thread.sleep(2500);
+
+ //Now set a listener
+
+ FailingMessageListener listener = new FailingMessageListener();
+
+ cons.setMessageListener(listener);
+
+ Thread.sleep(1000);
+
+ cons.setMessageListener(null);
+
+ //No messages should have been received
+ assertEquals(0, listener.deliveryCount);
+
+ //Shouldn't be able to receive any more
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Message should all be in the expiry queue - let's check
+
+ MessageConsumer cons2 = sess.createConsumer(expiryQueue);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("ExpiryQueue");
+
+ if (conn != null) conn.close();
+ }
+ }
+
+ public void testWithReceive(boolean persistent) throws Exception
+ {
+ Connection conn = null;
+
+ ServerManagement.deployQueue("ExpiryQueue");
+
+ Queue expiryQueue = (Queue)ic.lookup("/queue/ExpiryQueue");
+
+ drainDestination(cf, expiryQueue);
+
+ final int NUM_MESSAGES = 5;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ conn.start();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue);
+
+ int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ //Send messages with time to live of 2000 enough time to get to client consumer - so
+ //they won't be expired on the server side
+ prod.send(tm, deliveryMode, 4, 2000);
+ }
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ //The messages should now be sitting in the consumer buffer
+
+ //Now give them enough time to expire
+
+ Thread.sleep(2500);
+
+ //Now try and receive
+
+ Message m = cons.receive(1000);
+
+ assertNull(m);
+
+ //Message should all be in the expiry queue - let's check
+
+ MessageConsumer cons2 = sess.createConsumer(expiryQueue);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ }
+ finally
+ {
+ ServerManagement.undeployQueue("ExpiryQueue");
+
+ if (conn != null) conn.close();
+ }
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ServerManagement.start("all");
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ ServerManagement.deployQueue("Queue");
+
+ ServerManagement.deployTopic("Topic");
+
+ queue = (Queue)ic.lookup("/queue/Queue");
+
+ topic = (Topic)ic.lookup("/topic/Topic");
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ ServerManagement.undeployQueue("Queue");
+
+ ServerManagement.undeployTopic("Topic");
+
+ if (ic != null) ic.close();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ class FailingMessageListener implements MessageListener
+ {
+ volatile int deliveryCount;
+
+ public void onMessage(Message msg)
+ {
+ deliveryCount++;
+
+ throw new RuntimeException("Your mum!");
+ }
+
+ }
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -3055,6 +3055,7 @@
Session sess1 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer cons1 = sess1.createConsumer(queue);
TextMessage tm2 = (TextMessage)cons1.receive(3000);
+
assertNotNull(tm2);
assertEquals("testRedeliveredDifferentSessions", tm2.getText());
@@ -3064,6 +3065,7 @@
Session sess2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer cons2 = sess2.createConsumer(queue);
TextMessage tm3 = (TextMessage)cons2.receive(3000);
+
assertNotNull(tm3);
assertEquals("testRedeliveredDifferentSessions", tm3.getText());
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-12-30 08:02:32 UTC (rev 1878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-12-31 09:49:46 UTC (rev 1879)
@@ -486,7 +486,7 @@
int deliveryCount = 12;
- Cancel cancel = new DefaultCancel(deliveryID, deliveryCount);
+ Cancel cancel = new DefaultCancel(deliveryID, deliveryCount, true, true);
Object[] args = new Object[] { cancel };
@@ -530,9 +530,17 @@
//Then delivery count
int count = dis.readInt();
+ boolean expired = dis.readBoolean();
+
+ boolean reachedMaxDeliveries = dis.readBoolean();
+
assertEquals(deliveryID, l);
assertEquals(deliveryCount, count);
+
+ assertEquals(expired, true);
+
+ assertEquals(reachedMaxDeliveries, true);
//Now eos
try
@@ -567,6 +575,10 @@
assertEquals(deliveryCount, l2.getDeliveryCount());
+ assertEquals(expired, l2.isExpired());
+
+ assertEquals(reachedMaxDeliveries, l2.isReachedMaxDeliveryAttempts());
+
}
public void testCancelDeliveries() throws Exception
@@ -577,8 +589,8 @@
List cancels = new ArrayList();
- DefaultCancel cancel1 = new DefaultCancel(65654, 43);
- DefaultCancel cancel2 = new DefaultCancel(65765, 2);
+ DefaultCancel cancel1 = new DefaultCancel(65654, 43, true, false);
+ DefaultCancel cancel2 = new DefaultCancel(65765, 2, false, true);
cancels.add(cancel1);
cancels.add(cancel2);
@@ -629,19 +641,31 @@
//then the AckInfos
long deliveryId = dis.readLong();
int deliveryCount = dis.readInt();
- DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount);
+ boolean expired = dis.readBoolean();
+ boolean reachedMaxDeliveries = dis.readBoolean();
+ DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount, expired, reachedMaxDeliveries);
deliveryId = dis.readLong();
deliveryCount = dis.readInt();
- DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount);
+ expired = dis.readBoolean();
+ reachedMaxDeliveries = dis.readBoolean();
+ DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount, expired, reachedMaxDeliveries);
assertEquals(cancel1.getDeliveryCount(), rcancel1.getDeliveryCount());
assertEquals(cancel1.getDeliveryId(), cancel1.getDeliveryId());
+ assertEquals(cancel1.isExpired(), cancel1.isExpired());
+
+ assertEquals(cancel1.isReachedMaxDeliveryAttempts(), cancel1.isReachedMaxDeliveryAttempts());
+
assertEquals(cancel2.getDeliveryCount(), rcancel2.getDeliveryCount());
assertEquals(cancel2.getDeliveryId(), cancel2.getDeliveryId());
+
+ assertEquals(cancel2.isExpired(), cancel2.isExpired());
+
+ assertEquals(cancel2.isReachedMaxDeliveryAttempts(), cancel2.isReachedMaxDeliveryAttempts());
//should be eos
More information about the jboss-cvs-commits
mailing list