Author: jmesnil
Date: 2009-12-21 08:46:07 -0500 (Mon, 21 Dec 2009)
New Revision: 8701
Added:
trunk/tests/src/org/hornetq/tests/integration/client/AddressOOME.java
Modified:
trunk/src/main/org/hornetq/core/paging/PagingManager.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
HORNETQ-255: memory leaks when creating/deleting queue
* when the last binding for an address is removed, remove the pagingStore corresponding
to the address and remove the consumer credit holder from the server session
* use a Map for the temporary queues' failureRunners and remove the runner when the
queue is deleted by the user (from ServerSessionImpl.handleDeleteQueue)
Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java 2009-12-21 13:41:03 UTC (rev
8700)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java 2009-12-21 13:46:07 UTC (rev
8701)
@@ -82,4 +82,6 @@
void reloadStores() throws Exception;
SimpleString[] getStoreNames();
+
+ void deletePageStore(SimpleString storeName) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2009-12-21 13:41:03
UTC (rev 8700)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2009-12-21 13:46:07
UTC (rev 8701)
@@ -113,6 +113,15 @@
return store;
}
+
+ public void deletePageStore(final SimpleString storeName) throws Exception
+ {
+ PagingStore store = stores.remove(storeName);
+ if (store != null)
+ {
+ store.stop();
+ }
+ }
/** stores is a ConcurrentHashMap, so we don't need to synchronize this method */
public PagingStore getPageStore(final SimpleString storeName) throws Exception
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-12-21
13:41:03 UTC (rev 8700)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-12-21
13:46:07 UTC (rev 8701)
@@ -481,23 +481,20 @@
throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST);
}
+ if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
+ {
+ pagingManager.deletePageStore(binding.getAddress());
+
+ managementService.unregisterAddress(binding.getAddress());
+ }
+
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
managementService.unregisterQueue(uniqueName, binding.getAddress());
-
- if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
- {
- managementService.unregisterAddress(binding.getAddress());
- }
}
else if (binding.getType() == BindingType.DIVERT)
{
managementService.unregisterDivert(uniqueName);
-
- if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
- {
- managementService.unregisterAddress(binding.getAddress());
- }
}
TypedProperties props = new TypedProperties();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-21 13:41:03
UTC (rev 8700)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-21 13:46:07
UTC (rev 8701)
@@ -155,7 +155,7 @@
private volatile boolean started = false;
- private final List<Runnable> failureRunners = new ArrayList<Runnable>();
+ private final Map<SimpleString, Runnable> failureRunners = new
HashMap<SimpleString, Runnable>();
private final String name;
@@ -471,7 +471,7 @@
// session is closed.
// It is up to the user to delete the queue when finished with it
- failureRunners.add(new Runnable()
+ failureRunners.put(name, new Runnable()
{
public void run()
{
@@ -480,6 +480,11 @@
if (postOffice.getBinding(name) != null)
{
postOffice.removeBinding(name);
+
+ if (postOffice.getBindingsForAddress(name).getBindings().size()
== 0)
+ {
+ creditManagerHolders.remove(name);
+ }
}
}
catch (Exception e)
@@ -532,6 +537,13 @@
server.destroyQueue(name, this);
+ failureRunners.remove(name);
+
+ if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
+ {
+ creditManagerHolders.remove(name);
+ }
+
response = new NullResponseMessage();
}
catch (Exception e)
@@ -1618,7 +1630,7 @@
{
ServerSessionImpl.log.warn("Client connection failed, clearing up resources
for session " + name);
- for (Runnable runner : failureRunners)
+ for (Runnable runner : failureRunners.values())
{
try
{
@@ -1644,7 +1656,7 @@
{
try
{
- for (Runnable runner : failureRunners)
+ for (Runnable runner : failureRunners.values())
{
try
{
Added: trunk/tests/src/org/hornetq/tests/integration/client/AddressOOME.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/AddressOOME.java
(rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/AddressOOME.java 2009-12-21
13:46:07 UTC (rev 8701)
@@ -0,0 +1,124 @@
+package org.hornetq.tests.integration.client;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.client.HornetQConnection;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQMessage;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+
+public class AddressOOME implements MessageListener
+{
+
+ public static HornetQSession coreSession;
+
+ public static void main(String[] args) throws Exception
+ {
+
+ final String queueName = "request-reply-queue";
+
+ ConfigurationImpl conf = new ConfigurationImpl();
+
+ final TransportConfiguration serverConfiguration = new
TransportConfiguration(InVMAcceptorFactory.class.getName());
+ final TransportConfiguration clientConfiguration = new
TransportConfiguration(InVMConnectorFactory.class.getName());
+ conf.getAcceptorConfigurations().add(serverConfiguration);
+ conf.setSecurityEnabled(false);
+
+ HornetQServer server = HornetQ.newHornetQServer(conf, false);
+ JMSServerManager serverManager = new JMSServerManagerImpl(server);
+ serverManager.setContext(null);
+ serverManager.start();
+ serverManager.createQueue(queueName, null, null, false);
+
+ MessageProducer controlProducer = init(queueName, clientConfiguration, new
AddressOOME());
+
+ try
+ {
+ int count = 0;
+ while (true)
+ {
+ HornetQMessage message =
(HornetQMessage)coreSession.createTextMessage("myMessageWithNumber[" + (count++)
+
+
"]");
+ TemporaryQueue replyQueue = coreSession.createTemporaryQueue();
+ message.setJMSReplyTo(replyQueue);
+ controlProducer.send(message);
+ MessageConsumer temporaryConsumer = coreSession.createConsumer(replyQueue);
+ TextMessage returnMessage = (TextMessage)temporaryConsumer.receive(3000);
+ if (returnMessage != null)
+ {
+ System.out.println("received answer: [" +
returnMessage.getText() + "]");
+ }
+ else
+ {
+ System.out.println("timeout on receiveing answer");
+ }
+ temporaryConsumer.close();
+ replyQueue.delete();
+ }
+ }
+ catch (JMSException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ return;
+ }
+
+ }
+
+ private static MessageProducer init(final String queueName,
+ final TransportConfiguration
transportConfiguration,
+ MessageListener listener) throws JMSException
+ {
+
+ MessageProducer controlProducer;
+ HornetQConnectionFactory cf = new
HornetQConnectionFactory(transportConfiguration);
+
+ HornetQConnection connection = (HornetQConnection)cf.createQueueConnection();
+ connection.start();
+ coreSession = (HornetQSession)connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = coreSession.createQueue(queueName);
+ controlProducer = coreSession.createProducer(queue);
+ coreSession.start();
+ MessageConsumer consumer = coreSession.createConsumer(queue);
+ consumer.setMessageListener(listener);
+
+ return controlProducer;
+ }
+
+ public void onMessage(Message msg)
+ {
+ TextMessage message = (TextMessage)msg;
+ try
+ {
+ String textMessage = (String)message.getText();
+ Destination replyDestination = msg.getJMSReplyTo();
+ MessageProducer replyProducer = coreSession.createProducer(replyDestination);
+ HornetQMessage replyMessage =
(HornetQMessage)coreSession.createTextMessage(textMessage + "_ANSWERED");
+ replyProducer.send(replyMessage);
+ replyProducer.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2009-12-21
13:41:03 UTC (rev 8700)
+++
trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2009-12-21
13:46:07 UTC (rev 8701)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.client;
+import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -93,7 +94,37 @@
session.close();
}
+
+ public void testPaginStoreIsRemovedWhenQueueIsDeleted() throws Exception
+ {
+ SimpleString queue = RandomUtil.randomSimpleString();
+ SimpleString address = RandomUtil.randomSimpleString();
+ session.createTemporaryQueue(address, queue);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage msg = session.createMessage(false);
+
+ producer.send(msg);
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage message = consumer.receive(500);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+
+ SimpleString[] storeNames =
server.getPostOffice().getPagingManager().getStoreNames();
+ assertTrue(Arrays.asList(storeNames).contains(address));
+
+ consumer.close();
+ session.deleteQueue(queue);
+
+ storeNames = server.getPostOffice().getPagingManager().getStoreNames();
+ assertFalse(Arrays.asList(storeNames).contains(address));
+
+ session.close();
+ }
+
public void testConsumeFromTemporaryQueueCreatedByOtherSession() throws Exception
{
SimpleString queue = RandomUtil.randomSimpleString();
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-12-21
13:41:03 UTC (rev 8700)
+++
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-12-21
13:46:07 UTC (rev 8701)
@@ -811,6 +811,10 @@
{
return null;
}
+
+ public void deletePageStore(SimpleString storeName) throws Exception
+ {
+ }
public PageTransactionInfo getTransaction(final long transactionID)
{
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-12-21
13:41:03 UTC (rev 8700)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-12-21
13:46:07 UTC (rev 8701)
@@ -236,6 +236,10 @@
return null;
}
+ public void deletePageStore(SimpleString storeName) throws Exception
+ {
+ }
+
public PageTransactionInfo getTransaction(final long transactionID)
{
return null;