[hornetq-commits] JBoss hornetq SVN: r8912 - trunk/src/main/org/hornetq/ra/inflow.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Mar 4 06:44:07 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-03-04 06:44:07 -0500 (Thu, 04 Mar 2010)
New Revision: 8912

Modified:
   trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
Deleting temporary queues on non-durable topic subscriptions when deactivating the MDB

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-03-04 11:20:12 UTC (rev 8911)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-03-04 11:44:07 UTC (rev 8912)
@@ -56,6 +56,8 @@
     */
    private final ClientSession session;
 
+   private ClientConsumer consumer;
+
    /**
     * The endpoint
     */
@@ -85,7 +87,6 @@
       String selector = spec.getMessageSelector();
 
       // Create the message consumer
-      ClientConsumer consumer;
       SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
       if (activation.isTopic() && spec.isSubscriptionDurable())
       {
@@ -193,12 +194,32 @@
          if (endpoint != null)
          {
             endpoint.release();
+            endpoint = null;
          }
       }
       catch (Throwable t)
       {
          HornetQMessageHandler.log.debug("Error releasing endpoint " + endpoint, t);
       }
+      
+      try
+      {
+         consumer.close();
+         if (activation.getTopicTemporaryQueue() != null)
+         {
+            // We need to delete temporary topics when the activation is stopped or messages will build up on the server
+            SimpleString tmpQueue = activation.getTopicTemporaryQueue();
+            QueueQuery subResponse = session.queueQuery(tmpQueue);
+            if (subResponse.getConsumerCount() == 0)
+            {
+               session.deleteQueue(tmpQueue);
+            }
+         }
+      }
+      catch (Throwable t)
+      {
+         HornetQMessageHandler.log.debug("Error closing core-queue consumer", t);
+      }
 
       try
       {



More information about the hornetq-commits mailing list