[jboss-cvs] JBoss Messaging SVN: r2760 - in trunk: src/main/org/jboss/jms/server/destination and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 6 04:17:48 EDT 2007
Author: sergeypk
Date: 2007-06-06 04:17:48 -0400 (Wed, 06 Jun 2007)
New Revision: 2760
Added:
trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java
Modified:
trunk/src/etc/xmdesc/Queue-xmbean.xml
trunk/src/etc/xmdesc/Topic-xmbean.xml
trunk/src/main/org/jboss/jms/server/destination/DestinationMBean.java
trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-938 - Implement MaxDeliveryAttempts on destinations
Modified: trunk/src/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Queue-xmbean.xml 2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/etc/xmdesc/Queue-xmbean.xml 2007-06-06 08:17:48 UTC (rev 2760)
@@ -141,6 +141,12 @@
<type>int</type>
</attribute>
+ <attribute access="read-write" getMethod="getMaxDeliveryAttempts" setMethod="setMaxDeliveryAttempts">
+ <description>The maximum delivery attempts to the queue</description>
+ <name>MaxDeliveryAttempts</name>
+ <type>int</type>
+ </attribute>
+
<attribute access="read-only" getMethod="getConsumerCount">
<description>The number of consumers on the queue</description>
<name>ConsumerCount</name>
Modified: trunk/src/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Topic-xmbean.xml 2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/etc/xmdesc/Topic-xmbean.xml 2007-06-06 08:17:48 UTC (rev 2760)
@@ -113,7 +113,13 @@
<description>The day limit for the message counters of this topic</description>
<name>MessageCounterHistoryDayLimit</name>
<type>int</type>
- </attribute>
+ </attribute>
+
+ <attribute access="read-write" getMethod="getMaxDeliveryAttempts" setMethod="setMaxDeliveryAttempts">
+ <description>The maximum delivery attempts to the topic</description>
+ <name>MaxDeliveryAttempts</name>
+ <type>int</type>
+ </attribute>
<attribute access="read-only" getMethod="getMessageCounters">
<description>The message counters for the topic</description>
Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationMBean.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationMBean.java 2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationMBean.java 2007-06-06 08:17:48 UTC (rev 2760)
@@ -90,6 +90,10 @@
void setMessageCounterHistoryDayLimit(int limit) throws Exception;
+ int getMaxDeliveryAttempts();
+
+ void setMaxDeliveryAttempts(int maxDeliveryAttempts);
+
// JMX operations
void removeAllMessages() throws Exception;
Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2007-06-06 08:17:48 UTC (rev 2760)
@@ -405,6 +405,16 @@
destination.setMessageCounterHistoryDayLimit(limit);
}
+ public int getMaxDeliveryAttempts()
+ {
+ return destination.getMaxDeliveryAttempts();
+ }
+
+ public void setMaxDeliveryAttempts(int maxDeliveryAttempts)
+ {
+ destination.setMaxDeliveryAttempts(maxDeliveryAttempts);
+ }
+
// JMX managed operations ----------------------------------------
public abstract void removeAllMessages() throws Exception;
Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2007-06-06 08:17:48 UTC (rev 2760)
@@ -88,6 +88,8 @@
protected int maxSize = -1;
protected int messageCounterHistoryDayLimit = -1;
+
+ protected int maxDeliveryAttempts = -1;
public ManagedDestination()
{
@@ -285,6 +287,16 @@
{
this.messageCounterHistoryDayLimit = limit;
}
+
+ public int getMaxDeliveryAttempts()
+ {
+ return this.maxDeliveryAttempts;
+ }
+
+ public void setMaxDeliveryAttempts(int maxDeliveryAttempts)
+ {
+ this.maxDeliveryAttempts = maxDeliveryAttempts;
+ }
public abstract boolean isQueue();
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-06-06 08:17:48 UTC (rev 2760)
@@ -95,6 +95,8 @@
private Queue expiryQueue;
private long redeliveryDelay;
+
+ private int maxDeliveryAttempts;
private boolean started;
@@ -113,7 +115,7 @@
ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
ServerSessionEndpoint sessionEndpoint, String selector,
boolean noLocal, JBossDestination dest, Queue dlq,
- Queue expiryQueue, long redeliveryDelay)
+ Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts)
throws InvalidSelectorException
{
if (trace)
@@ -141,6 +143,8 @@
this.redeliveryDelay = redeliveryDelay;
this.expiryQueue = expiryQueue;
+
+ this.maxDeliveryAttempts = maxDeliveryAttempts;
// Always start as false - wait for consumer to initiate.
this.clientAccepting = false;
@@ -246,7 +250,7 @@
if (storeDeliveries)
{
- deliveryId = sessionEndpoint.addDelivery(delivery, id, dlq, expiryQueue, redeliveryDelay);
+ deliveryId = sessionEndpoint.addDelivery(delivery, id, dlq, expiryQueue, redeliveryDelay, maxDeliveryAttempts);
}
else
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-06-06 08:17:48 UTC (rev 2760)
@@ -143,7 +143,7 @@
private TransactionRepository tr;
private PostOffice postOffice;
private int nodeId;
- private int maxDeliveryAttempts;
+ private int defaultMaxDeliveryAttempts;
private Queue defaultDLQ;
private Queue defaultExpiryQueue;
@@ -180,7 +180,7 @@
defaultDLQ = sp.getDefaultDLQInstance();
defaultExpiryQueue = sp.getDefaultExpiryQueueInstance();
tr = sp.getTxRepository();
- maxDeliveryAttempts = sp.getDefaultMaxDeliveryAttempts();
+ defaultMaxDeliveryAttempts = sp.getDefaultMaxDeliveryAttempts();
deliveries = new ConcurrentHashMap();
@@ -465,6 +465,9 @@
Queue expiryQueueToUse =
dest.getExpiryQueue() == null ? defaultExpiryQueue : dest.getExpiryQueue();
+ int maxDeliveryAttemptsToUse =
+ dest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : dest.getMaxDeliveryAttempts();
+
List dels = queue.recoverDeliveries(ids);
Iterator iter2 = dels.iterator();
@@ -485,7 +488,7 @@
deliveries.put(new Long(deliveryId),
new DeliveryRecord(del, -1, dlqToUse,
- expiryQueueToUse, dest.getRedeliveryDelay()));
+ expiryQueueToUse, dest.getRedeliveryDelay(), maxDeliveryAttemptsToUse));
}
}
@@ -885,11 +888,11 @@
rec.del.cancel();
}
- long addDelivery(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay)
+ long addDelivery(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts)
{
long deliveryId = deliveryIdSequence.increment();
- deliveries.put(new Long(deliveryId), new DeliveryRecord(del, consumerId, dlq, expiryQueue, redeliveryDelay));
+ deliveries.put(new Long(deliveryId), new DeliveryRecord(del, consumerId, dlq, expiryQueue, redeliveryDelay, maxDeliveryAttempts));
if (trace) { log.trace(this + " added delivery " + deliveryId + ": " + del); }
@@ -994,7 +997,7 @@
//might get changed after the client has sent the cancel - and we don't want to end up cancelling
//back to the original queue
boolean reachedMaxDeliveryAttempts =
- cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >= maxDeliveryAttempts;
+ cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >= rec.maxDeliveryAttempts;
Delivery del = rec.del;
@@ -1440,6 +1443,8 @@
Queue expiryQueueToUse = mDest.getExpiryQueue() == null ? defaultExpiryQueue : mDest.getExpiryQueue();
+ int maxDeliveryAttemptsToUse = mDest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : mDest.getMaxDeliveryAttempts();
+
long redeliveryDelay = mDest.getRedeliveryDelay();
if (redeliveryDelay == 0)
@@ -1450,7 +1455,7 @@
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
binding.getQueue().getName(), this, selectorString, noLocal,
- jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelay);
+ jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelay, maxDeliveryAttemptsToUse);
ConsumerAdvised advised;
@@ -1465,7 +1470,7 @@
ClientConsumerDelegate stub =
new ClientConsumerDelegate(consumerID,
- prefetchSize, maxDeliveryAttempts);
+ prefetchSize, maxDeliveryAttemptsToUse);
synchronized (consumers)
{
@@ -1574,7 +1579,9 @@
long redeliveryDelay;
- DeliveryRecord(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay)
+ int maxDeliveryAttempts;
+
+ DeliveryRecord(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts)
{
this.del = del;
@@ -1585,6 +1592,8 @@
this.expiryQueue = expiryQueue;
this.redeliveryDelay = redeliveryDelay;
+
+ this.maxDeliveryAttempts = maxDeliveryAttempts;
}
}
Added: trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java 2007-06-06 08:17:48 UTC (rev 2760)
@@ -0,0 +1,202 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A test for MaxDeliveryAttempts destination setting.
+ *
+ * @author <a href="sergey.koshcheyev at jboss.com">Sergey Koshcheyev</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class MaxDeliveryAttemptsTest extends MessagingTestCase
+{
+ // Constants -----------------------------------------------------
+ private final String DLQ_NAME = "DLQ";
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected InitialContext ic;
+ protected ConnectionFactory cf;
+ protected Queue dlq;
+ protected int defaultMaxDeliveryAttempts;
+
+ // Constructors --------------------------------------------------
+
+ public MaxDeliveryAttemptsTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+ public void testOverrideDefaultMaxDeliveryAttemptsForQueue() throws Exception
+ {
+ final String QUEUE_NAME = "Queue";
+
+ ServerManagement.deployQueue(QUEUE_NAME);
+
+ try
+ {
+ testMaxDeliveryAttempts(
+ "/queue/" + QUEUE_NAME,
+ new ObjectName("jboss.messaging.destination:service=Queue,name=" + QUEUE_NAME));
+ }
+ finally
+ {
+ ServerManagement.undeployQueue(QUEUE_NAME);
+ }
+ }
+
+ public void testOverrideDefaultMaxDeliveryAttemptsForTopic() throws Exception
+ {
+ final String TOPIC_NAME = "Topic";
+
+ ServerManagement.deployTopic(TOPIC_NAME);
+
+ try
+ {
+ testMaxDeliveryAttempts(
+ "/topic/" + TOPIC_NAME,
+ new ObjectName("jboss.messaging.destination:service=Topic,name=" + TOPIC_NAME));
+ }
+ finally
+ {
+ ServerManagement.undeployTopic(TOPIC_NAME);
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void testMaxDeliveryAttempts(String destJndiName, ObjectName destObjectName) throws Exception
+ {
+ int destMaxDeliveryAttempts = defaultMaxDeliveryAttempts + 5;
+
+ Destination destination = (Destination) ic.lookup(destJndiName);
+
+ ServerManagement.setAttribute(destObjectName, "MaxDeliveryAttempts",
+ Integer.toString(destMaxDeliveryAttempts));
+
+ Connection conn = cf.createConnection();
+
+ try
+ {
+ // Create the consumer before the producer so that the message we send doesn't
+ // get lost if the destination is a Topic.
+ Session consumingSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer destinationConsumer = consumingSession.createConsumer(destination);
+
+ {
+ Session producingSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = producingSession.createProducer(destination);
+ TextMessage tm = producingSession.createTextMessage("Message");
+ prod.send(tm);
+ }
+
+ conn.start();
+
+ // Make delivery attempts up to the maximum. The message should not end up in the DLQ.
+ for (int i = 0; i < destMaxDeliveryAttempts; i++)
+ {
+ TextMessage tm = (TextMessage)destinationConsumer.receive(1000);
+ assertNotNull("No message received on delivery attempt number " + (i + 1), tm);
+ assertEquals("Message", tm.getText());
+ consumingSession.recover();
+ }
+
+ // At this point the message should not yet be in the DLQ
+ MessageConsumer dlqConsumer = consumingSession.createConsumer(dlq);
+ Message m = dlqConsumer.receive(1000);
+ assertNull(m);
+
+ // Now we try to consume the message again from the destination, which causes it
+ // to go to the DLQ instead.
+ m = destinationConsumer.receive(1000);
+ assertNull(m);
+
+ // The message should be in the DLQ now
+ m = dlqConsumer.receive(1000);
+ assertNotNull(m);
+ assertTrue(m instanceof TextMessage);
+ assertEquals("Message", ((TextMessage) m).getText());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ServerManagement.start("all");
+ ServerManagement.deployQueue(DLQ_NAME);
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+ cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+ dlq = (Queue) ic.lookup("/queue/" + DLQ_NAME);
+
+ drainDestination(cf, dlq);
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+ defaultMaxDeliveryAttempts =
+ ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (ic != null) ic.close();
+
+ ServerManagement.undeployQueue(DLQ_NAME);
+ ServerManagement.stop();
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java
___________________________________________________________________
Name: svn:keywords
+ Id LastChangedDate Author Revision
Name: svn:eol-style
+ native
More information about the jboss-cvs-commits
mailing list