[jbosscache-commits] JBoss Cache SVN: r6489 - in core/trunk/src: test/java/org/jboss/cache/loader and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Aug 4 07:20:19 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-08-04 07:20:19 -0400 (Mon, 04 Aug 2008)
New Revision: 6489

Modified:
   core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java
   core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java
Log:
Refactored async cache loader

Modified: core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java	2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java	2008-08-04 11:20:19 UTC (rev 6489)
@@ -18,6 +18,11 @@
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -68,6 +73,9 @@
  * expiration of messages within a separate thread and keeping other
  * operations synchronous for reliability.
  * </dd>
+ * <dt>cache.async.threadPoolSize</dt>
+ * <dd>The size of the async processor thread pool.  Defaults to <tt>1</tt>.  This
+ * property is new in JBoss Cache 3.0.</dd>
  * </dl>
  * For increased performance for many smaller transactions, use higher values
  * for <code>cache.async.batchSize</code> and
@@ -90,9 +98,10 @@
    private static final int DEFAULT_QUEUE_SIZE = 10000;
 
    private AsyncCacheLoaderConfig config;
-   private AsyncProcessor processor;
+   private ExecutorService executor;
    private AtomicBoolean stopped = new AtomicBoolean(true);
    private BlockingQueue<Modification> queue = new ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE);
+   private List<Future> processorFutures;
 
    public AsyncCacheLoader()
    {
@@ -237,32 +246,52 @@
       if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this);
       stopped.set(false);
       super.start();
-      processor = new AsyncProcessor();
-      processor.start();
+      executor = Executors.newFixedThreadPool(config.getThreadPoolSize(), new ThreadFactory()
+      {
+         public Thread newThread(Runnable r)
+         {
+            Thread t = new Thread(r, "AsyncCacheLoader-" + threadId.getAndIncrement());
+            t.setDaemon(true);
+            return t;
+         }
+      });
+      processorFutures = new ArrayList<Future>(config.getThreadPoolSize());
+      for (int i = 0; i < config.getThreadPoolSize(); i++) processorFutures.add(executor.submit(new AsyncProcessor()));
    }
 
    @Override
    public void stop()
    {
       stopped.set(true);
-      if (processor != null)
+      if (executor != null)
       {
-         processor.stop();
+         for (Future f : processorFutures) f.cancel(true);
+         executor.shutdown();
+         try
+         {
+            boolean terminated = executor.isTerminated();
+            while (!terminated)
+            {
+               terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
+            }
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+         }
       }
+      executor = null;
       super.stop();
    }
 
-   private void enqueue(Modification mod)
+   private void enqueue(final Modification mod)
          throws CacheException, InterruptedException
    {
       if (stopped.get())
       {
          throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries.");
       }
-      if (trace)
-      {
-         log.trace("Enqueuing modification " + mod);
-      }
+      if (trace) log.trace("Enqueuing modification " + mod);
       queue.put(mod);
    }
 
@@ -273,40 +302,9 @@
     */
    private class AsyncProcessor implements Runnable
    {
-      private Thread t;
-
       // Modifications to invoke as a single put
       private final List<Modification> mods = new ArrayList<Modification>(config.getBatchSize());
 
-      public void start()
-      {
-         if (t == null || !t.isAlive())
-         {
-            t = new Thread(this, "AsyncCacheLoader-" + threadId.getAndIncrement());
-            t.setDaemon(true);
-            t.start();
-         }
-      }
-
-      public void stop()
-      {
-         if (t != null)
-         {
-            t.interrupt();
-            try
-            {
-               t.join();
-            }
-            catch (InterruptedException e)
-            {
-            }
-         }
-         if (!queue.isEmpty())
-         {
-            log.warn("Async queue not yet empty, possibly interrupted");
-         }
-      }
-
       public void run()
       {
          while (!Thread.interrupted())
@@ -364,16 +362,9 @@
          catch (Exception e)
          {
             if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e);
-            log.debug("Exception: ", e);
+            if (log.isDebugEnabled()) log.debug("Exception: ", e);
          }
       }
