Author: manik.surtani(a)jboss.com
Date: 2008-10-06 07:04:49 -0400 (Mon, 06 Oct 2008)
New Revision: 6840
Modified:
core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/config/RuntimeConfig.java
core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
Log:
JBCACHE-1422: Allow thread pools and executors to be injected via the RuntimeConfig
Modified: core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2008-10-06 10:45:52
UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java 2008-10-06 11:04:49
UTC (rev 6840)
@@ -138,7 +138,8 @@
setDefaultInactive(configuration.isInactiveOnStartup());
- if (isUsingEvictions())
evictionTimerTask.init(evictionConfig.getWakeupInterval());
+ if (isUsingEvictions())
+ evictionTimerTask.init(evictionConfig.getWakeupInterval(),
configuration.getRuntimeConfig().getEvictionTimerThreadFactory());
}
@Stop
@@ -732,7 +733,7 @@
*/
public void startEvictionThread()
{
- evictionTimerTask.init(evictionConfig.getWakeupInterval());
+ evictionTimerTask.init(evictionConfig.getWakeupInterval(),
configuration.getRuntimeConfig().getEvictionTimerThreadFactory());
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/config/RuntimeConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/RuntimeConfig.java 2008-10-06 10:45:52
UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/config/RuntimeConfig.java 2008-10-06 11:04:49
UTC (rev 6840)
@@ -26,8 +26,10 @@
import org.jboss.cache.util.Util;
import org.jgroups.Channel;
import org.jgroups.ChannelFactory;
+import org.jgroups.util.ThreadFactory;
import javax.transaction.TransactionManager;
+import java.util.concurrent.ExecutorService;
public class RuntimeConfig extends ConfigurationComponent
{
@@ -41,6 +43,9 @@
private transient ChannelFactory muxChannelFactory;
private transient BuddyGroup buddyGroup;
private RPCManager rpcManager;
+ private transient ThreadFactory evictionTimerThreadFactory;
+ private transient ExecutorService asyncSerializationExecutor;
+ private transient ExecutorService asyncCacheListenerExecutor;
/**
* Resets the runtime to default values.
@@ -141,6 +146,81 @@
this.transactionManager = transactionManager;
}
+ /**
+ * This is only relevant if an eviction timer thread factory has been set using {@link
#setEvictionTimerThreadFactory(org.jgroups.util.ThreadFactory)}.
+ * Returns null if the eviction timer thread factory needs to be created
internally.
+ * <p/>
+ *
+ * @return the thread factory used by the eviction timer's scheduled executor.
+ * @since 3.0
+ */
+ public ThreadFactory getEvictionTimerThreadFactory()
+ {
+ return evictionTimerThreadFactory;
+ }
+
+ /**
+ * Sets the eviction timer thread factory to use when creating a scheduled executor.
If this is not set, the eviction
+ * timer task will use a default thread factory.
+ *
+ * @param evictionTimerThreadFactory factory to use
+ * @since 3.0
+ */
+ public void setEvictionTimerThreadFactory(ThreadFactory evictionTimerThreadFactory)
+ {
+ this.evictionTimerThreadFactory = evictionTimerThreadFactory;
+ }
+
+ /**
+ * This is only relevant if the async cache replication executor has been set using
{@link #setAsyncSerializationExecutor(java.util.concurrent.ExecutorService)}.
+ * If the executor is created internally, this method will return null.
+ * <p/>
+ *
+ * @return the executor used for async replication.
+ * @since 3.0
+ */
+ public ExecutorService getAsyncSerializationExecutor()
+ {
+ return asyncSerializationExecutor;
+ }
+
+ /**
+ * This is used to set the executor to use for async cache replication, and
effectively overrides {@link
org.jboss.cache.config.Configuration#setSerializationExecutorPoolSize(int)}
+ * <p/>
+ *
+ * @param asyncSerializationExecutor executor to set
+ * @since 3.0
+ */
+ public void setAsyncSerializationExecutor(ExecutorService asyncSerializationExecutor)
+ {
+ this.asyncSerializationExecutor = asyncSerializationExecutor;
+ }
+
+ /**
+ * This is only relevant if the async cache listener executor has been set using
{@link #setAsyncCacheListenerExecutor(java.util.concurrent.ExecutorService)}.
+ * If the executor is created internally, this method will return null.
+ * <p/>
+ *
+ * @return the executor to use for async cache listeners
+ * @since 3.0
+ */
+ public ExecutorService getAsyncCacheListenerExecutor()
+ {
+ return asyncCacheListenerExecutor;
+ }
+
+ /**
+ * This is used to set the executor to use for async cache listeners, and effectively
overrides {@link org.jboss.cache.config.Configuration#setListenerAsyncPoolSize(int)}
+ * <p/>
+ *
+ * @param asyncCacheListenerExecutor the executor to use for async cache listeners
+ * @since 3.0
+ */
+ public void setAsyncCacheListenerExecutor(ExecutorService asyncCacheListenerExecutor)
+ {
+ this.asyncCacheListenerExecutor = asyncCacheListenerExecutor;
+ }
+
@Override
public boolean equals(Object obj)
{
@@ -155,7 +235,10 @@
return Util.safeEquals(transactionManager, other.transactionManager)
&& Util.safeEquals(muxChannelFactory, other.muxChannelFactory)
&& Util.safeEquals(rpcManager, other.rpcManager)
- && Util.safeEquals(channel, other.channel);
+ && Util.safeEquals(channel, other.channel)
+ && Util.safeEquals(evictionTimerThreadFactory,
other.evictionTimerThreadFactory)
+ && Util.safeEquals(asyncCacheListenerExecutor,
other.asyncCacheListenerExecutor)
+ && Util.safeEquals(asyncSerializationExecutor,
other.asyncSerializationExecutor);
}
return false;
@@ -169,6 +252,9 @@
result = result * 29 + (muxChannelFactory == null ? 0 :
muxChannelFactory.hashCode());
result = result * 29 + (rpcManager == null ? 0 : rpcManager.hashCode());
result = result * 29 + (channel == null ? 0 : channel.hashCode());
+ result = result * 29 + (evictionTimerThreadFactory == null ? 0 :
evictionTimerThreadFactory.hashCode());
+ result = result * 29 + (asyncCacheListenerExecutor == null ? 0 :
asyncCacheListenerExecutor.hashCode());
+ result = result * 29 + (asyncSerializationExecutor == null ? 0 :
asyncSerializationExecutor.hashCode());
return result;
}
Modified: core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java 2008-10-06
10:45:52 UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java 2008-10-06
11:04:49 UTC (rev 6840)
@@ -58,12 +58,12 @@
task = new Task();
}
- public void init(long wakeupInterval)
+ public void init(long wakeupInterval, ThreadFactory evictionThreadFactory)
{
if (log.isTraceEnabled())
log.trace("Creating a new eviction listener with wakeupInterval millis set
at " + wakeupInterval);
this.wakeupInterval = wakeupInterval;
- start();
+ start(evictionThreadFactory);
}
/**
@@ -107,7 +107,7 @@
scheduledExecutor = null;
}
- private void start()
+ private void start(ThreadFactory tf)
{
if (wakeupInterval < 1)
{
@@ -115,7 +115,8 @@
log.info("Wakeup Interval set to " + wakeupInterval + ". Not
starting an eviction thread!");
return;
}
- scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
+
+ if (tf == null) tf = new ThreadFactory()
{
public Thread newThread(Runnable r)
{
@@ -123,7 +124,9 @@
t.setDaemon(true);
return t;
}
- });
+ };
+
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor(tf);
scheduledExecutor.scheduleWithFixedDelay(task, wakeupInterval, wakeupInterval,
TimeUnit.MILLISECONDS);
}
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-10-06
10:45:52 UTC (rev 6839)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-10-06
11:04:49 UTC (rev 6840)
@@ -86,7 +86,9 @@
// what sort of a repl processor do we need?
Configuration c = componentRegistry.getComponent(Configuration.class);
- if (c.getCacheMode().isSynchronous() || c.getSerializationExecutorPoolSize() <
1)
+ replicationProcessor = c.getRuntimeConfig().getAsyncSerializationExecutor();
+ if (c.getCacheMode().isSynchronous() ||
+ (replicationProcessor == null && c.getSerializationExecutorPoolSize()
< 1)) // if an executor has not been injected and no positive pool size is set
{
// in-process thread. Not async.
replicationProcessor = new WithinThreadExecutor();
@@ -95,16 +97,19 @@
else
{
asyncSerial = true;
- replicationProcessorCount = new AtomicInteger(0);
- replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 :
c.getSerializationExecutorPoolSize(),
- new ThreadFactory()
- {
- public Thread newThread(Runnable r)
+ if (replicationProcessor == null)
+ {
+ replicationProcessorCount = new AtomicInteger(0);
+ replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 :
c.getSerializationExecutorPoolSize(),
+ new ThreadFactory()
{
- return new Thread(r, "AsyncReplicationProcessor-" +
replicationProcessorCount.incrementAndGet());
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "AsyncReplicationProcessor-" +
replicationProcessorCount.incrementAndGet());
+ }
}
- }
- );
+ );
+ }
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2008-10-06
10:45:52 UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2008-10-06
11:04:49 UTC (rev 6840)
@@ -162,21 +162,27 @@
{
useMarshalledValueMaps = config.isUseLazyDeserialization();
syncProcessor = new WithinThreadExecutor();
- if (config.getListenerAsyncPoolSize() > 0)
+
+ // first try and use an injected executor for async listeners
+ if ((asyncProcessor = config.getRuntimeConfig().getAsyncCacheListenerExecutor()) ==
null)
{
- asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(),
new ThreadFactory()
+ // create one if needed
+ if (config.getListenerAsyncPoolSize() > 0)
{
- public Thread newThread(Runnable r)
+ asyncProcessor =
Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
{
- return new Thread(r, "AsyncNotifier-" +
asyncNotifierThreadNumber.getAndIncrement());
- }
- });
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "AsyncNotifier-" +
asyncNotifierThreadNumber.getAndIncrement());
+ }
+ });
+ }
+ else
+ {
+ // use the same sync executor
+ asyncProcessor = syncProcessor;
+ }
}
- else
- {
- // use the same sync executor
- asyncProcessor = syncProcessor;
- }
}
/**