[jboss-cvs] JBoss Messaging SVN: r7720 - trunk/src/main/org/jboss/messaging/core/postoffice/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 12 19:29:05 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-08-12 19:29:05 -0400 (Wed, 12 Aug 2009)
New Revision: 7720

Modified:
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
Log:
Fixing leakage on ReaperThread

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-08-12 17:01:19 UTC (rev 7719)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-08-12 23:29:05 UTC (rev 7720)
@@ -100,13 +100,13 @@
 
    private final ManagementService managementService;
 
-   private Thread expiryReaper;
+   private final Reaper reaperRunnable = new Reaper();
 
+   private volatile Thread reaperThread;
+
    private final long reaperPeriod;
 
    private final int reaperPriority;
-   
-   private Reaper reaper;
 
    private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<SimpleString, DuplicateIDCache>();
 
@@ -195,23 +195,29 @@
       // Injecting the postoffice (itself) on queueFactory for paging-control
       queueFactory.setPostOffice(this);
 
+      // The 'started' flag needs to be set before starting the Reaper thread.
+      // This is to avoid thread leaks where the Reaper would run beyond the life cycle of the PostOffice
+      started = true;
+
       if (!backup)
       {
          startExpiryScanner();
       }
+   }
 
-      started = true;
-   }
-     
    public synchronized void stop() throws Exception
    {
+      started = false;
+
       managementService.removeNotificationListener(this);
 
-      if (reaper != null)
+      reaperRunnable.stop();
+
+      if (reaperThread != null)
       {
-         reaper.stop();
-         
-         expiryReaper.join();
+         reaperThread.join();
+
+         reaperThread = null;
       }
 
       addressManager.clear();
@@ -220,7 +226,6 @@
 
       transientIDs.clear();
 
-      started = false;
    }
 
    public boolean isStarted()
@@ -359,7 +364,8 @@
 
                      if (redistributionDelay != -1)
                      {
-                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor(),
+                        queue.addRedistributor(redistributionDelay,
+                                               redistributorExecutorFactory.getExecutor(),
                                                server.getReplicatingChannel());
                      }
                   }
@@ -430,7 +436,8 @@
 
                      if (redistributionDelay != -1)
                      {
-                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor(),
+                        queue.addRedistributor(redistributionDelay,
+                                               redistributorExecutorFactory.getExecutor(),
                                                server.getReplicatingChannel());
                      }
                   }
@@ -459,8 +466,8 @@
 
       boolean existed = addressManager.addBinding(binding);
 
-      //TODO - why is this code here?
-      //Shouldn't it be in MessagingServerImpl::createQueue??
+      // TODO - why is this code here?
+      // Shouldn't it be in MessagingServerImpl::createQueue??
       if (binding.getType() == BindingType.LOCAL_QUEUE)
       {
          Queue queue = (Queue)binding.getBindable();
@@ -507,7 +514,7 @@
    public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
    {
       Binding binding = addressManager.removeBinding(uniqueName);
-      
+
       if (binding == null)
       {
          throw new MessagingException(MessagingException.QUEUE_DOES_NOT_EXIST);
@@ -565,7 +572,7 @@
    {
       return addressManager.getBinding(name);
    }
-   
+
    public Bindings getMatchingBindings(final SimpleString address)
    {
       return addressManager.getMatchingBindings(address);
@@ -576,7 +583,7 @@
       SimpleString address = message.getDestination();
 
       byte[] duplicateIDBytes = null;
-      
+
       Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
 
       DuplicateIDCache cache = null;
@@ -593,7 +600,7 @@
          {
             duplicateIDBytes = (byte[])duplicateID;
          }
-         
+
          if (cache.contains(duplicateIDBytes))
          {
             if (tx == null)
@@ -623,7 +630,7 @@
 
             startedTx = true;
          }
-         
+
          cache.addToCache(duplicateIDBytes, tx);
       }
 
@@ -687,6 +694,7 @@
 
    public List<Queue> activate()
    {
+
       backup = false;
 
       pagingManager.activate();
@@ -733,7 +741,7 @@
 
       return cache;
    }
-   
+
    public Object getNotificationLock()
    {
       return notificationLock;
@@ -819,18 +827,17 @@
 
    private synchronized void startExpiryScanner()
    {
+
       if (reaperPeriod > 0)
       {
-         reaper = new Reaper();
-         
-         expiryReaper = new Thread(reaper, "JBM-expiry-reaper");
-         
-         expiryReaper.setPriority(reaperPriority);
-         
-         expiryReaper.start();
+         reaperThread = new Thread(reaperRunnable, "JBM-expiry-reaper");
+
+         reaperThread.setPriority(reaperPriority);
+
+         reaperThread.start();
       }
    }
-   
+
    private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
    {
       if (queue.getFilter() == null || queue.getFilter().match(message))
@@ -901,26 +908,35 @@
          return oper;
       }
    }
-   
+
    private class Reaper implements Runnable
    {
-      private boolean closed;
-      
+      private volatile boolean closed = false;
+
       public synchronized void stop()
       {
          closed = true;
-         
+
          notify();
       }
-      
+
       public synchronized void run()
       {
-         while (true)
+         if (closed)
          {
+            // This shouldn't happen in a regular scenario
+            log.warn("Reaper thread being restarted");
+            closed = false;
+         }
+         
+         // The reaper thread should finish once the PostOffice has been stopped.
+         // This is to avoid leaking reaper threads between PostOffice stops and starts
+         while (PostOfficeImpl.this.isStarted())
+         {
             long toWait = reaperPeriod;
-   
+
             long start = System.currentTimeMillis();
-   
+
             while (!closed && toWait > 0)
             {
                try
@@ -930,33 +946,33 @@
                catch (InterruptedException e)
                {
                }
-   
+
                long now = System.currentTimeMillis();
-   
+
                toWait -= now - start;
-   
+
                start = now;
             }
-            
+
             if (closed)
             {
                return;
             }
-   
+
             Map<SimpleString, Binding> nameMap = addressManager.getBindings();
-   
+
             List<Queue> queues = new ArrayList<Queue>();
-   
+
             for (Binding binding : nameMap.values())
             {
                if (binding.getType() == BindingType.LOCAL_QUEUE)
                {
                   Queue queue = (Queue)binding.getBindable();
-   
+
                   queues.add(queue);
                }
             }
-   
+
             for (Queue queue : queues)
             {
                try
@@ -981,8 +997,6 @@
          messagesToPage.add(message);
       }
 
-      
-
       /* (non-Javadoc)
        * @see org.jboss.messaging.core.transaction.TransactionOperation#getDistinctQueues()
        */




More information about the jboss-cvs-commits mailing list