[jboss-cvs] JBoss Messaging SVN: r2760 - in trunk: src/main/org/jboss/jms/server/destination and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 6 04:17:48 EDT 2007


Author: sergeypk
Date: 2007-06-06 04:17:48 -0400 (Wed, 06 Jun 2007)
New Revision: 2760

Added:
   trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java
Modified:
   trunk/src/etc/xmdesc/Queue-xmbean.xml
   trunk/src/etc/xmdesc/Topic-xmbean.xml
   trunk/src/main/org/jboss/jms/server/destination/DestinationMBean.java
   trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
   trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-938 - Implement MaxDeliveryAttempts on destinations

Modified: trunk/src/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Queue-xmbean.xml	2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/etc/xmdesc/Queue-xmbean.xml	2007-06-06 08:17:48 UTC (rev 2760)
@@ -141,6 +141,12 @@
       <type>int</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="getMaxDeliveryAttempts" setMethod="setMaxDeliveryAttempts">
+      <description>The maximum delivery attempts to the queue</description>
+      <name>MaxDeliveryAttempts</name>
+      <type>int</type>
+   </attribute>
+   
    <attribute access="read-only" getMethod="getConsumerCount">
       <description>The number of consumers on the queue</description>
       <name>ConsumerCount</name>

Modified: trunk/src/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/Topic-xmbean.xml	2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/etc/xmdesc/Topic-xmbean.xml	2007-06-06 08:17:48 UTC (rev 2760)
@@ -113,7 +113,13 @@
       <description>The day limit for the message counters of this topic</description>
       <name>MessageCounterHistoryDayLimit</name>
       <type>int</type>
-   </attribute>   
+   </attribute>
+
+   <attribute access="read-write" getMethod="getMaxDeliveryAttempts" setMethod="setMaxDeliveryAttempts">
+      <description>The maximum delivery attempts to the topic</description>
+      <name>MaxDeliveryAttempts</name>
+      <type>int</type>
+   </attribute>
    
    <attribute access="read-only" getMethod="getMessageCounters">
       <description>The message counters for the topic</description>

Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationMBean.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationMBean.java	2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationMBean.java	2007-06-06 08:17:48 UTC (rev 2760)
@@ -90,6 +90,10 @@
    
    void setMessageCounterHistoryDayLimit(int limit) throws Exception;
    
+   int getMaxDeliveryAttempts();
+   
+   void setMaxDeliveryAttempts(int maxDeliveryAttempts);
+   
    // JMX operations
    
    void removeAllMessages() throws Exception;

Modified: trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2007-06-06 08:17:48 UTC (rev 2760)
@@ -405,6 +405,16 @@
       destination.setMessageCounterHistoryDayLimit(limit);
    }
    
+   public int getMaxDeliveryAttempts()
+   {
+      return destination.getMaxDeliveryAttempts();
+   }
+   
+   public void setMaxDeliveryAttempts(int maxDeliveryAttempts)
+   {
+      destination.setMaxDeliveryAttempts(maxDeliveryAttempts);
+   }
+   
    // JMX managed operations ----------------------------------------
    
    public abstract void removeAllMessages() throws Exception;

Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2007-06-06 08:17:48 UTC (rev 2760)
@@ -88,6 +88,8 @@
    protected int maxSize = -1;
    
    protected int messageCounterHistoryDayLimit = -1;
