[hornetq-commits] JBoss hornetq SVN: r8701 - in trunk: src/main/org/hornetq/core/paging/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Dec 21 08:46:08 EST 2009


Author: jmesnil
Date: 2009-12-21 08:46:07 -0500 (Mon, 21 Dec 2009)
New Revision: 8701

Added:
   trunk/tests/src/org/hornetq/tests/integration/client/AddressOOME.java
Modified:
   trunk/src/main/org/hornetq/core/paging/PagingManager.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
HORNETQ-255: memory leaks when creating/deleting queue

* when the last binding to a queue is removed, remove the pagingStore corresponding
  to the address and removed the consumer credit holder from the server session
* used a Map for the temporary queues' failureRunners and removes the runner when the queue
  is deleted by the user (from ServerSessionImpl.handleDeleteQueue)

Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java	2009-12-21 13:41:03 UTC (rev 8700)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java	2009-12-21 13:46:07 UTC (rev 8701)
@@ -82,4 +82,6 @@
    void reloadStores() throws Exception;
 
    SimpleString[] getStoreNames();
+
+   void deletePageStore(SimpleString storeName) throws Exception;
 }

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2009-12-21 13:41:03 UTC (rev 8700)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2009-12-21 13:46:07 UTC (rev 8701)
@@ -113,6 +113,15 @@
 
       return store;
    }
+   
+   public void deletePageStore(final SimpleString storeName) throws Exception
+   {
+      PagingStore store = stores.remove(storeName);
+      if (store != null)
+      {
+         store.stop();
+      }
+   }
 
    /** stores is a ConcurrentHashMap, so we don't need to synchronize this method */
    public PagingStore getPageStore(final SimpleString storeName) throws Exception

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-12-21 13:41:03 UTC (rev 8700)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-12-21 13:46:07 UTC (rev 8701)
@@ -481,23 +481,20 @@
          throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST);
       }
 
+      if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
+      {
+         pagingManager.deletePageStore(binding.getAddress());
+         
+         managementService.unregisterAddress(binding.getAddress());
+      }
+      
       if (binding.getType() == BindingType.LOCAL_QUEUE)
       {
          managementService.unregisterQueue(uniqueName, binding.getAddress());
-
-         if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
-         {
-            managementService.unregisterAddress(binding.getAddress());
-         }
       }
       else if (binding.getType() == BindingType.DIVERT)
       {
          managementService.unregisterDivert(uniqueName);
-
-         if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
-         {
-            managementService.unregisterAddress(binding.getAddress());
-         }
       }
 
       TypedProperties props = new TypedProperties();

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-12-21 13:41:03 UTC (rev 8700)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-12-21 13:46:07 UTC (rev 8701)
@@ -155,7 +155,7 @@
 
    private volatile boolean started = false;
 
-   private final List<Runnable> failureRunners = new ArrayList<Runnable>();
+   private final Map<SimpleString, Runnable> failureRunners = new HashMap<SimpleString, Runnable>();
 
    private final String name;
 
@@ -471,7 +471,7 @@
             // session is closed.
             // It is up to the user to delete the queue when finished with it
 
