[jboss-cvs] JBoss Messaging SVN: r7720 - trunk/src/main/org/jboss/messaging/core/postoffice/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 12 19:29:05 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-08-12 19:29:05 -0400 (Wed, 12 Aug 2009)
New Revision: 7720
Modified:
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
Log:
Fixing leakage on ReaperThread
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-08-12 17:01:19 UTC (rev 7719)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-08-12 23:29:05 UTC (rev 7720)
@@ -100,13 +100,13 @@
private final ManagementService managementService;
- private Thread expiryReaper;
+ private final Reaper reaperRunnable = new Reaper();
+ private volatile Thread reaperThread;
+
private final long reaperPeriod;
private final int reaperPriority;
-
- private Reaper reaper;
private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
@@ -195,23 +195,29 @@
// Injecting the postoffice (itself) on queueFactory for paging-control
queueFactory.setPostOffice(this);
+ // The flag started needs to be set before starting the Reaper Thread
+ // This is to avoid thread leakages where the Reaper would run beyong the life cycle of the PostOffice
+ started = true;
+
if (!backup)
{
startExpiryScanner();
}
+ }
- started = true;
- }
-
public synchronized void stop() throws Exception
{
+ started = false;
+
managementService.removeNotificationListener(this);
- if (reaper != null)
+ reaperRunnable.stop();
+
+ if (reaperThread != null)
{
- reaper.stop();
-
- expiryReaper.join();
+ reaperThread.join();
+
+ reaperThread = null;
}
addressManager.clear();
@@ -220,7 +226,6 @@
transientIDs.clear();
- started = false;
}
public boolean isStarted()
@@ -359,7 +364,8 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor(),
+ queue.addRedistributor(redistributionDelay,
+ redistributorExecutorFactory.getExecutor(),
server.getReplicatingChannel());
}
}
@@ -430,7 +436,8 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor(),
+ queue.addRedistributor(redistributionDelay,
+ redistributorExecutorFactory.getExecutor(),
server.getReplicatingChannel());
}
}
@@ -459,8 +466,8 @@
boolean existed = addressManager.addBinding(binding);
- //TODO - why is this code here?
- //Shouldn't it be in MessagingServerImpl::createQueue??
+ // TODO - why is this code here?
+ // Shouldn't it be in MessagingServerImpl::createQueue??
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
Queue queue = (Queue)binding.getBindable();
@@ -507,7 +514,7 @@
public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
{
Binding binding = addressManager.removeBinding(uniqueName);
-
+
if (binding == null)
{
throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
@@ -565,7 +572,7 @@
{
return addressManager.getBinding(name);
}
-
+
public Bindings getMatchingBindings(final SimpleString address)
{
return addressManager.getMatchingBindings(address);
@@ -576,7 +583,7 @@
SimpleString address = message.getDestination();
byte[] duplicateIDBytes = null;
-
+
Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
@@ -593,7 +600,7 @@
{
duplicateIDBytes = (byte[])duplicateID;
}
-
+
if (cache.contains(duplicateIDBytes))
{
if (tx == null)
@@ -623,7 +630,7 @@
startedTx = true;
}
-
+
cache.addToCache(duplicateIDBytes, tx);
}
@@ -687,6 +694,7 @@
public List<Queue> activate()
{
+
backup = false;
pagingManager.activate();
@@ -733,7 +741,7 @@
return cache;
}
-
+
public Object getNotificationLock()
{
return notificationLock;
@@ -819,18 +827,17 @@
private synchronized void startExpiryScanner()
{
+
if (reaperPeriod > 0)
{
- reaper = new Reaper();
-
- expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
-
- expiryReaper.setPriority(reaperPriority);
-
- expiryReaper.start();
+ reaperThread = new Thread(reaperRunnable, "JBM-expiry-reaper");
+
+ reaperThread.setPriority(reaperPriority);
+
+ reaperThread.start();
}
}
-
+
private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
{
if (queue.getFilter() == null || queue.getFilter().match(message))
@@ -901,26 +908,35 @@
return oper;
}
}
-
+
private class Reaper implements Runnable
{
- private boolean closed;
-
+ private volatile boolean closed = false;
+
public synchronized void stop()
{
closed = true;
-
+
notify();
}
-
+
public synchronized void run()
{
- while (true)
+ if (closed)
{
+ // This shouldn't happen in a regular scenario
+ log.warn("Reaper thread being restarted");
+ closed = false;
+ }
+
+ // The reaper thread should be finished case the PostOffice is gone
+ // This is to avoid leaks on PostOffice between stops and starts
+ while (PostOfficeImpl.this.isStarted())
+ {
long toWait = reaperPeriod;
-
+
long start = System.currentTimeMillis();
-
+
while (!closed && toWait > 0)
{
try
@@ -930,33 +946,33 @@
catch (InterruptedException e)
{
}
-
+
long now = System.currentTimeMillis();
-
+
toWait -= now - start;
-
+
start = now;
}
-
+
if (closed)
{
return;
}
-
+
Map<SimpleString, Binding> nameMap = addressManager.getBindings();
-
+
List<Queue> queues = new ArrayList<Queue>();
-
+
for (Binding binding : nameMap.values())
{
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
Queue queue = (Queue)binding.getBindable();
-
+
queues.add(queue);
}
}
-
+
for (Queue queue : queues)
{
try
@@ -981,8 +997,6 @@
messagesToPage.add(message);
}
-
-
/* (non-Javadoc)
* @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
*/
More information about the jboss-cvs-commits
mailing list