[infinispan-commits] Infinispan SVN: r1280 - in trunk/core/src/main/java/org/infinispan/loaders: file and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Dec 10 11:30:53 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-12-10 11:30:52 -0500 (Thu, 10 Dec 2009)
New Revision: 1280

Modified:
   trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
   trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStoreConfig.java
   trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
   trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStoreConfig.java
Log:
Multithreaded purging

Modified: trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java	2009-12-10 15:44:27 UTC (rev 1279)
+++ trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java	2009-12-10 16:30:52 UTC (rev 1280)
@@ -15,9 +15,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * An abstract {@link org.infinispan.loaders.CacheStore} that holds common implementations for some methods
@@ -30,9 +32,11 @@
    private Map<GlobalTransaction, List<? extends Modification>> transactions;
    private static Log log = LogFactory.getLog(AbstractCacheStore.class);
    private AbstractCacheStoreConfig config;
-   private ExecutorService purgerService;
+   protected ExecutorService purgerService;
    protected Marshaller marshaller;
    protected Cache cache;
+   private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(0);
+   protected boolean multiThreadedPurge = false;
 
    public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException{
       this.config = (AbstractCacheStoreConfig) config;
@@ -50,18 +54,31 @@
       if (config.isPurgeSynchronously()) {
          purgerService = new WithinThreadExecutor();
       } else {
-         purgerService = Executors.newSingleThreadExecutor();
+         multiThreadedPurge = supportsMultiThreadedPurge() && config.getPurgerThreads() > 1;
+         purgerService = Executors.newFixedThreadPool(supportsMultiThreadedPurge() ? config.getPurgerThreads() : 1, new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+               // Thread name: <cache>-<CacheStore>-<purger>-ID
+               Thread t = new Thread(r, cache.getName() + "-" + config.getCacheLoaderClassName().substring(config.getCacheLoaderClassName().lastIndexOf(".") + 1) + "-" + THREAD_COUNTER.getAndIncrement());
+               t.setDaemon(true);
+               return t;
+            }
+         });
       }
       transactions = new ConcurrentHashMap<GlobalTransaction, List<? extends Modification>>(64, 0.75f, getConcurrencyLevel());
    }
 
+   protected boolean supportsMultiThreadedPurge() {
+      return false;
+   }
+
    public void stop() throws CacheLoaderException {
       purgerService.shutdownNow();
    }
 
    public void purgeExpired() throws CacheLoaderException {
       if (purgerService == null)
-         throw new IllegalStateException("PurgeService is null (did you call super.start() from cache loader implementation ?");
+         throw new IllegalStateException("purgerService is null (did you call super.start() from cache loader implementation ?");
       purgerService.execute(new Runnable() {
          public void run() {
             try {
@@ -73,8 +90,7 @@
       });
    }
 
-   protected void purgeInternal() throws CacheLoaderException {
-   }
+   protected abstract void purgeInternal() throws CacheLoaderException;
 
    protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
       for (Modification m : mods) {

Modified: trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStoreConfig.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStoreConfig.java	2009-12-10 15:44:27 UTC (rev 1279)
+++ trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStoreConfig.java	2009-12-10 16:30:52 UTC (rev 1280)
@@ -17,7 +17,8 @@
  * <p/>
  * <ul> <li><tt>purgeSynchronously</tt> - whether {@link org.infinispan.loaders.CacheStore#purgeExpired()} calls happen
  * synchronously or not.  By default, this is set to <tt>false</tt>.</li>
- * <p/>
+ * <li><tt>purgerThreads</tt> - number of threads to use when purging.  Defaults to <tt>1</tt> if <tt>purgeSynchronously</tt>
+ * is <tt>true</tt>, ignored if <tt>false</tt>.</li>
  * </ul>
  * 
  * <p>
@@ -57,6 +58,9 @@
    /** @configRef desc="If true, CacheStore#purgeExpired() call will be done synchronously" */
    protected Boolean purgeSynchronously = false;
 
+   /** @configRef desc="The number of threads to use when purging asynchronously." */
+   protected Integer purgerThreads = 1;
+
    protected SingletonStoreConfig singletonStore = new SingletonStoreConfig();
 
    protected AsyncStoreConfig async = new AsyncStoreConfig();
@@ -66,11 +70,21 @@
       return purgeSynchronously;
    }
 
+   @XmlAttribute
+   public Integer getPurgerThreads() {
+      return purgerThreads;
+   }
+
    public void setPurgeSynchronously(Boolean purgeSynchronously) {
       testImmutability("purgeSynchronously");
       this.purgeSynchronously = purgeSynchronously;
    }
 
+   public void setPurgerThreads(Integer purgerThreads) {
+      testImmutability("purgerThreads");
+      this.purgerThreads = purgerThreads;
+   }
+
    @XmlAttribute
    public Boolean isFetchPersistentState() {
       return fetchPersistentState;
@@ -145,7 +159,8 @@
             && (this.fetchPersistentState.equals(other.fetchPersistentState))
             && Util.safeEquals(this.singletonStore, other.singletonStore)
             && Util.safeEquals(this.async, other.async)
-            && Util.safeEquals(this.purgeSynchronously, other.purgeSynchronously);
+            && Util.safeEquals(this.purgeSynchronously, other.purgeSynchronously)
+            && Util.safeEquals(this.purgerThreads, other.purgerThreads);
    }
 
    @Override
@@ -161,6 +176,7 @@
       result = 31 * result + (singletonStore == null ? 0 : singletonStore.hashCode());
       result = 31 * result + (async == null ? 0 : async.hashCode());
       result = 31 * result + (purgeOnStartup ? 0 : 1);
+      result = 31 * result + (purgerThreads);
       return result;
    }
 
@@ -174,6 +190,7 @@
             .append(", singletonStore{").append(singletonStore).append('}')
             .append(", async{").append(async).append('}')
             .append(", purgeSynchronously{").append(purgeSynchronously).append('}')
+            .append(", purgerThreads{").append(purgerThreads).append('}')
             .toString();
    }
 

Modified: trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java	2009-12-10 15:44:27 UTC (rev 1279)
+++ trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java	2009-12-10 16:30:52 UTC (rev 1280)
@@ -34,6 +34,7 @@
 public class FileCacheStore extends BucketBasedCacheStore {
 
    private static final Log log = LogFactory.getLog(FileCacheStore.class);
+   private static final boolean trace = log.isTraceEnabled();
    private int streamBufferSize;
 
    FileCacheStoreConfig config;
@@ -83,7 +84,7 @@
                } else {
                   bytesRead = objectInput.read(buffer, 0, numBytes - totalBytesRead);
                }
-               
+
                if (bytesRead == -1) break;
                totalBytesRead += bytesRead;
                bos.write(buffer, 0, bytesRead);
@@ -145,8 +146,38 @@
       }
    }
 