-            failureRunners.add(new Runnable()
+            failureRunners.put(name, new Runnable()
             {
                public void run()
                {
@@ -480,6 +480,11 @@
                      if (postOffice.getBinding(name) != null)
                      {
                         postOffice.removeBinding(name);
+                        
+                        if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
+                        {
+                           creditManagerHolders.remove(name);
+                        }
                      }
                   }
                   catch (Exception e)
@@ -532,6 +537,13 @@
 
          server.destroyQueue(name, this);
 
+         failureRunners.remove(name);
+         
+         if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
+         {
+            creditManagerHolders.remove(name);
+         }
+
          response = new NullResponseMessage();
       }
       catch (Exception e)
@@ -1618,7 +1630,7 @@
       {
          ServerSessionImpl.log.warn("Client connection failed, clearing up resources for session " + name);
 
-         for (Runnable runner : failureRunners)
+         for (Runnable runner : failureRunners.values())
          {
             try
             {
@@ -1644,7 +1656,7 @@
    {
       try
       {
-         for (Runnable runner : failureRunners)
+         for (Runnable runner : failureRunners.values())
          {
             try
             {

Added: trunk/tests/src/org/hornetq/tests/integration/client/AddressOOME.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/AddressOOME.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/AddressOOME.java	2009-12-21 13:46:07 UTC (rev 8701)
@@ -0,0 +1,124 @@
+package org.hornetq.tests.integration.client;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQConnection;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+
+public class AddressOOME implements MessageListener
+{
+
+   public static HornetQSession coreSession;
+
+   public static void main(String[] args) throws Exception
+   {
+
+      final String queueName = "request-reply-queue";
+
+      ConfigurationImpl conf = new ConfigurationImpl();
+
+      final TransportConfiguration serverConfiguration = new TransportConfiguration(InVMAcceptorFactory.class.getName());
+      final TransportConfiguration clientConfiguration = new TransportConfiguration(InVMConnectorFactory.class.getName());
+      conf.getAcceptorConfigurations().add(serverConfiguration);
+      conf.setSecurityEnabled(false);
+
+      HornetQServer server = HornetQ.newHornetQServer(conf, false);
+      JMSServerManager serverManager = new JMSServerManagerImpl(server);
+      serverManager.setContext(null);
+      serverManager.start();
+      serverManager.createQueue(queueName, null, null, false);
+
+      MessageProducer controlProducer = init(queueName, clientConfiguration, new AddressOOME());
+
+      try
+      {
+         int count = 0;
+         while (true)
+         {
+            HornetQMessage message = (HornetQMessage)coreSession.createTextMessage("myMessageWithNumber[" + (count++) +
+                                                                                     "]");
+            TemporaryQueue replyQueue = coreSession.createTemporaryQueue();
+            message.setJMSReplyTo(replyQueue);
+            controlProducer.send(message);
+            MessageConsumer temporaryConsumer = coreSession.createConsumer(replyQueue);
+            TextMessage returnMessage = (TextMessage)temporaryConsumer.receive(3000);
+            if (returnMessage != null)
+            {
+               System.out.println("received answer: [" + returnMessage.getText() + "]");
+            }
+            else
+            {
+               System.out.println("timeout on receiveing answer");
+            }
+            temporaryConsumer.close();
+            replyQueue.delete();
+         }
+      }
+      catch (JMSException e)
+      {
+         // TODO Auto-generated catch block
+         e.printStackTrace();
+         return;
+      }
+
+   }
+
+   private static MessageProducer init(final String queueName,
+                                       final TransportConfiguration transportConfiguration,
+                                       MessageListener listener) throws JMSException
+   {
+
+      MessageProducer controlProducer;
+      HornetQConnectionFactory cf = new HornetQConnectionFactory(transportConfiguration);
+
+      HornetQConnection connection = (HornetQConnection)cf.createQueueConnection();
+      connection.start();
+      coreSession = (HornetQSession)connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = coreSession.createQueue(queueName);
+      controlProducer = coreSession.createProducer(queue);
+      coreSession.start();
+      MessageConsumer consumer = coreSession.createConsumer(queue);
+      consumer.setMessageListener(listener);
+
+      return controlProducer;
+   }
+
+   public void onMessage(Message msg)
+   {
+      TextMessage message = (TextMessage)msg;
+      try
+      {
+         String textMessage = (String)message.getText();
+         Destination replyDestination = msg.getJMSReplyTo();
+         MessageProducer replyProducer = coreSession.createProducer(replyDestination);
+         HornetQMessage replyMessage = (HornetQMessage)coreSession.createTextMessage(textMessage + "_ANSWERED");
+         replyProducer.send(replyMessage);
+         replyProducer.close();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
+}
\ No newline at end of file

Modified: trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2009-12-21 13:41:03 UTC (rev 8700)
+++ trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2009-12-21 13:46:07 UTC (rev 8701)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.integration.client;
 
+import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -93,7 +94,37 @@
 
       session.close();
    }
+   
+   public void testPaginStoreIsRemovedWhenQueueIsDeleted() throws Exception
+   {
+      SimpleString queue = RandomUtil.randomSimpleString();
+      SimpleString address = RandomUtil.randomSimpleString();
 
+      session.createTemporaryQueue(address, queue);
+
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage msg = session.createMessage(false);
+
+      producer.send(msg);
+
+      session.start();
+      ClientConsumer consumer = session.createConsumer(queue);
+      ClientMessage message = consumer.receive(500);
+      Assert.assertNotNull(message);
+      message.acknowledge();
+
+      SimpleString[] storeNames = server.getPostOffice().getPagingManager().getStoreNames();
+      assertTrue(Arrays.asList(storeNames).contains(address));      
+
+      consumer.close();
+      session.deleteQueue(queue);
+
+      storeNames = server.getPostOffice().getPagingManager().getStoreNames();
+      assertFalse(Arrays.asList(storeNames).contains(address));
+
+      session.close();
+   }
+   
    public void testConsumeFromTemporaryQueueCreatedByOtherSession() throws Exception
    {
       SimpleString queue = RandomUtil.randomSimpleString();

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-12-21 13:41:03 UTC (rev 8700)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-12-21 13:46:07 UTC (rev 8701)
@@ -811,6 +811,10 @@
       {
          return null;
       }
+      
+      public void deletePageStore(SimpleString storeName) throws Exception
+      {
+      }
 
       public PageTransactionInfo getTransaction(final long transactionID)
       {

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-12-21 13:41:03 UTC (rev 8700)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-12-21 13:46:07 UTC (rev 8701)
@@ -236,6 +236,10 @@
          return null;
       }
 
+      public void deletePageStore(SimpleString storeName) throws Exception
+      {
+      }
+
       public PageTransactionInfo getTransaction(final long transactionID)
       {
          return null;



More information about the hornetq-commits mailing list