[infinispan-commits] Infinispan SVN: r1581 - in trunk/cachestore/cloud/src: main/java/org/infinispan/loaders/cloud and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Mar 9 09:55:48 EST 2010
Author: manik.surtani at jboss.com
Date: 2010-03-09 09:55:47 -0500 (Tue, 09 Mar 2010)
New Revision: 1581
Modified:
trunk/cachestore/cloud/src/integrationtest/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
Log:
[ISPN-334] (CloudCacheStore should use metadata to store expiration information for more efficient purging) partial fix
Modified: trunk/cachestore/cloud/src/integrationtest/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java
===================================================================
--- trunk/cachestore/cloud/src/integrationtest/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java 2010-03-09 13:17:54 UTC (rev 1580)
+++ trunk/cachestore/cloud/src/integrationtest/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java 2010-03-09 14:55:47 UTC (rev 1581)
@@ -9,8 +9,14 @@
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.marshall.Marshaller;
+import org.infinispan.test.TestingUtil;
+import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.integration.StubBlobStoreContextBuilder;
+import org.jgroups.protocols.TUNNEL;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
@@ -24,6 +30,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -196,7 +203,7 @@
@Override
@Test (enabled = false, description = "Disabled until we can build the blobstore stub to retain state somewhere.")
- public void testStopStartDoesntNukeValues() throws InterruptedException, CacheLoaderException {
+ public void testStopStartDoesNotNukeValues() throws InterruptedException, CacheLoaderException {
}
@@ -249,4 +256,21 @@
for (InternalCacheEntry se : set) assert expected.remove(se.getKey());
assert expected.isEmpty();
}
+
+ // TODO test that this passes before closing ISPN-334
+ public void testJCloudsMetadataTest() throws IOException {
+    final String container = "myContainer";
+    final String name = "myBlob";
+    final BlobStore store = ((CloudCacheStore) cs).blobStore;
+
+    // Upload a blob that carries a single user-metadata entry.
+    Blob uploaded = store.newBlob(name);
+    uploaded.setPayload("Hello world");
+    uploaded.getMetadata().setUserMetadata(Collections.singletonMap("hello", "world"));
+    store.putBlob(container, uploaded);
+
+    // The entry must be visible when the blob is fetched directly ...
+    Blob fetched = store.getBlob(container, name);
+    assert "world".equals(fetched.getMetadata().getUserMetadata().get("hello"));
+
+    // ... and on every item returned when the container is listed.
+    PageSet<? extends StorageMetadata> listing = store.list(container);
+    for (StorageMetadata item : listing) {
+       assert "world".equals(item.getUserMetadata().get("hello"));
+    }
+ }
}
\ No newline at end of file
Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-03-09 13:17:54 UTC (rev 1580)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-03-09 14:55:47 UTC (rev 1581)
@@ -1,22 +1,6 @@
package org.infinispan.loaders.cloud;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
+import com.google.common.collect.ImmutableSet;
import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
import org.infinispan.container.entries.InternalCacheEntry;
@@ -31,43 +15,59 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.jclouds.blobstore.AsyncBlobStore;
-import org.jclouds.blobstore.BlobMap;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.BlobStoreContextFactory;
import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.blobstore.domain.PageSet;
+import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
-import com.google.common.collect.ImmutableSet;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
/**
- * The CloudCacheStore implementation that utilizes <a
- * href="http://code.google.com/p/jclouds">JClouds</a> to communicate with cloud storage providers
- * such as <a href="http://aws.amazon.com/s3/">Amazon's S3<a>, <a
- * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or
- * any other such provider supported by JClouds.
+ * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to
+ * communicate with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3</a>, <a
+ * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
+ * provider supported by JClouds.
* <p/>
- * This file store stores stuff in the following format:
- * <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
+ * This file store stores stuff in the following format: <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
* <p/>
- *
+ *
* @author Manik Surtani
* @author Adrian Cole
* @since 4.0
*/
@CacheLoaderMetadata(configurationClass = CloudCacheStoreConfig.class)
public class CloudCacheStore extends BucketBasedCacheStore {
- private static final int COMPRESSION_COPY_BYTEARRAY_SIZE = 1024;
+ static final int COMPRESSION_COPY_BYTEARRAY_SIZE = 1024;
private static final Log log = LogFactory.getLog(CloudCacheStore.class);
- private final ThreadLocal<List<Future<?>>> asyncCommandFutures = new ThreadLocal<List<Future<?>>>();
- private CloudCacheStoreConfig cfg;
- private String containerName;
- private BlobStoreContext ctx;
- private BlobStore blobStore;
- private AsyncBlobStore asyncBlobStore;
- private boolean pollFutures = false;
- private boolean constructInternalBlobstores = true;
+ final ThreadLocal<List<Future<?>>> asyncCommandFutures = new ThreadLocal<List<Future<?>>>();
+ CloudCacheStoreConfig cfg;
+ String containerName;
+ BlobStoreContext ctx;
+ BlobStore blobStore;
+ AsyncBlobStore asyncBlobStore;
+ boolean pollFutures = false;
+ boolean constructInternalBlobstores = true;
+ protected static final String EARLIEST_EXPIRY_TIME = "metadata_eet";
@Override
public Class<? extends CacheStoreConfig> getConfigurationClass() {
@@ -76,7 +76,7 @@
private String getThisContainerName() {
return cfg.getBucketPrefix() + "-"
- + cache.getName().toLowerCase().replace("_", "").replace(".", "");
+ + cache.getName().toLowerCase().replace("_", "").replace(".", "");
}
@Override
@@ -86,14 +86,14 @@
@Override
public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m)
- throws CacheLoaderException {
+ throws CacheLoaderException {
this.cfg = (CloudCacheStoreConfig) cfg;
init(cfg, cache, m, null, null, null, true);
}
public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m, BlobStoreContext ctx,
- BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
- throws CacheLoaderException {
+ BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
+ throws CacheLoaderException {
super.init(cfg, cache, m);
this.cfg = (CloudCacheStoreConfig) cfg;
this.cache = cache;
@@ -124,9 +124,9 @@
// EnterpriseConfigurationModule, pass
// property overrides instead of Properties()
ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
- .getIdentity(), cfg.getPassword(), ImmutableSet.of(
- new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
- new Properties());
+ .getIdentity(), cfg.getPassword(), ImmutableSet.of(
+ new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
+ new Properties());
blobStore = ctx.getBlobStore();
asyncBlobStore = ctx.getAsyncBlobStore();
}
@@ -164,8 +164,7 @@
if (containerName.equals(source)) {
log.info("Attempt to load the same cloud bucket ({0}) ignored", source);
} else {
- // TODO implement stream handling. What's the JClouds API to "copy"
- // one bucket to another?
+ // TODO implement stream handling. What's the JClouds API to "copy" one bucket to another?
}
}
@@ -205,35 +204,51 @@
return readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)), hash);
}
- private void purge(BlobMap blobMap) throws CacheLoaderException {
- for (Map.Entry<String, Blob> entry : blobMap.entrySet()) {
- Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
- if (bucket.removeExpiredEntries())
- updateBucket(bucket);
+ private void purge() throws CacheLoaderException {
+ long currentTime = System.currentTimeMillis();
+ PageSet<? extends StorageMetadata> ps = blobStore.list(containerName);
+
+ // TODO do we need to scroll through the PageSet?
+ for (StorageMetadata sm : ps) {
+ long lastExpirableEntry = readLastExpirableEntryFromMetadata(sm.getUserMetadata());
+ if (lastExpirableEntry < currentTime) scanBlobForExpiredEntries(sm.getName());
}
}
+ /**
+  * Fetches a single blob, removes any expired entries from the bucket it holds, and writes the bucket back only
+  * if something was removed.  A {@link CacheLoaderException} is logged rather than propagated, so a blob that
+  * cannot be processed does not stop the caller from moving on to the next one.
+  *
+  * @param blobName name of the blob as reported by the container listing
+  */
+ private void scanBlobForExpiredEntries(String blobName) {
+    // NOTE(review): blobName is the stored name taken from the listing, whereas loadBucket hands readFromBlob the
+    // un-encoded hash - confirm that updateBucket does not end up encoding the name a second time.
+    Blob blob = blobStore.getBlob(containerName, blobName);
+    try {
+       Bucket bucket = readFromBlob(blob, blobName);
+       // Guard against a missing bucket, e.g. when the blob was deleted after it was listed.
+       if (bucket != null && bucket.removeExpiredEntries()) updateBucket(bucket);
+    } catch (CacheLoaderException e) {
+       // Pass the exception in the Throwable position so that its stack trace is logged.
+       log.warn("Unable to read blob at {0}", e, blobName);
+    }
+ }
+
+ /**
+  * Reads the expiry timestamp recorded under {@link #EARLIEST_EXPIRY_TIME} in a blob's user metadata.  Despite
+  * this method's name, the value stored under that key is the <i>earliest</i> expiry time among the entries
+  * written to the blob.
+  *
+  * @param metadata user metadata of a blob; may be null or lack the key
+  * @return the recorded timestamp, or -1 if none is recorded or the recorded value cannot be parsed
+  */
+ private long readLastExpirableEntryFromMetadata(Map<String, String> metadata) {
+    String eet = metadata == null ? null : metadata.get(EARLIEST_EXPIRY_TIME);
+    if (eet == null) return -1;
+    try {
+       return Long.parseLong(eet.trim());
+    } catch (NumberFormatException nfe) {
+       // The value comes back from the storage provider; a malformed one must not abort the whole purge.
+       // Returning -1 makes the caller scan the blob instead of trusting the metadata.
+       log.warn("Ignoring unparseable expiry metadata {0}", eet);
+       return -1;
+    }
+ }
+
@Override
protected void purgeInternal() throws CacheLoaderException {
- // TODO can expiry data be stored in a blob's metadata? More efficient
- // purging that way. See
- // https://jira.jboss.org/jira/browse/ISPN-334
if (!cfg.isLazyPurgingOnly()) {
acquireGlobalLock(false);
try {
- final BlobMap blobMap = ctx.createBlobMap(containerName);
if (multiThreadedPurge) {
purgerService.execute(new Runnable() {
public void run() {
try {
- purge(blobMap);
+ purge();
} catch (Exception e) {
log.warn("Problems purging", e);
}
}
});
} else {
- purge(blobMap);
+ purge();
}
} finally {
releaseGlobalLock(false);
@@ -260,7 +275,7 @@
@Override
public void applyModifications(List<? extends Modification> modifications)
- throws CacheLoaderException {
+ throws CacheLoaderException {
List<Future<?>> futures = new LinkedList<Future<?>>();
asyncCommandFutures.set(futures);
@@ -281,7 +296,7 @@
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
exception = convertToCacheLoaderException("Caught exception in async process", ee
- .getCause());
+ .getCause());
}
if (exception != null)
throw exception;
@@ -297,6 +312,14 @@
}
private void writeToBlob(Blob blob, Bucket bucket) throws CacheLoaderException {
+ long earliestExpiryTime = -1;
+ for (InternalCacheEntry e : bucket.getEntries().values()) {
+ long t = e.getExpiryTime();
+ if (t != -1) {
+ if (earliestExpiryTime == -1) earliestExpiryTime = t;
+ else earliestExpiryTime = Math.min(earliestExpiryTime, t);
+ }
+ }
try {
final byte[] payloadBuffer = marshaller.objectToByteBuffer(bucket);
@@ -304,6 +327,11 @@
blob.setPayload(compress(payloadBuffer));
else
blob.setPayload(payloadBuffer);
+ if (earliestExpiryTime > -1) {
+ Map<String, String> md = Collections.singletonMap(EARLIEST_EXPIRY_TIME, String.valueOf(earliestExpiryTime));
+ blob.getMetadata().setUserMetadata(md);
+ }
+
} catch (IOException e) {
throw new CacheLoaderException(e);
}
@@ -316,7 +344,7 @@
Bucket bucket;
if (cfg.isCompress())
bucket = (Bucket) marshaller.objectFromInputStream(new GZIPInputStream(blob
- .getContent()));
+ .getContent()));
else
bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
@@ -348,7 +376,7 @@
private String encodeBucketName(String decodedName) {
final String name = (decodedName.startsWith("-")) ? decodedName.replace('-', 'A')
- : decodedName;
+ : decodedName;
if (cfg.isCompress())
return name + ".gz";
return name;
Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java 2010-03-09 13:17:54 UTC (rev 1580)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java 2010-03-09 14:55:47 UTC (rev 1581)
@@ -45,6 +45,7 @@
private String proxyHost;
private String proxyPort;
private long requestTimeout = 10000;
+ // TODO Once ISPN-334 is closed, consider setting this to false by default.
private boolean lazyPurgingOnly = true;
private String cloudService;
private int maxConnections = 10000;
More information about the infinispan-commits
mailing list