[jboss-cvs] JBoss Messaging SVN: r7560 - in trunk: src/main/org/jboss/messaging/core/management/impl and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jul 10 11:07:49 EDT 2009
Author: jmesnil
Date: 2009-07-10 11:07:49 -0400 (Fri, 10 Jul 2009)
New Revision: 7560
Modified:
trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/messagecounter/impl/MessageCounterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
Log:
* removed Timers in MessageCounterManagerImpl & ResourceManagerImpl
* replaced them by Runnable executed by the server's scheduled thread pool
Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2009-07-10 15:07:49 UTC (rev 7560)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.management;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import javax.management.ObjectName;
@@ -88,6 +89,7 @@
RemotingService remotingService,
MessagingServer messagingServer,
QueueFactory queueFactory,
+ ScheduledExecutorService scheduledThreadPool,
boolean backup) throws Exception;
void unregisterServer() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-07-10 15:07:49 UTC (rev 7560)
@@ -30,6 +30,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import javax.management.MBeanServer;
import javax.management.NotificationBroadcasterSupport;
@@ -118,7 +119,7 @@
private MessagingServerControlImpl messagingServerControl;
- private final MessageCounterManager messageCounterManager;
+ private MessageCounterManager messageCounterManager;
private final SimpleString managementNotificationAddress;
@@ -169,9 +170,6 @@
registry = new HashMap<String, Object>();
broadcaster = new NotificationBroadcasterSupport();
notificationsEnabled = true;
- messageCounterManager = new MessageCounterManagerImpl();
- messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
- messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
replicationInvoker = new ReplicationOperationInvokerImpl(managementClusterUser,
managementClusterPassword,
@@ -198,6 +196,7 @@
final RemotingService remotingService,
final MessagingServer messagingServer,
final QueueFactory queueFactory,
+ final ScheduledExecutorService scheduledThreadPool,
final boolean backup) throws Exception
{
this.postOffice = postOffice;
@@ -206,6 +205,10 @@
this.storageManager = storageManager;
this.messagingServer = messagingServer;
+ this.messageCounterManager = new MessageCounterManagerImpl(scheduledThreadPool);
+ messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
+ messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
+
messagingServerControl = new MessagingServerControlImpl(postOffice,
configuration,
resourceManager,
Modified: trunk/src/main/org/jboss/messaging/core/messagecounter/impl/MessageCounterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/messagecounter/impl/MessageCounterManagerImpl.java 2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/messagecounter/impl/MessageCounterManagerImpl.java 2009-07-10 15:07:49 UTC (rev 7560)
@@ -26,8 +26,9 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
@@ -39,6 +40,8 @@
* A MessageCounterManager
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
* @version <tt>$Revision: 3307 $</tt>
*
* $Id: MessageCounterManager.java 3307 2007-11-09 20:43:00Z timfox $
@@ -59,17 +62,19 @@
private boolean started;
- private Timer timer;
-
private long period = DEFAULT_SAMPLE_PERIOD;
- private PingMessageCountersTask task;
+ private MessageCountersPinger messageCountersPinger;
private int maxDayCount = DEFAULT_MAX_DAY_COUNT;
+
+ private final ScheduledExecutorService scheduledThreadPool;
- public MessageCounterManagerImpl()
+ public MessageCounterManagerImpl(final ScheduledExecutorService scheduledThreadPool)
{
messageCounters = new HashMap<String, MessageCounter>();
+
+ this.scheduledThreadPool = scheduledThreadPool;
}
public synchronized void start()
@@ -79,12 +84,10 @@
return;
}
- // Needs to be daemon
- timer = new Timer(true);
+ messageCountersPinger = new MessageCountersPinger();
- task = new PingMessageCountersTask();
-
- timer.schedule(task, 0, period);
+ Future<?> future = scheduledThreadPool.scheduleAtFixedRate(messageCountersPinger, 0, period, TimeUnit.MILLISECONDS);
+ messageCountersPinger.setFuture(future);
started = true;
}
@@ -96,14 +99,8 @@
return;
}
- //Wait for timer task to stop
+ messageCountersPinger.stop();
- task.stop();
-
- timer.cancel();
-
- timer = null;
-
started = false;
}
@@ -206,10 +203,19 @@
}
}
- class PingMessageCountersTask extends TimerTask
+ class MessageCountersPinger implements Runnable
{
+ private boolean closed = false;
+
+ private Future<?> future;
+
public synchronized void run()
{
+ if (closed)
+ {
+ return;
+ }
+
synchronized (messageCounters)
{
Iterator<MessageCounter> iter = messageCounters.values().iterator();
@@ -223,9 +229,19 @@
}
}
+ public void setFuture(Future<?> future)
+ {
+ this.future = future;
+ }
+
synchronized void stop()
{
- cancel();
+ if (future != null)
+ {
+ future.cancel(false);
+ }
+
+ closed = true;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-10 15:07:49 UTC (rev 7560)
@@ -910,7 +910,8 @@
pagingManager = createPagingManager();
resourceManager = new ResourceManagerImpl((int)(configuration.getTransactionTimeout() / 1000),
- configuration.getTransactionTimeoutScanPeriod());
+ configuration.getTransactionTimeoutScanPeriod(),
+ scheduledPool);
postOffice = new PostOfficeImpl(this,
storageManager,
pagingManager,
@@ -934,6 +935,7 @@
remotingService,
this,
queueFactory,
+ scheduledPool,
configuration.isBackup());
// Address settings need to deployed initially, since they're require on paging manager.start()
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java 2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java 2009-07-10 15:07:49 UTC (rev 7560)
@@ -28,10 +28,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
@@ -58,17 +59,20 @@
private boolean started = false;
- private Timer timer;
+ private TxTimeoutHandler task;
- private TimerTask task;
-
private final long txTimeoutScanPeriod;
- public ResourceManagerImpl(final int defaultTimeoutSeconds, final long txTimeoutScanPeriod)
+ private final ScheduledExecutorService scheduledThreadPool;
+
+ public ResourceManagerImpl(final int defaultTimeoutSeconds,
+ final long txTimeoutScanPeriod,
+ final ScheduledExecutorService scheduledThreadPool)
{
this.defaultTimeoutSeconds = defaultTimeoutSeconds;
this.timeoutSeconds = defaultTimeoutSeconds;
this.txTimeoutScanPeriod = txTimeoutScanPeriod;
+ this.scheduledThreadPool = scheduledThreadPool;
}
// MessagingComponent implementation
@@ -79,9 +83,11 @@
{
return;
}
- timer = new Timer(true);
+
task = new TxTimeoutHandler();
- timer.schedule(task, txTimeoutScanPeriod, txTimeoutScanPeriod);
+ Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, txTimeoutScanPeriod, txTimeoutScanPeriod, TimeUnit.MILLISECONDS);
+ task.setFuture(future);
+
started = true;
}
@@ -91,15 +97,9 @@
{
return;
}
- if (timer != null)
+ if (task != null)
{
- task.cancel();
-
- task = null;
-
- timer.cancel();
-
- timer = null;
+ task.close();
}
started = false;
@@ -173,10 +173,19 @@
return xidsWithCreationTime;
}
- class TxTimeoutHandler extends TimerTask
+ class TxTimeoutHandler implements Runnable
{
+ private boolean closed = false;
+
+ private Future<?> future;
+
public void run()
{
+ if (closed)
+ {
+ return;
+ }
+
Set<Transaction> timedoutTransactions = new HashSet<Transaction>();
long now = System.currentTimeMillis();
@@ -204,5 +213,20 @@
}
}
+ synchronized void setFuture(Future<?> future)
+ {
+ this.future = future;
+ }
+
+ void close()
+ {
+ if (future != null)
+ {
+ future.cancel(false);
+ }
+
+ closed = true;
+ }
+
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-07-10 12:41:48 UTC (rev 7559)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-07-10 15:07:49 UTC (rev 7560)
@@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
@@ -89,6 +90,8 @@
configuration.setJournalType(JournalType.ASYNCIO);
+ ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
+
journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
journal.start();
@@ -97,7 +100,7 @@
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
journal.loadMessageJournal(new FakePagingManager(),
- new ResourceManagerImpl(0, 0),
+ new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
@@ -117,7 +120,7 @@
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
journal.loadMessageJournal(new FakePagingManager(),
- new ResourceManagerImpl(0, 0),
+ new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
@@ -144,7 +147,7 @@
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
journal.loadMessageJournal(new FakePagingManager(),
- new ResourceManagerImpl(0, 0),
+ new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
More information about the jboss-cvs-commits
mailing list