+   
+   protected int maxDeliveryAttempts = -1;
     
    public ManagedDestination()
    {      
@@ -285,6 +287,16 @@
    {
       this.messageCounterHistoryDayLimit = limit;
    }
+   
+   public int getMaxDeliveryAttempts()
+   {
+      return this.maxDeliveryAttempts;
+   }
+   
+   public void setMaxDeliveryAttempts(int maxDeliveryAttempts)
+   {
+      this.maxDeliveryAttempts = maxDeliveryAttempts;
+   }
      
    public abstract boolean isQueue();
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-06-06 08:17:48 UTC (rev 2760)
@@ -95,6 +95,8 @@
    private Queue expiryQueue;
 
    private long redeliveryDelay;
+   
+   private int maxDeliveryAttempts;
 
    private boolean started;
 
@@ -113,7 +115,7 @@
    ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
             ServerSessionEndpoint sessionEndpoint, String selector,
             boolean noLocal, JBossDestination dest, Queue dlq,
-            Queue expiryQueue, long redeliveryDelay)
+            Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts)
             throws InvalidSelectorException
    {
       if (trace)
@@ -141,6 +143,8 @@
       this.redeliveryDelay = redeliveryDelay;
 
       this.expiryQueue = expiryQueue;
+      
+      this.maxDeliveryAttempts = maxDeliveryAttempts;
 
       // Always start as false - wait for consumer to initiate.
       this.clientAccepting = false;
@@ -246,7 +250,7 @@
 
          if (storeDeliveries)
          {
-            deliveryId = sessionEndpoint.addDelivery(delivery, id, dlq, expiryQueue, redeliveryDelay);
+            deliveryId = sessionEndpoint.addDelivery(delivery, id, dlq, expiryQueue, redeliveryDelay, maxDeliveryAttempts);
          }
          else
          {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-06-06 07:00:52 UTC (rev 2759)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-06-06 08:17:48 UTC (rev 2760)
@@ -143,7 +143,7 @@
    private TransactionRepository tr;
    private PostOffice postOffice;
    private int nodeId;
-   private int maxDeliveryAttempts;
+   private int defaultMaxDeliveryAttempts;
    private Queue defaultDLQ;
    private Queue defaultExpiryQueue;
    
@@ -180,7 +180,7 @@
       defaultDLQ = sp.getDefaultDLQInstance();
       defaultExpiryQueue = sp.getDefaultExpiryQueueInstance();
       tr = sp.getTxRepository();
-      maxDeliveryAttempts = sp.getDefaultMaxDeliveryAttempts();
+      defaultMaxDeliveryAttempts = sp.getDefaultMaxDeliveryAttempts();
       
       deliveries = new ConcurrentHashMap();
       
@@ -465,6 +465,9 @@
             Queue expiryQueueToUse =
                dest.getExpiryQueue() == null ? defaultExpiryQueue : dest.getExpiryQueue();
             
+            int maxDeliveryAttemptsToUse =
+               dest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : dest.getMaxDeliveryAttempts();
+            
             List dels = queue.recoverDeliveries(ids);
             
             Iterator iter2 = dels.iterator();
@@ -485,7 +488,7 @@
                
                deliveries.put(new Long(deliveryId),
                               new DeliveryRecord(del, -1, dlqToUse,
-                                                 expiryQueueToUse, dest.getRedeliveryDelay()));
+                                                 expiryQueueToUse, dest.getRedeliveryDelay(), maxDeliveryAttemptsToUse));
             }
          }
          
@@ -885,11 +888,11 @@
       rec.del.cancel();
    }
    
-   long addDelivery(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay)
+   long addDelivery(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts)
    {
       long deliveryId = deliveryIdSequence.increment();
       
-      deliveries.put(new Long(deliveryId), new DeliveryRecord(del, consumerId, dlq, expiryQueue, redeliveryDelay));
+      deliveries.put(new Long(deliveryId), new DeliveryRecord(del, consumerId, dlq, expiryQueue, redeliveryDelay, maxDeliveryAttempts));
       
       if (trace) { log.trace(this + " added delivery " + deliveryId + ": " + del); }
       
@@ -994,7 +997,7 @@
       //might get changed after the client has sent the cancel - and we don't want to end up cancelling
       //back to the original queue
       boolean reachedMaxDeliveryAttempts =
-         cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >= maxDeliveryAttempts;
+         cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >= rec.maxDeliveryAttempts;
                     
       Delivery del = rec.del;   
          
@@ -1440,6 +1443,8 @@
       
       Queue expiryQueueToUse = mDest.getExpiryQueue() == null ? defaultExpiryQueue : mDest.getExpiryQueue();
       
+      int maxDeliveryAttemptsToUse = mDest.getMaxDeliveryAttempts() == -1 ? defaultMaxDeliveryAttempts : mDest.getMaxDeliveryAttempts();
+      
       long redeliveryDelay = mDest.getRedeliveryDelay();
       
       if (redeliveryDelay == 0)
@@ -1450,7 +1455,7 @@
       ServerConsumerEndpoint ep =
          new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
                   binding.getQueue().getName(), this, selectorString, noLocal,
-                  jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelay);
+                  jmsDestination, dlqToUse, expiryQueueToUse, redeliveryDelay, maxDeliveryAttemptsToUse);
       
       ConsumerAdvised advised;
       
@@ -1465,7 +1470,7 @@
       
       ClientConsumerDelegate stub =
          new ClientConsumerDelegate(consumerID,
-                                    prefetchSize, maxDeliveryAttempts);
+                                    prefetchSize, maxDeliveryAttemptsToUse);
       
       synchronized (consumers)
       {
@@ -1574,7 +1579,9 @@
       
       long redeliveryDelay;
       
-      DeliveryRecord(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay)
+      int maxDeliveryAttempts;
+      
+      DeliveryRecord(Delivery del, int consumerId, Queue dlq, Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts)
       {
          this.del = del;
          
@@ -1585,6 +1592,8 @@
          this.expiryQueue = expiryQueue;
          
          this.redeliveryDelay = redeliveryDelay;
+         
+         this.maxDeliveryAttempts = maxDeliveryAttempts;
       }            
    }
    

