[jboss-cvs] JBoss Messaging SVN: r7560 - in trunk: src/main/org/jboss/messaging/core/management/impl and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jul 10 11:07:49 EDT 2009


Author: jmesnil
Date: 2009-07-10 11:07:49 -0400 (Fri, 10 Jul 2009)
New Revision: 7560

Modified:
   trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/messagecounter/impl/MessageCounterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
* removed the Timers from MessageCounterManagerImpl & ResourceManagerImpl
* replaced them with Runnables executed by the server's scheduled thread pool


Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java	2009-07-10 15:07:49 UTC (rev 7560)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.management;
 
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.management.ObjectName;
 
@@ -88,6 +89,7 @@
                                          RemotingService remotingService,
                                          MessagingServer messagingServer,
                                          QueueFactory queueFactory,
+                                         ScheduledExecutorService scheduledThreadPool,
                                          boolean backup) throws Exception;
 
    void unregisterServer() throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-07-10 15:07:49 UTC (rev 7560)
@@ -30,6 +30,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.management.MBeanServer;
 import javax.management.NotificationBroadcasterSupport;
@@ -118,7 +119,7 @@
 
    private MessagingServerControlImpl messagingServerControl;
 
-   private final MessageCounterManager messageCounterManager;
+   private MessageCounterManager messageCounterManager;
 
    private final SimpleString managementNotificationAddress;
 
@@ -169,9 +170,6 @@
       registry = new HashMap<String, Object>();
       broadcaster = new NotificationBroadcasterSupport();
       notificationsEnabled = true;
-      messageCounterManager = new MessageCounterManagerImpl();
-      messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
-      messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
 
       replicationInvoker = new ReplicationOperationInvokerImpl(managementClusterUser,
                                                                managementClusterPassword,
@@ -198,6 +196,7 @@
                                                     final RemotingService remotingService,
                                                     final MessagingServer messagingServer,
                                                     final QueueFactory queueFactory,
+                                                    final ScheduledExecutorService scheduledThreadPool,
                                                     final boolean backup) throws Exception
    {
       this.postOffice = postOffice;
@@ -206,6 +205,10 @@
       this.storageManager = storageManager;
       this.messagingServer = messagingServer;
 
+      this.messageCounterManager = new MessageCounterManagerImpl(scheduledThreadPool);
+      messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
+      messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
+
       messagingServerControl = new MessagingServerControlImpl(postOffice,
                                                               configuration,
                                                               resourceManager,

Modified: trunk/src/main/org/jboss/messaging/core/messagecounter/impl/MessageCounterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/messagecounter/impl/MessageCounterManagerImpl.java	2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/messagecounter/impl/MessageCounterManagerImpl.java	2009-07-10 15:07:49 UTC (rev 7560)
@@ -26,8 +26,9 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
@@ -39,6 +40,8 @@
  * A MessageCounterManager
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
  * @version <tt>$Revision: 3307 $</tt>
  *
  * $Id: MessageCounterManager.java 3307 2007-11-09 20:43:00Z timfox $
@@ -59,17 +62,19 @@
    
    private boolean started;
    
-   private Timer timer;
-   
    private long period = DEFAULT_SAMPLE_PERIOD;
    
-   private PingMessageCountersTask task;
+   private MessageCountersPinger messageCountersPinger;
 
    private int maxDayCount = DEFAULT_MAX_DAY_COUNT;
+
+   private final ScheduledExecutorService scheduledThreadPool;
           
-   public MessageCounterManagerImpl()
+   public MessageCounterManagerImpl(final ScheduledExecutorService scheduledThreadPool)
    {
       messageCounters = new HashMap<String, MessageCounter>();
+      
+      this.scheduledThreadPool = scheduledThreadPool;
    }
 
    public synchronized void start()
@@ -79,12 +84,10 @@
          return;
       }
       
-      // Needs to be daemon
-      timer = new Timer(true);
+      messageCountersPinger = new MessageCountersPinger();
       
-      task = new PingMessageCountersTask();
-            
-      timer.schedule(task, 0, period);      
+      Future<?> future = scheduledThreadPool.scheduleAtFixedRate(messageCountersPinger, 0, period, TimeUnit.MILLISECONDS);
+      messageCountersPinger.setFuture(future);
       
       started = true;      
    }
@@ -96,14 +99,8 @@
          return;
       }
       
-      //Wait for timer task to stop
+      messageCountersPinger.stop();
       
-      task.stop();
-      
-      timer.cancel();
-      
-      timer = null;
-      
       started = false;
    }
    
@@ -206,10 +203,19 @@
       }
    }
    
