[infinispan-commits] Infinispan SVN: r1581 - in trunk/cachestore/cloud/src: main/java/org/infinispan/loaders/cloud and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Mar 9 09:55:48 EST 2010


Author: manik.surtani at jboss.com
Date: 2010-03-09 09:55:47 -0500 (Tue, 09 Mar 2010)
New Revision: 1581

Modified:
   trunk/cachestore/cloud/src/integrationtest/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
Log:
[ISPN-334] (CloudCacheStore should use metadata to store expiration information for more efficient purging) partial fix

Modified: trunk/cachestore/cloud/src/integrationtest/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java
===================================================================
--- trunk/cachestore/cloud/src/integrationtest/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java	2010-03-09 13:17:54 UTC (rev 1580)
+++ trunk/cachestore/cloud/src/integrationtest/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java	2010-03-09 14:55:47 UTC (rev 1581)
@@ -9,8 +9,14 @@
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.marshall.Marshaller;
+import org.infinispan.test.TestingUtil;
+import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
 import org.jclouds.blobstore.integration.StubBlobStoreContextBuilder;
+import org.jgroups.protocols.TUNNEL;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.BeforeTest;
@@ -24,6 +30,7 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -196,7 +203,7 @@
 
    @Override
    @Test (enabled = false, description = "Disabled until we can build the blobstore stub to retain state somewhere.")
-   public void testStopStartDoesntNukeValues() throws InterruptedException, CacheLoaderException {
+   public void testStopStartDoesNotNukeValues() throws InterruptedException, CacheLoaderException {
 
    }
 
@@ -249,4 +256,21 @@
       for (InternalCacheEntry se : set) assert expected.remove(se.getKey());
       assert expected.isEmpty();
    }
+
+   // TODO test that this passes before closing ISPN-334
+   public void testJCloudsMetadataTest() throws IOException {
+      String blobName = "myBlob";
+      String containerName = "myContainer";
+      BlobStore blobStore = ((CloudCacheStore) cs).blobStore;
+      Blob b = blobStore.newBlob(blobName);
+      b.setPayload("Hello world");
+      b.getMetadata().setUserMetadata(Collections.singletonMap("hello", "world"));
+      blobStore.putBlob(containerName, b);
+
+      b = blobStore.getBlob(containerName, blobName);
+      assert "world".equals(b.getMetadata().getUserMetadata().get("hello"));
+
+      PageSet<? extends StorageMetadata> ps = blobStore.list(containerName);
+      for (StorageMetadata sm: ps) assert "world".equals(sm.getUserMetadata().get("hello"));
+   }
 }
\ No newline at end of file

Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-03-09 13:17:54 UTC (rev 1580)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-03-09 14:55:47 UTC (rev 1581)
@@ -1,22 +1,6 @@
 package org.infinispan.loaders.cloud;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
+import com.google.common.collect.ImmutableSet;
 import org.infinispan.Cache;
 import org.infinispan.config.ConfigurationException;
 import org.infinispan.container.entries.InternalCacheEntry;
@@ -31,43 +15,59 @@
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 import org.jclouds.blobstore.AsyncBlobStore;
-import org.jclouds.blobstore.BlobMap;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
 import org.jclouds.blobstore.BlobStoreContextFactory;
 import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
 import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
 import org.jclouds.logging.log4j.config.Log4JLoggingModule;
 