Added: trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java	2007-06-06 08:17:48 UTC (rev 2760)
@@ -0,0 +1,202 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2007, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * Tests that a destination's MaxDeliveryAttempts setting overrides the server-wide default.
+ *
+ * @author <a href="sergey.koshcheyev at jboss.com">Sergey Koshcheyev</a>
+ * @version <tt>$Revision$</tt>
+ * 
+ * $Id$
+ */
+public class MaxDeliveryAttemptsTest extends MessagingTestCase
+{
+   // Constants -----------------------------------------------------
+   private final String DLQ_NAME = "DLQ";
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected InitialContext ic;
+   protected ConnectionFactory cf;
+   protected Queue dlq;
+   protected int defaultMaxDeliveryAttempts;
+
+   // Constructors --------------------------------------------------
+   
+   public MaxDeliveryAttemptsTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   public void testOverrideDefaultMaxDeliveryAttemptsForQueue() throws Exception
+   {
+      final String QUEUE_NAME = "Queue";
+      
+      ServerManagement.deployQueue(QUEUE_NAME);
+
+      try
+      {
+         testMaxDeliveryAttempts(
+               "/queue/" + QUEUE_NAME,
+               new ObjectName("jboss.messaging.destination:service=Queue,name=" + QUEUE_NAME));
+      }
+      finally
+      {
+         ServerManagement.undeployQueue(QUEUE_NAME);
+      }
+   }
+
+   public void testOverrideDefaultMaxDeliveryAttemptsForTopic() throws Exception
+   {
+      final String TOPIC_NAME = "Topic";
+      
+      ServerManagement.deployTopic(TOPIC_NAME);
+
+      try
+      {
+         testMaxDeliveryAttempts(
+               "/topic/" + TOPIC_NAME,
+               new ObjectName("jboss.messaging.destination:service=Topic,name=" + TOPIC_NAME));
+      }
+      finally
+      {
+         ServerManagement.undeployTopic(TOPIC_NAME);
+      }
+   }
+      
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   
+   protected void testMaxDeliveryAttempts(String destJndiName, ObjectName destObjectName) throws Exception
+   {
+      int destMaxDeliveryAttempts = defaultMaxDeliveryAttempts + 5;
+
+      Destination destination = (Destination) ic.lookup(destJndiName);
+      
+      ServerManagement.setAttribute(destObjectName, "MaxDeliveryAttempts",
+            Integer.toString(destMaxDeliveryAttempts));
+
+      Connection conn = cf.createConnection();
+      
+      try
+      {
+         // Create the consumer before the producer so that the message we send doesn't
+         // get lost if the destination is a Topic.
+         Session consumingSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);         
+         MessageConsumer destinationConsumer = consumingSession.createConsumer(destination);
+         
+         {
+            Session producingSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer prod = producingSession.createProducer(destination);
+            TextMessage tm = producingSession.createTextMessage("Message");
+            prod.send(tm);
+         }
+
+         conn.start();
+
+         // Make delivery attempts up to the maximum. The message should not end up in the DLQ.
+         for (int i = 0; i < destMaxDeliveryAttempts; i++)
+         {
+            TextMessage tm = (TextMessage)destinationConsumer.receive(1000);
+            assertNotNull("No message received on delivery attempt number " + (i + 1), tm);
+            assertEquals("Message", tm.getText());
+            consumingSession.recover();
+         }
+
+         // At this point the message should not yet be in the DLQ
+         MessageConsumer dlqConsumer = consumingSession.createConsumer(dlq);
+         Message m = dlqConsumer.receive(1000);
+         assertNull(m);
+         
+         // Now we try to consume the message again from the destination, which causes it
+         // to go to the DLQ instead.
+         m = destinationConsumer.receive(1000);
+         assertNull(m);
+         
+         // The message should be in the DLQ now
+         m = dlqConsumer.receive(1000);
+         assertNotNull(m);
+         assertTrue(m instanceof TextMessage);
+         assertEquals("Message", ((TextMessage) m).getText());
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      ServerManagement.start("all");
+      ServerManagement.deployQueue(DLQ_NAME);
+
+      ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+      cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+      dlq = (Queue) ic.lookup("/queue/" + DLQ_NAME);         
+
+      drainDestination(cf, dlq);
+
+      ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+      defaultMaxDeliveryAttempts =
+         ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();      
+   }
+
+   protected void tearDown() throws Exception
+   {
+      if (ic != null) ic.close();
+
+      ServerManagement.undeployQueue(DLQ_NAME);
+      ServerManagement.stop();
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}


Property changes on: trunk/tests/src/org/jboss/test/messaging/jms/MaxDeliveryAttemptsTest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision
Name: svn:eol-style
   + native




More information about the jboss-cvs-commits mailing list