[jbosscache-commits] JBoss Cache SVN: r6840 - in core/trunk/src/main/java/org/jboss/cache: config and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Oct 6 07:04:50 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-10-06 07:04:49 -0400 (Mon, 06 Oct 2008)
New Revision: 6840

Modified:
   core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/config/RuntimeConfig.java
   core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
Log:
JBCACHE-1422:  Allow thread pools and executors to be injected via the RuntimeConfig

Modified: core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java	2008-10-06 10:45:52 UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManagerImpl.java	2008-10-06 11:04:49 UTC (rev 6840)
@@ -138,7 +138,8 @@
 
       setDefaultInactive(configuration.isInactiveOnStartup());
 
-      if (isUsingEvictions()) evictionTimerTask.init(evictionConfig.getWakeupInterval());
+      if (isUsingEvictions())
+         evictionTimerTask.init(evictionConfig.getWakeupInterval(), configuration.getRuntimeConfig().getEvictionTimerThreadFactory());
    }
 
    @Stop
@@ -732,7 +733,7 @@
     */
    public void startEvictionThread()
    {
-      evictionTimerTask.init(evictionConfig.getWakeupInterval());
+      evictionTimerTask.init(evictionConfig.getWakeupInterval(), configuration.getRuntimeConfig().getEvictionTimerThreadFactory());
    }
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/config/RuntimeConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/RuntimeConfig.java	2008-10-06 10:45:52 UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/config/RuntimeConfig.java	2008-10-06 11:04:49 UTC (rev 6840)
@@ -26,8 +26,10 @@
 import org.jboss.cache.util.Util;
 import org.jgroups.Channel;
 import org.jgroups.ChannelFactory;
+import org.jgroups.util.ThreadFactory;
 
 import javax.transaction.TransactionManager;
+import java.util.concurrent.ExecutorService;
 
 public class RuntimeConfig extends ConfigurationComponent
 {
@@ -41,6 +43,9 @@
    private transient ChannelFactory muxChannelFactory;
    private transient BuddyGroup buddyGroup;
    private RPCManager rpcManager;
+   private transient ThreadFactory evictionTimerThreadFactory;
+   private transient ExecutorService asyncSerializationExecutor;
+   private transient ExecutorService asyncCacheListenerExecutor;
 
    /**
     * Resets the runtime to default values.
@@ -141,6 +146,81 @@
       this.transactionManager = transactionManager;
    }
 
+   /**
+    * This is only relevant if an eviction timer thread factory has been set using {@link #setEvictionTimerThreadFactory(org.jgroups.util.ThreadFactory)}.
+    * Returns null if no factory has been injected, in which case the eviction timer creates its own thread factory internally.
+    * <p/>
+    *
+    * @return the thread factory used by the eviction timer's scheduled executor.
+    * @since 3.0
+    */
+   public ThreadFactory getEvictionTimerThreadFactory()
+   {
+      return evictionTimerThreadFactory;
+   }
+
+   /**
+    * Sets the eviction timer thread factory to use when creating a scheduled executor.  If this is not set, the eviction
+    * timer task will use a default thread factory.
+    *
+    * @param evictionTimerThreadFactory factory to use
+    * @since 3.0
+    */
+   public void setEvictionTimerThreadFactory(ThreadFactory evictionTimerThreadFactory)
+   {
+      this.evictionTimerThreadFactory = evictionTimerThreadFactory;
+   }
+
+   /**
+    * This is only relevant if the async cache replication executor has been set using {@link #setAsyncSerializationExecutor(java.util.concurrent.ExecutorService)}.
+    * If the executor is created internally, this method will return null.
+    * <p/>
+    *
+    * @return the executor used for async replication.
+    * @since 3.0
+    */
+   public ExecutorService getAsyncSerializationExecutor()
+   {
+      return asyncSerializationExecutor;
+   }
+
+   /**
+    * This is used to set the executor to use for async cache replication, and effectively overrides {@link org.jboss.cache.config.Configuration#setSerializationExecutorPoolSize(int)}.
+    * <p/>
+    *
+    * @param asyncSerializationExecutor executor to set
+    * @since 3.0
+    */
+   public void setAsyncSerializationExecutor(ExecutorService asyncSerializationExecutor)
+   {
+      this.asyncSerializationExecutor = asyncSerializationExecutor;
+   }
+
+   /**
+    * This is only relevant if the async cache listener executor has been set using {@link #setAsyncCacheListenerExecutor(java.util.concurrent.ExecutorService)}.
+    * If the executor is created internally, this method will return null.
+    * <p/>
+    *
+    * @return the executor to use for async cache listeners
+    * @since 3.0
+    */
+   public ExecutorService getAsyncCacheListenerExecutor()
+   {
+      return asyncCacheListenerExecutor;
+   }
+
+   /**
+    * This is used to set the executor to use for async cache listeners, and effectively overrides {@link org.jboss.cache.config.Configuration#setListenerAsyncPoolSize(int)}
+    * <p/>
+    *
+    * @param asyncCacheListenerExecutor the executor to use for async cache listeners
+    * @since 3.0
+    */
+   public void setAsyncCacheListenerExecutor(ExecutorService asyncCacheListenerExecutor)
+   {
+      this.asyncCacheListenerExecutor = asyncCacheListenerExecutor;
+   }
+
    @Override
    public boolean equals(Object obj)
    {
@@ -155,7 +235,10 @@
          return Util.safeEquals(transactionManager, other.transactionManager)
                && Util.safeEquals(muxChannelFactory, other.muxChannelFactory)
                && Util.safeEquals(rpcManager, other.rpcManager)
-               && Util.safeEquals(channel, other.channel);
+               && Util.safeEquals(channel, other.channel)
+               && Util.safeEquals(evictionTimerThreadFactory, other.evictionTimerThreadFactory)
+               && Util.safeEquals(asyncCacheListenerExecutor, other.asyncCacheListenerExecutor)
+               && Util.safeEquals(asyncSerializationExecutor, other.asyncSerializationExecutor);
       }
 
       return false;
@@ -169,6 +252,9 @@
       result = result * 29 + (muxChannelFactory == null ? 0 : muxChannelFactory.hashCode());
       result = result * 29 + (rpcManager == null ? 0 : rpcManager.hashCode());
       result = result * 29 + (channel == null ? 0 : channel.hashCode());
+      result = result * 29 + (evictionTimerThreadFactory == null ? 0 : evictionTimerThreadFactory.hashCode());
+      result = result * 29 + (asyncCacheListenerExecutor == null ? 0 : asyncCacheListenerExecutor.hashCode());
+      result = result * 29 + (asyncSerializationExecutor == null ? 0 : asyncSerializationExecutor.hashCode());
       return result;
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java	2008-10-06 10:45:52 UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java	2008-10-06 11:04:49 UTC (rev 6840)
@@ -58,12 +58,12 @@
       task = new Task();
    }
 
