Author: manik.surtani(a)jboss.com
Date: 2008-08-04 07:20:19 -0400 (Mon, 04 Aug 2008)
New Revision: 6489
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java
core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java
Log:
Refactored async cache loader
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java 2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java 2008-08-04 11:20:19 UTC (rev 6489)
@@ -18,6 +18,11 @@
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -68,6 +73,9 @@
* expiration of messages within a separate thread and keeping other
* operations synchronous for reliability.
* </dd>
+ * <dt>cache.async.threadPoolSize</dt>
+ * <dd>The size of the async processor thread pool. Defaults to <tt>1</tt>. This
+ * property is new in JBoss Cache 3.0.</dd>
* </dl>
* For increased performance for many smaller transactions, use higher values
* for <code>cache.async.batchSize</code> and
@@ -90,9 +98,10 @@
private static final int DEFAULT_QUEUE_SIZE = 10000;
private AsyncCacheLoaderConfig config;
- private AsyncProcessor processor;
+ private ExecutorService executor;
private AtomicBoolean stopped = new AtomicBoolean(true);
private BlockingQueue<Modification> queue = new
ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE);
+ private List<Future> processorFutures;
public AsyncCacheLoader()
{
@@ -237,32 +246,52 @@
if (log.isInfoEnabled()) log.info("Async cache loader starting: " +
this);
stopped.set(false);
super.start();
- processor = new AsyncProcessor();
- processor.start();
+ executor = Executors.newFixedThreadPool(config.getThreadPoolSize(), new ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(r, "AsyncCacheLoader-" + threadId.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ processorFutures = new ArrayList<Future>(config.getThreadPoolSize());
+ for (int i = 0; i < config.getThreadPoolSize(); i++) processorFutures.add(executor.submit(new AsyncProcessor()));
}
@Override
public void stop()
{
stopped.set(true);
- if (processor != null)
+ if (executor != null)
{
- processor.stop();
+ for (Future f : processorFutures) f.cancel(true);
+ executor.shutdown();
+ try
+ {
+ boolean terminated = executor.isTerminated();
+ while (!terminated)
+ {
+ terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
+ executor = null;
super.stop();
}
- private void enqueue(Modification mod)
+ private void enqueue(final Modification mod)
throws CacheException, InterruptedException
{
if (stopped.get())
{
throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries.");
}
- if (trace)
- {
- log.trace("Enqueuing modification " + mod);
- }
+ if (trace) log.trace("Enqueuing modification " + mod);
queue.put(mod);
}
@@ -273,40 +302,9 @@
*/
private class AsyncProcessor implements Runnable
{
- private Thread t;
-
// Modifications to invoke as a single put
private final List<Modification> mods = new
ArrayList<Modification>(config.getBatchSize());
- public void start()
- {
- if (t == null || !t.isAlive())
- {
- t = new Thread(this, "AsyncCacheLoader-" + threadId.getAndIncrement());
- t.setDaemon(true);
- t.start();
- }
- }
-
- public void stop()
- {
- if (t != null)
- {
- t.interrupt();
- try
- {
- t.join();
- }
- catch (InterruptedException e)
- {
- }
- }
- if (!queue.isEmpty())
- {
- log.warn("Async queue not yet empty, possibly interrupted");
- }
- }
-
public void run()
{
while (!Thread.interrupted())
@@ -364,16 +362,9 @@
catch (Exception e)
{
if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e);
- log.debug("Exception: ", e);
+ if (log.isDebugEnabled()) log.debug("Exception: ", e);
}
}
-
- @Override
- public String toString()
- {
- return "TQ t=" + t;
- }
-
}
@Override
@@ -381,11 +372,11 @@
{
return super.toString() +
" delegate=[" + super.getCacheLoader() + "]" +
- " processor=" + processor +
" stopped=" + stopped +
" batchSize=" + config.getBatchSize() +
" returnOld=" + config.getReturnOld() +
" asyncPut=" + config.getUseAsyncPut() +
+ " threadPoolSize=" + config.getThreadPoolSize() +
" queue.remainingCapacity()=" + queue.remainingCapacity() +
" queue.peek()=" + queue.peek();
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java 2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java 2008-08-04 11:20:19 UTC (rev 6489)
@@ -15,6 +15,7 @@
private boolean returnOld = true;
private int queueSize = 0;
private boolean useAsyncPut = true;
+ private int threadPoolSize = 1;
/**
* Default constructor.
@@ -35,6 +36,17 @@
populateFromBaseConfig(base);
}
+ public int getThreadPoolSize()
+ {
+ return threadPoolSize;
+ }
+
+ public void setThreadPoolSize(int threadPoolSize)
+ {
+ testImmutability("threadPoolSize");
+ this.threadPoolSize = threadPoolSize;
+ }
+
public int getBatchSize()
{
return batchSize;
@@ -86,32 +98,24 @@
String s;
s = props.getProperty("cache.async.batchSize");
- if (s != null)
- {
- batchSize = Integer.parseInt(s);
- }
- if (batchSize <= 0)
- {
- throw new IllegalArgumentException("Invalid size: " + batchSize);
- }
+ if (s != null) batchSize = Integer.parseInt(s);
+ if (batchSize <= 0) throw new IllegalArgumentException("Invalid batch size: " + batchSize);
+ s = props.getProperty("cache.async.threadPoolSize");
+ if (s != null) threadPoolSize = Integer.parseInt(s);
+ if (threadPoolSize <= 0) throw new IllegalArgumentException("Invalid thread pool size: " + threadPoolSize);
+
+
s = props.getProperty("cache.async.returnOld");
- if (s != null)
- {
- returnOld = Boolean.valueOf(s);
- }
+ if (s != null) returnOld = Boolean.valueOf(s);
s = props.getProperty("cache.async.queueSize");
- if (s != null)
- {
- queueSize = Integer.parseInt(s);
- }
+ if (s != null) queueSize = Integer.parseInt(s);
s = props.getProperty("cache.async.put");
- if (s != null)
- {
- useAsyncPut = Boolean.valueOf(s);
- }
+ if (s != null) useAsyncPut = Boolean.valueOf(s);
+
+
}
@Override
Modified: core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java 2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java 2008-08-04 11:20:19 UTC (rev 6489)
@@ -12,12 +12,14 @@
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.HashMap;
+@Test(groups = "functional")
public class AsyncFileCacheLoaderTest extends AbstractCacheLoaderTestBase
{
private CacheSPI<Object, Object> cache;
@@ -150,4 +152,23 @@
assertEquals("X found", true, loader.exists(X));
loader.remove(X);
}
+
+ public void testMultipleThreads() throws Exception
+ {
+ configureCache(
+ "cache.async.queueSize=1\n" +
+ "cache.async.pollWait=10\n" +
+ "cache.async.threadPoolSize=5");
+ CacheLoader loader = cache.getCacheLoaderManager().getCacheLoader();
+ Fqn fqn = Fqn.fromString("/bound");
+ loader.remove(fqn);
+ // You can't really see it block though :-/
+ for (int i = 0; i < 50; i++)
+ {
+ cache.put(fqn, "key" + i, "value1");
+ }
+ Thread.sleep(1000);
+ assertEquals(50, loader.get(fqn).size());
+ loader.remove(fqn);
+ }
}