Author: manik.surtani(a)jboss.com
Date: 2008-08-01 09:19:09 -0400 (Fri, 01 Aug 2008)
New Revision: 6481
Modified:
core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
Log:
ReplicationQueue to use a ScheduledExecutor
Modified: core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java 2008-08-01 13:12:46 UTC (rev 6480)
+++ core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java 2008-08-01 13:19:09 UTC (rev 6481)
@@ -14,8 +14,11 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Periodically (or when certain size is exceeded) takes elements and replicates them.
@@ -29,11 +32,6 @@
private static final Log log = LogFactory.getLog(ReplicationQueue.class);
/**
- * We flush every 5 seconds. Inactive if -1 or 0
- */
- private long interval = 5000;
-
- /**
* Max elements before we flush
*/
private long max_elements = 500;
@@ -46,18 +44,13 @@
/**
* For periodical replication
*/
- private Timer timer = null;
-
- /**
- * The timer task, only calls flush() when executed by Timer
- */
- private ReplicationQueue.MyTask task = null;
+ private ScheduledExecutorService scheduledExecutor = null;
private RPCManager rpcManager;
private Configuration configuration;
private boolean enabled;
private CommandsFactory commandsFactory;
+ private static final AtomicInteger counter = new AtomicInteger(0);
-
public boolean isEnabled()
{
return enabled;
@@ -80,7 +73,7 @@
@Start
public synchronized void start()
{
- this.interval = configuration.getReplQueueInterval();
+ long interval = configuration.getReplQueueInterval();
this.max_elements = configuration.getReplQueueMaxElements();
// check again
enabled = configuration.isUseReplQueue() &&
(configuration.getBuddyReplicationConfig() == null ||
!configuration.getBuddyReplicationConfig().isEnabled());
@@ -88,14 +81,22 @@
{
if (interval > 0)
{
- if (task == null)
- task = new ReplicationQueue.MyTask();
- if (timer == null)
+ if (scheduledExecutor == null)
{
- timer = new Timer(true);
- timer.schedule(task,
- 500, // delay before initial flush
- interval); // interval between flushes
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "ReplicationQueue-periodicProcessor-" + counter.getAndIncrement());
+ }
+ });
+ scheduledExecutor.scheduleWithFixedDelay(new Runnable()
+ {
+ public void run()
+ {
+ flush();
+ }
+ }, 500l, interval, TimeUnit.MILLISECONDS);
}
}
}
@@ -107,16 +108,11 @@
@Stop
public synchronized void stop()
{
- if (task != null)
+ if (scheduledExecutor != null)
{
- task.cancel();
- task = null;
+ scheduledExecutor.shutdownNow();
}
- if (timer != null)
- {
- timer.cancel();
- timer = null;
- }
+ scheduledExecutor = null;
}
@@ -164,13 +160,4 @@
}
}
}
-
- class MyTask extends TimerTask
- {
- @Override
- public void run()
- {
- flush();
- }
- }
}
\ No newline at end of file
Show replies by date