-import com.google.common.collect.ImmutableSet;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 /**
- * The CloudCacheStore implementation that utilizes <a
- * href="http://code.google.com/p/jclouds">JClouds</a> to communicate with cloud storage providers
- * such as <a href="http://aws.amazon.com/s3/">Amazon's S3<a>, <a
- * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or
- * any other such provider supported by JClouds.
+ * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to
+ * communicate with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3</a>, <a
+ * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
+ * provider supported by JClouds.
  * <p/>
- * This file store stores stuff in the following format:
- * <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
+ * This file store stores stuff in the following format: <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
  * <p/>
- * 
+ *
  * @author Manik Surtani
  * @author Adrian Cole
  * @since 4.0
  */
 @CacheLoaderMetadata(configurationClass = CloudCacheStoreConfig.class)
 public class CloudCacheStore extends BucketBasedCacheStore {
-   private static final int COMPRESSION_COPY_BYTEARRAY_SIZE = 1024;
+   static final int COMPRESSION_COPY_BYTEARRAY_SIZE = 1024;
    private static final Log log = LogFactory.getLog(CloudCacheStore.class);
-   private final ThreadLocal<List<Future<?>>> asyncCommandFutures = new ThreadLocal<List<Future<?>>>();
-   private CloudCacheStoreConfig cfg;
-   private String containerName;
-   private BlobStoreContext ctx;
-   private BlobStore blobStore;
-   private AsyncBlobStore asyncBlobStore;
-   private boolean pollFutures = false;
-   private boolean constructInternalBlobstores = true;
+   final ThreadLocal<List<Future<?>>> asyncCommandFutures = new ThreadLocal<List<Future<?>>>();
+   CloudCacheStoreConfig cfg;
+   String containerName;
+   BlobStoreContext ctx;
+   BlobStore blobStore;
+   AsyncBlobStore asyncBlobStore;
+   boolean pollFutures = false;
+   boolean constructInternalBlobstores = true;
+   protected static final String EARLIEST_EXPIRY_TIME = "metadata_eet";
 
    @Override
    public Class<? extends CacheStoreConfig> getConfigurationClass() {
@@ -76,7 +76,7 @@
 
    private String getThisContainerName() {
       return cfg.getBucketPrefix() + "-"
-               + cache.getName().toLowerCase().replace("_", "").replace(".", "");
+            + cache.getName().toLowerCase().replace("_", "").replace(".", "");
    }
 
    @Override
@@ -86,14 +86,14 @@
 
    @Override
    public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m)
-            throws CacheLoaderException {
+         throws CacheLoaderException {
       this.cfg = (CloudCacheStoreConfig) cfg;
       init(cfg, cache, m, null, null, null, true);
    }
 
    public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m, BlobStoreContext ctx,
-            BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
-            throws CacheLoaderException {
+                    BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
+         throws CacheLoaderException {
       super.init(cfg, cache, m);
       this.cfg = (CloudCacheStoreConfig) cfg;
       this.cache = cache;
@@ -124,9 +124,9 @@
             // EnterpriseConfigurationModule, pass
             // property overrides instead of Properties()
             ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
-                     .getIdentity(), cfg.getPassword(), ImmutableSet.of(
-                     new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
-                     new Properties());
+                  .getIdentity(), cfg.getPassword(), ImmutableSet.of(
+                  new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
+                                                              new Properties());
             blobStore = ctx.getBlobStore();
             asyncBlobStore = ctx.getAsyncBlobStore();
          }
@@ -164,8 +164,7 @@
       if (containerName.equals(source)) {
          log.info("Attempt to load the same cloud bucket ({0}) ignored", source);
       } else {
-         // TODO implement stream handling. What's the JClouds API to "copy"
-         // one bucket to another?
+         // TODO implement stream handling. What's the JClouds API to "copy" one bucket to another?
       }
    }
 
@@ -205,35 +204,51 @@
       return readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)), hash);
    }
 