-   public void init(long wakeupInterval)
+   public void init(long wakeupInterval, ThreadFactory evictionThreadFactory)
    {
       if (log.isTraceEnabled())
          log.trace("Creating a new eviction listener with wakeupInterval millis set at " + wakeupInterval);
       this.wakeupInterval = wakeupInterval;
-      start();
+      start(evictionThreadFactory);
    }
 
    /**
@@ -107,7 +107,7 @@
       scheduledExecutor = null;
    }
 
-   private void start()
+   private void start(ThreadFactory tf)
    {
       if (wakeupInterval < 1)
       {
@@ -115,7 +115,8 @@
             log.info("Wakeup Interval set to " + wakeupInterval + ".  Not starting an eviction thread!");
          return;
       }
-      scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
+
+      if (tf == null) tf = new ThreadFactory()
       {
          public Thread newThread(Runnable r)
          {
@@ -123,7 +124,9 @@
             t.setDaemon(true);
             return t;
          }
-      });
+      };
+
+      scheduledExecutor = Executors.newSingleThreadScheduledExecutor(tf);
       scheduledExecutor.scheduleWithFixedDelay(task, wakeupInterval, wakeupInterval, TimeUnit.MILLISECONDS);
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2008-10-06 10:45:52 UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2008-10-06 11:04:49 UTC (rev 6840)
@@ -86,7 +86,9 @@
 
       // what sort of a repl processor do we need?
       Configuration c = componentRegistry.getComponent(Configuration.class);
-      if (c.getCacheMode().isSynchronous() || c.getSerializationExecutorPoolSize() < 1)
+      replicationProcessor = c.getRuntimeConfig().getAsyncSerializationExecutor();
+      if (c.getCacheMode().isSynchronous() ||
+            (replicationProcessor == null && c.getSerializationExecutorPoolSize() < 1)) // if no executor has been injected and no pool size has been set
       {
          // in-process thread.  Not async.
          replicationProcessor = new WithinThreadExecutor();
@@ -95,16 +97,19 @@
       else
       {
          asyncSerial = true;
-         replicationProcessorCount = new AtomicInteger(0);
-         replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
-               new ThreadFactory()
-               {
-                  public Thread newThread(Runnable r)
+         if (replicationProcessor == null)
+         {
+            replicationProcessorCount = new AtomicInteger(0);
+            replicationProcessor = Executors.newFixedThreadPool(c.isUseReplQueue() ? 1 : c.getSerializationExecutorPoolSize(),
+                  new ThreadFactory()
                   {
-                     return new Thread(r, "AsyncReplicationProcessor-" + replicationProcessorCount.incrementAndGet());
+                     public Thread newThread(Runnable r)
+                     {
+                        return new Thread(r, "AsyncReplicationProcessor-" + replicationProcessorCount.incrementAndGet());
+                     }
                   }
-               }
-         );
+            );
+         }
       }
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java	2008-10-06 10:45:52 UTC (rev 6839)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java	2008-10-06 11:04:49 UTC (rev 6840)
@@ -162,21 +162,27 @@
    {
       useMarshalledValueMaps = config.isUseLazyDeserialization();
       syncProcessor = new WithinThreadExecutor();
-      if (config.getListenerAsyncPoolSize() > 0)
+
+      // first try and use an injected executor for async listeners
+      if ((asyncProcessor = config.getRuntimeConfig().getAsyncCacheListenerExecutor()) == null)
       {
-         asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
+         // create one if needed
+         if (config.getListenerAsyncPoolSize() > 0)
          {
-            public Thread newThread(Runnable r)
+            asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
             {
-               return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
-            }
-         });
+               public Thread newThread(Runnable r)
+               {
+                  return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
+               }
+            });
+         }
+         else
+         {
+            // use the same sync executor
+            asyncProcessor = syncProcessor;
+         }
       }
-      else
-      {
-         // use the same sync executor
-         asyncProcessor = syncProcessor;
-      }
    }
 
    /**




More information about the jbosscache-commits mailing list