-
-      @Override
-      public String toString()
-      {
-         return "TQ t=" + t;
-      }
-
    }
 
    @Override
@@ -381,11 +372,11 @@
    {
       return super.toString() +
             " delegate=[" + super.getCacheLoader() + "]" +
-            " processor=" + processor +
             " stopped=" + stopped +
             " batchSize=" + config.getBatchSize() +
             " returnOld=" + config.getReturnOld() +
             " asyncPut=" + config.getUseAsyncPut() +
+            " threadPoolSize=" + config.getThreadPoolSize() +
             " queue.remainingCapacity()=" + queue.remainingCapacity() +
             " queue.peek()=" + queue.peek();
    }

Modified: core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java	2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java	2008-08-04 11:20:19 UTC (rev 6489)
@@ -15,6 +15,7 @@
    private boolean returnOld = true;
    private int queueSize = 0;
    private boolean useAsyncPut = true;
+   private int threadPoolSize = 1;
 
    /**
     * Default constructor.
@@ -35,6 +36,17 @@
       populateFromBaseConfig(base);
    }
 
+   public int getThreadPoolSize()
+   {
+      return threadPoolSize;
+   }
+
+   public void setThreadPoolSize(int threadPoolSize)
+   {
+      testImmutability("threadPoolSize");
+      this.threadPoolSize = threadPoolSize;
+   }
+
    public int getBatchSize()
    {
       return batchSize;
@@ -86,32 +98,24 @@
       String s;
 
       s = props.getProperty("cache.async.batchSize");
-      if (s != null)
-      {
-         batchSize = Integer.parseInt(s);
-      }
-      if (batchSize <= 0)
-      {
-         throw new IllegalArgumentException("Invalid size: " + batchSize);
-      }
+      if (s != null) batchSize = Integer.parseInt(s);
+      if (batchSize <= 0) throw new IllegalArgumentException("Invalid batch size: " + batchSize);
 
+      s = props.getProperty("cache.async.threadPoolSize");
+      if (s != null) threadPoolSize = Integer.parseInt(s);
+      if (threadPoolSize <= 0) throw new IllegalArgumentException("Invalid thread pool size: " + threadPoolSize);
+
+
       s = props.getProperty("cache.async.returnOld");
-      if (s != null)
-      {
-         returnOld = Boolean.valueOf(s);
-      }
+      if (s != null) returnOld = Boolean.valueOf(s);
 
       s = props.getProperty("cache.async.queueSize");
-      if (s != null)
-      {
-         queueSize = Integer.parseInt(s);
-      }
+      if (s != null) queueSize = Integer.parseInt(s);
 
       s = props.getProperty("cache.async.put");
-      if (s != null)
-      {
-         useAsyncPut = Boolean.valueOf(s);
-      }
+      if (s != null) useAsyncPut = Boolean.valueOf(s);
+
+
    }
 
    @Override

Modified: core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java	2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java	2008-08-04 11:20:19 UTC (rev 6489)
@@ -12,12 +12,14 @@
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
 
+@Test(groups = "functional")
 public class AsyncFileCacheLoaderTest extends AbstractCacheLoaderTestBase
 {
    private CacheSPI<Object, Object> cache;
@@ -150,4 +152,23 @@
       assertEquals("X found", true, loader.exists(X));
       loader.remove(X);
    }
+
+   public void testMultipleThreads() throws Exception
+   {
+      configureCache(
+            "cache.async.queueSize=1\n" +
+                  "cache.async.pollWait=10\n" +
+                  "cache.async.threadPoolSize=5");
+      CacheLoader loader = cache.getCacheLoaderManager().getCacheLoader();
+      Fqn fqn = Fqn.fromString("/bound");
+      loader.remove(fqn);
+      // You can't really see it block though :-/
+      for (int i = 0; i < 50; i++)
+      {
+         cache.put(fqn, "key" + i, "value1");
+      }
+      Thread.sleep(1000);
+      assertEquals(50, loader.get(fqn).size());
+      loader.remove(fqn);
+   }
 }




More information about the jbosscache-commits mailing list