[infinispan-commits] Infinispan SVN: r1280 - in trunk/core/src/main/java/org/infinispan/loaders: file and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Dec 10 11:30:53 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-12-10 11:30:52 -0500 (Thu, 10 Dec 2009)
New Revision: 1280
Modified:
trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStoreConfig.java
trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStoreConfig.java
Log:
Multithreaded purging
Modified: trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java 2009-12-10 15:44:27 UTC (rev 1279)
+++ trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java 2009-12-10 16:30:52 UTC (rev 1280)
@@ -15,9 +15,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* An abstract {@link org.infinispan.loaders.CacheStore} that holds common implementations for some methods
@@ -30,9 +32,11 @@
private Map<GlobalTransaction, List<? extends Modification>> transactions;
private static Log log = LogFactory.getLog(AbstractCacheStore.class);
private AbstractCacheStoreConfig config;
- private ExecutorService purgerService;
+ protected ExecutorService purgerService;
protected Marshaller marshaller;
protected Cache cache;
+ private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(0);
+ protected boolean multiThreadedPurge = false;
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException{
this.config = (AbstractCacheStoreConfig) config;
@@ -50,18 +54,31 @@
if (config.isPurgeSynchronously()) {
purgerService = new WithinThreadExecutor();
} else {
- purgerService = Executors.newSingleThreadExecutor();
+ multiThreadedPurge = supportsMultiThreadedPurge() && config.getPurgerThreads() > 1;
+ purgerService = Executors.newFixedThreadPool(supportsMultiThreadedPurge() ? config.getPurgerThreads() : 1, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ // Thread name: <cache>-<CacheStore>-<purger>-ID
+ Thread t = new Thread(r, cache.getName() + "-" + config.getCacheLoaderClassName().substring(config.getCacheLoaderClassName().lastIndexOf(".") + 1) + "-" + THREAD_COUNTER.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ });
}
transactions = new ConcurrentHashMap<GlobalTransaction, List<? extends Modification>>(64, 0.75f, getConcurrencyLevel());
}
+ protected boolean supportsMultiThreadedPurge() {
+ return false;
+ }
+
public void stop() throws CacheLoaderException {
purgerService.shutdownNow();
}
public void purgeExpired() throws CacheLoaderException {
if (purgerService == null)
- throw new IllegalStateException("PurgeService is null (did you call super.start() from cache loader implementation ?");
+ throw new IllegalStateException("purgerService is null (did you call super.start() from cache loader implementation ?");
purgerService.execute(new Runnable() {
public void run() {
try {
@@ -73,8 +90,7 @@
});
}
- protected void purgeInternal() throws CacheLoaderException {
- }
+ protected abstract void purgeInternal() throws CacheLoaderException;
protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
for (Modification m : mods) {
Modified: trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStoreConfig.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStoreConfig.java 2009-12-10 15:44:27 UTC (rev 1279)
+++ trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStoreConfig.java 2009-12-10 16:30:52 UTC (rev 1280)
@@ -17,7 +17,8 @@
* <p/>
* <ul> <li><tt>purgeSynchronously</tt> - whether {@link org.infinispan.loaders.CacheStore#purgeExpired()} calls happen
* synchronously or not. By default, this is set to <tt>false</tt>.</li>
- * <p/>
+ * <li><tt>purgerThreads</tt> - number of threads to use when purging. Defaults to <tt>1</tt> if <tt>purgeSynchronously</tt>
+ * is <tt>true</tt>, ignored if <tt>false</tt>.</li>
* </ul>
*
* <p>
@@ -57,6 +58,9 @@
/** @configRef desc="If true, CacheStore#purgeExpired() call will be done synchronously" */
protected Boolean purgeSynchronously = false;
+ /** @configRef desc="The number of threads to use when purging asynchronously." */
+ protected Integer purgerThreads = 1;
+
protected SingletonStoreConfig singletonStore = new SingletonStoreConfig();
protected AsyncStoreConfig async = new AsyncStoreConfig();
@@ -66,11 +70,21 @@
return purgeSynchronously;
}
+ @XmlAttribute
+ public Integer getPurgerThreads() {
+ return purgerThreads;
+ }
+
public void setPurgeSynchronously(Boolean purgeSynchronously) {
testImmutability("purgeSynchronously");
this.purgeSynchronously = purgeSynchronously;
}
+ public void setPurgerThreads(Integer purgerThreads) {
+ testImmutability("purgerThreads");
+ this.purgerThreads = purgerThreads;
+ }
+
@XmlAttribute
public Boolean isFetchPersistentState() {
return fetchPersistentState;
@@ -145,7 +159,8 @@
&& (this.fetchPersistentState.equals(other.fetchPersistentState))
&& Util.safeEquals(this.singletonStore, other.singletonStore)
&& Util.safeEquals(this.async, other.async)
- && Util.safeEquals(this.purgeSynchronously, other.purgeSynchronously);
+ && Util.safeEquals(this.purgeSynchronously, other.purgeSynchronously)
+ && Util.safeEquals(this.purgerThreads, other.purgerThreads);
}
@Override
@@ -161,6 +176,7 @@
result = 31 * result + (singletonStore == null ? 0 : singletonStore.hashCode());
result = 31 * result + (async == null ? 0 : async.hashCode());
result = 31 * result + (purgeOnStartup ? 0 : 1);
+ result = 31 * result + (purgerThreads);
return result;
}
@@ -174,6 +190,7 @@
.append(", singletonStore{").append(singletonStore).append('}')
.append(", async{").append(async).append('}')
.append(", purgeSynchronously{").append(purgeSynchronously).append('}')
+ .append(", purgerThreads{").append(purgerThreads).append('}')
.toString();
}
Modified: trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java 2009-12-10 15:44:27 UTC (rev 1279)
+++ trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java 2009-12-10 16:30:52 UTC (rev 1280)
@@ -34,6 +34,7 @@
public class FileCacheStore extends BucketBasedCacheStore {
private static final Log log = LogFactory.getLog(FileCacheStore.class);
+ private static final boolean trace = log.isTraceEnabled();
private int streamBufferSize;
FileCacheStoreConfig config;
@@ -83,7 +84,7 @@
} else {
bytesRead = objectInput.read(buffer, 0, numBytes - totalBytesRead);
}
-
+
if (bytesRead == -1) break;
totalBytesRead += bytesRead;
bos.write(buffer, 0, bytesRead);
@@ -145,8 +146,38 @@
}
}
+ @Override
+ protected boolean supportsMultiThreadedPurge() {
+ return true;
+ }
+
protected void purgeInternal() throws CacheLoaderException {
- loadAll();
+ if (trace) log.trace("purgeInternal()");
+ acquireGlobalLock(false);
+ try {
+ for (final File bucketFile : root.listFiles()) {
+ if (multiThreadedPurge) {
+ purgerService.execute(new Runnable() {
+ @Override
+ public void run() {
+ Bucket bucket;
+ try {
+ if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries())
+ saveBucket(bucket);
+ } catch (CacheLoaderException e) {
+ log.warn("Problems purging file " + bucketFile, e);
+ }
+ }
+ });
+ } else {
+ Bucket bucket;
+ if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries()) saveBucket(bucket);
+ }
+ }
+ } finally {
+ releaseGlobalLock(false);
+ if (trace) log.trace("Exit purgeInternal()");
+ }
}
protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
@@ -215,9 +246,9 @@
location = "Infinispan-FileCacheStore"; // use relative path!
location += File.separator + cache.getName();
root = new File(location);
- if(!root.exists()) {
+ if (!root.exists()) {
if (!root.mkdirs()) {
- log.warn("Problems creating the directory: " + root);
+ log.warn("Problems creating the directory: " + root);
}
}
if (!root.exists()) {
Modified: trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStoreConfig.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStoreConfig.java 2009-12-10 15:44:27 UTC (rev 1279)
+++ trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStoreConfig.java 2009-12-10 16:30:52 UTC (rev 1280)
@@ -6,16 +6,22 @@
* Configures {@link org.infinispan.loaders.file.FileCacheStore}. This allows you to tune a number of characteristics
* of the {@link FileCacheStore}.
* <p/>
- * <ul> <li><tt>location</tt> - a location on disk where the store can write internal files. This defaults to
- * <tt>Infinispan-FileCacheStore</tt> in the current working directory.</li> <li><tt>purgeSynchronously</tt> - whether
- * {@link org.infinispan.loaders.CacheStore#purgeExpired()} calls happen synchronously or not. By default, this is set
- * to <tt>false</tt>.</li> <li><tt>streamBufferSize</tt> - when writing state to disk, a buffered stream is used. This
+ * <ul>
+ * <li><tt>location</tt> - a location on disk where the store can write internal files. This defaults to
+ * <tt>Infinispan-FileCacheStore</tt> in the current working directory.</li>
+ * <li><tt>purgeSynchronously</tt> - whether {@link org.infinispan.loaders.CacheStore#purgeExpired()} calls happen
+ * synchronously or not. By default, this is set to <tt>false</tt>.</li>
+ * <li><tt>purgerThreads</tt> - number of threads to use when purging. Defaults to <tt>1</tt> if <tt>purgeSynchronously</tt>
+ * is <tt>true</tt>, ignored if <tt>false</tt>.</li>
+ * <li><tt>streamBufferSize</tt> - when writing state to disk, a buffered stream is used. This
* parameter allows you to tune the buffer size. Larger buffers are usually faster but take up more (temporary) memory,
- * resulting in more gc. By default, this is set to <tt>8192</tt>.</li> <li><tt>lockConcurrencyLevel</tt> - locking
- * granularity is per file bucket. This setting defines the number of shared locks to use. The more locks you have,
- * the better your concurrency will be, but more locks take up more memory. By default, this is set to
- * <tt>2048</tt>.</li> <li><tt>lockAcquistionTimeout</tt> - the length of time, in milliseconds, to wait for locks
- * before timing out and throwing an exception. By default, this is set to <tt>60000</tt>.</li> </ul>
+ * resulting in more gc. By default, this is set to <tt>8192</tt>.</li>
+ * <li><tt>lockConcurrencyLevel</tt> - locking granularity is per file bucket. This setting defines the number of
+ * shared locks to use. The more locks you have, the better your concurrency will be, but more locks take up more
+ * memory. By default, this is set to <tt>2048</tt>.</li>
+ * <li><tt>lockAcquistionTimeout</tt> - the length of time, in milliseconds, to wait for locks
+ * before timing out and throwing an exception. By default, this is set to <tt>60000</tt>.</li>
+ * </ul>
*
* @author Manik Surtani
* @autor Vladimir Blagojevic
More information about the infinispan-commits
mailing list