-   class PingMessageCountersTask extends TimerTask
+   class MessageCountersPinger implements Runnable
    {
+      private boolean closed = false;
+      
+      private Future<?> future;
+
       public synchronized void run()
       {
+         if (closed)
+         {
+            return;
+         }
+         
          synchronized (messageCounters)
          {
             Iterator<MessageCounter> iter = messageCounters.values().iterator();
@@ -223,9 +229,19 @@
          }
       }  
                         
+      public void setFuture(Future<?> future)
+      {
+         this.future = future;
+      }
+
       synchronized void stop()
       {
-         cancel();
+         if (future != null)
+         {
+            future.cancel(false);
+         }
+         
+         closed = true;
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-07-10 15:07:49 UTC (rev 7560)
@@ -910,7 +910,8 @@
       pagingManager = createPagingManager();
 
       resourceManager = new ResourceManagerImpl((int)(configuration.getTransactionTimeout() / 1000),
-                                                configuration.getTransactionTimeoutScanPeriod());
+                                                configuration.getTransactionTimeoutScanPeriod(),
+                                                scheduledPool);
       postOffice = new PostOfficeImpl(this,
                                       storageManager,
                                       pagingManager,
@@ -934,6 +935,7 @@
                                                                 remotingService,
                                                                 this,
                                                                 queueFactory,
+                                                                scheduledPool,
                                                                 configuration.isBackup());
 
       // Address settings need to deployed initially, since they're require on paging manager.start()

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java	2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java	2009-07-10 15:07:49 UTC (rev 7560)
@@ -28,10 +28,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.Xid;
 
@@ -58,17 +59,20 @@
 
    private boolean started = false;
 
-   private Timer timer;
+   private TxTimeoutHandler task;
 
-   private TimerTask task;
-
    private final long txTimeoutScanPeriod;
 
-   public ResourceManagerImpl(final int defaultTimeoutSeconds, final long txTimeoutScanPeriod)
+   private final ScheduledExecutorService scheduledThreadPool;
+
+   public ResourceManagerImpl(final int defaultTimeoutSeconds, 
+                              final long txTimeoutScanPeriod, 
+                              final ScheduledExecutorService scheduledThreadPool)
    {
       this.defaultTimeoutSeconds = defaultTimeoutSeconds;
       this.timeoutSeconds = defaultTimeoutSeconds;
       this.txTimeoutScanPeriod = txTimeoutScanPeriod;
+      this.scheduledThreadPool = scheduledThreadPool;
    }
 
    // MessagingComponent implementation
@@ -79,9 +83,11 @@
       {
          return;
       }
-      timer = new Timer(true);
+      
       task = new TxTimeoutHandler();
-      timer.schedule(task, txTimeoutScanPeriod, txTimeoutScanPeriod);
+      Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, txTimeoutScanPeriod, txTimeoutScanPeriod, TimeUnit.MILLISECONDS);
+      task.setFuture(future);
+      
       started = true;
    }
 
@@ -91,15 +97,9 @@
       {
          return;
       }
-      if (timer != null)
+      if (task != null)
       {
-         task.cancel();
-
-         task = null;
-
-         timer.cancel();
-
-         timer = null;
+         task.close();
       }
 
       started = false;
@@ -173,10 +173,19 @@
       return xidsWithCreationTime;
    }
 
-   class TxTimeoutHandler extends TimerTask
+   class TxTimeoutHandler implements Runnable
    {
+      private boolean closed = false;
+      
+      private Future<?> future;
+
       public void run()
       {
+         if (closed)
+         {
+            return;
+         }
+         
          Set<Transaction> timedoutTransactions = new HashSet<Transaction>();
 
          long now = System.currentTimeMillis();
@@ -204,5 +213,20 @@
          }
       }
 
+      synchronized void setFuture(Future<?> future)
+      {
+         this.future = future;
+      }
+      
+      void close()
+      {
+         if (future != null)
+         {
+            future.cancel(false);
+         }
+         
+         closed = true;
+      }
+
    }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-07-10 15:07:49 UTC (rev 7560)
@@ -27,6 +27,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -89,6 +90,8 @@
 
          configuration.setJournalType(JournalType.ASYNCIO);
 
+         ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
+         
          journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
 
          journal.start();
@@ -97,7 +100,7 @@
          HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
          journal.loadMessageJournal(new FakePagingManager(),
-                                    new ResourceManagerImpl(0, 0),
+                                    new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     mapDups);
 
@@ -117,7 +120,7 @@
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
 
          journal.loadMessageJournal(new FakePagingManager(),
-                                    new ResourceManagerImpl(0, 0),
+                                    new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     mapDups);
 
@@ -144,7 +147,7 @@
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
 
          journal.loadMessageJournal(new FakePagingManager(),
-                                    new ResourceManagerImpl(0, 0),
+                                    new ResourceManagerImpl(0, 0, scheduledThreadPool),
                                     new HashMap<Long, Queue>(),
                                     mapDups);
 




More information about the jboss-cvs-commits mailing list