+   @Override
+   protected boolean supportsMultiThreadedPurge() {
+      return true;
+   }   
+
    protected void purgeInternal() throws CacheLoaderException {
-      loadAll();
+      if (trace) log.trace("purgeInternal()");
+      acquireGlobalLock(false);
+      try {
+         for (final File bucketFile : root.listFiles()) {
+            if (multiThreadedPurge) {
+               purgerService.execute(new Runnable() {
+                  @Override
+                  public void run() {
+                     Bucket bucket;
+                     try {
+                        if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries())
+                           saveBucket(bucket);
+                     } catch (CacheLoaderException e) {
+                        log.warn("Problems purging file " + bucketFile, e);
+                     }
+                  }
+               });
+            } else {
+               Bucket bucket;
+               if ((bucket = loadBucket(bucketFile)) != null && bucket.removeExpiredEntries()) saveBucket(bucket);
+            }
+         }
+      } finally {
+         releaseGlobalLock(false);
+         if (trace) log.trace("Exit purgeInternal()");
+      }
    }
 
    protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
@@ -215,9 +246,9 @@
          location = "Infinispan-FileCacheStore"; // use relative path!
       location += File.separator + cache.getName();
       root = new File(location);
-      if(!root.exists()) {
+      if (!root.exists()) {
          if (!root.mkdirs()) {
-            log.warn("Problems creating the directory: " + root);  
+            log.warn("Problems creating the directory: " + root);
          }
       }
       if (!root.exists()) {

Modified: trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStoreConfig.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStoreConfig.java	2009-12-10 15:44:27 UTC (rev 1279)
+++ trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStoreConfig.java	2009-12-10 16:30:52 UTC (rev 1280)
@@ -6,16 +6,22 @@
  * Configures {@link org.infinispan.loaders.file.FileCacheStore}.  This allows you to tune a number of characteristics
  * of the {@link FileCacheStore}.
  * <p/>
- * <ul> <li><tt>location</tt> - a location on disk where the store can write internal files.  This defaults to
- * <tt>Infinispan-FileCacheStore</tt> in the current working directory.</li> <li><tt>purgeSynchronously</tt> - whether
- * {@link org.infinispan.loaders.CacheStore#purgeExpired()} calls happen synchronously or not.  By default, this is set
- * to <tt>false</tt>.</li> <li><tt>streamBufferSize</tt> - when writing state to disk, a buffered stream is used.  This
+ *    <ul>
+ *       <li><tt>location</tt> - a location on disk where the store can write internal files.  This defaults to
+ * <tt>Infinispan-FileCacheStore</tt> in the current working directory.</li>
+ *       <li><tt>purgeSynchronously</tt> - whether {@link org.infinispan.loaders.CacheStore#purgeExpired()} calls happen
+ * synchronously or not.  By default, this is set to <tt>false</tt>.</li>
+ *       <li><tt>purgerThreads</tt> - number of threads to use when purging.  Defaults to <tt>1</tt> if <tt>purgeSynchronously</tt>
+ * is <tt>true</tt>, ignored if <tt>false</tt>.</li>
+ *    <li><tt>streamBufferSize</tt> - when writing state to disk, a buffered stream is used.  This
  * parameter allows you to tune the buffer size.  Larger buffers are usually faster but take up more (temporary) memory,
- * resulting in more gc. By default, this is set to <tt>8192</tt>.</li> <li><tt>lockConcurrencyLevel</tt> - locking
- * granularity is per file bucket.  This setting defines the number of shared locks to use.  The more locks you have,
- * the better your concurrency will be, but more locks take up more memory. By default, this is set to
- * <tt>2048</tt>.</li> <li><tt>lockAcquistionTimeout</tt> - the length of time, in milliseconds, to wait for locks
- * before timing out and throwing an exception.  By default, this is set to <tt>60000</tt>.</li> </ul>
+ * resulting in more gc. By default, this is set to <tt>8192</tt>.</li>
+ *    <li><tt>lockConcurrencyLevel</tt> - locking granularity is per file bucket.  This setting defines the number of
+ * shared locks to use.  The more locks you have, the better your concurrency will be, but more locks take up more
+ * memory. By default, this is set to <tt>2048</tt>.</li>
+ *    <li><tt>lockAcquistionTimeout</tt> - the length of time, in milliseconds, to wait for locks
+ * before timing out and throwing an exception.  By default, this is set to <tt>60000</tt>.</li>
+ * </ul>
  *
  * @author Manik Surtani
  * @autor Vladimir Blagojevic



More information about the infinispan-commits mailing list