-   private void purge(BlobMap blobMap) throws CacheLoaderException {
-      for (Map.Entry<String, Blob> entry : blobMap.entrySet()) {
-         Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
-         if (bucket.removeExpiredEntries())
-            updateBucket(bucket);
+   private void purge() throws CacheLoaderException {
+      long currentTime = System.currentTimeMillis();
+      PageSet<? extends StorageMetadata> ps = blobStore.list(containerName);
+
+      // TODO do we need to scroll through the PageSet?
+      for (StorageMetadata sm : ps) {
+         long lastExpirableEntry = readLastExpirableEntryFromMetadata(sm.getUserMetadata());
+         if (lastExpirableEntry < currentTime) scanBlobForExpiredEntries(sm.getName());
       }
    }
 
+   private void scanBlobForExpiredEntries(String blobName) {
+      Blob blob = blobStore.getBlob(containerName, blobName);
+      try {
+         Bucket bucket = readFromBlob(blob, blobName);
+         if (bucket.removeExpiredEntries()) updateBucket(bucket);
+      } catch (CacheLoaderException e) {
+         log.warn("Unable to read blob at {0}", blobName, e);
+      }
+   }
+
+   private long readLastExpirableEntryFromMetadata(Map<String, String> metadata) {
+      String eet = metadata.get(EARLIEST_EXPIRY_TIME);
+      long eetLong = -1;
+      if (eet != null) eetLong = Long.parseLong(eet);
+      return eetLong;
+   }
+
    @Override
    protected void purgeInternal() throws CacheLoaderException {
-      // TODO can expiry data be stored in a blob's metadata? More efficient
-      // purging that way. See
-      // https://jira.jboss.org/jira/browse/ISPN-334
       if (!cfg.isLazyPurgingOnly()) {
          acquireGlobalLock(false);
          try {
-            final BlobMap blobMap = ctx.createBlobMap(containerName);
             if (multiThreadedPurge) {
                purgerService.execute(new Runnable() {
                   public void run() {
                      try {
-                        purge(blobMap);
+                        purge();
                      } catch (Exception e) {
                         log.warn("Problems purging", e);
                      }
                   }
                });
             } else {
-               purge(blobMap);
+               purge();
             }
          } finally {
             releaseGlobalLock(false);
@@ -260,7 +275,7 @@
 
    @Override
    public void applyModifications(List<? extends Modification> modifications)
-            throws CacheLoaderException {
+         throws CacheLoaderException {
       List<Future<?>> futures = new LinkedList<Future<?>>();
       asyncCommandFutures.set(futures);
 
@@ -281,7 +296,7 @@
                Thread.currentThread().interrupt();
             } catch (ExecutionException ee) {
                exception = convertToCacheLoaderException("Caught exception in async process", ee
-                        .getCause());
+                     .getCause());
             }
             if (exception != null)
                throw exception;
@@ -297,6 +312,14 @@
    }
 
    private void writeToBlob(Blob blob, Bucket bucket) throws CacheLoaderException {
+      long earliestExpiryTime = -1;
+      for (InternalCacheEntry e : bucket.getEntries().values()) {
+         long t = e.getExpiryTime();
+         if (t != -1) {
+            if (earliestExpiryTime == -1) earliestExpiryTime = t;
+            else earliestExpiryTime = Math.min(earliestExpiryTime, t);
+         }
+      }
 
       try {
          final byte[] payloadBuffer = marshaller.objectToByteBuffer(bucket);
@@ -304,6 +327,11 @@
             blob.setPayload(compress(payloadBuffer));
          else
             blob.setPayload(payloadBuffer);
+         if (earliestExpiryTime > -1) {
+            Map<String, String> md = Collections.singletonMap(EARLIEST_EXPIRY_TIME, String.valueOf(earliestExpiryTime));
+            blob.getMetadata().setUserMetadata(md);
+         }
+
       } catch (IOException e) {
          throw new CacheLoaderException(e);
       }
@@ -316,7 +344,7 @@
          Bucket bucket;
          if (cfg.isCompress())
             bucket = (Bucket) marshaller.objectFromInputStream(new GZIPInputStream(blob
-                     .getContent()));
+                  .getContent()));
          else
             bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
 
@@ -348,7 +376,7 @@
 
    private String encodeBucketName(String decodedName) {
       final String name = (decodedName.startsWith("-")) ? decodedName.replace('-', 'A')
-               : decodedName;
+            : decodedName;
       if (cfg.isCompress())
          return name + ".gz";
       return name;

Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java	2010-03-09 13:17:54 UTC (rev 1580)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java	2010-03-09 14:55:47 UTC (rev 1581)
@@ -45,6 +45,7 @@
    private String proxyHost;
    private String proxyPort;
    private long requestTimeout = 10000;
+   // TODO Once ISPN-334 is closed, consider setting this to false by default.
    private boolean lazyPurgingOnly = true;
    private String cloudService;
    private int maxConnections = 10000;



More information about the infinispan-commits mailing list