[infinispan-commits] Infinispan SVN: r1792 - trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon May 17 06:46:43 EDT 2010
Author: pvdyck
Date: 2010-05-17 06:46:42 -0400 (Mon, 17 May 2010)
New Revision: 1792
Modified:
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
Log:
[ISPN-409] (CloudCacheStore GZipInputStream generates EOFException)
Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-05-13 19:59:28 UTC (rev 1791)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-05-17 10:46:42 UTC (rev 1792)
@@ -1,20 +1,7 @@
package org.infinispan.loaders.cloud;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
@@ -39,22 +26,34 @@
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.domain.PageSet;
import org.jclouds.blobstore.domain.StorageMetadata;
-import org.jclouds.blobstore.options.ListContainerOptions;
import org.jclouds.domain.Location;
import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
-import com.google.common.collect.ImmutableSet;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
/**
- * The CloudCacheStore implementation that utilizes <a
- * href="http://code.google.com/p/jclouds">JClouds</a> to communicate with cloud storage providers
- * such as <a href="http://aws.amazon.com/s3/">Amazon's S3<a>, <a
- * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or
- * any other such provider supported by JClouds.
+ * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to
+ * communicate with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3</a>, <a
+ * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
+ * provider supported by JClouds.
* <p/>
- * This file store stores stuff in the following format:
- * <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
+ * This file store stores stuff in the following format: <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}</tt>
* <p/>
*
* @author Manik Surtani
@@ -63,7 +62,6 @@
*/
@CacheLoaderMetadata(configurationClass = CloudCacheStoreConfig.class)
public class CloudCacheStore extends BucketBasedCacheStore {
- static final int COMPRESSION_COPY_BYTEARRAY_SIZE = 1024;
private static final Log log = LogFactory.getLog(CloudCacheStore.class);
final ThreadLocal<List<Future<?>>> asyncCommandFutures = new ThreadLocal<List<Future<?>>>();
CloudCacheStoreConfig cfg;
@@ -74,7 +72,16 @@
boolean pollFutures = false;
boolean constructInternalBlobstores = true;
protected static final String EARLIEST_EXPIRY_TIME = "metadata_eet";
+ private MessageDigest md5;
+ public CloudCacheStore() {
+ try {
+ md5 = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException ignore) {
+ md5 = null;
+ }
+ }
+
@Override
public Class<? extends CacheStoreConfig> getConfigurationClass() {
return CloudCacheStoreConfig.class;
@@ -82,7 +89,7 @@
private String getThisContainerName() {
return cfg.getBucketPrefix() + "-"
- + cache.getName().toLowerCase().replace("_", "").replace(".", "");
+ + cache.getName().toLowerCase().replace("_", "").replace(".", "");
}
@Override
@@ -92,14 +99,14 @@
@Override
public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m)
- throws CacheLoaderException {
+ throws CacheLoaderException {
this.cfg = (CloudCacheStoreConfig) cfg;
init(cfg, cache, m, null, null, null, true);
}
public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m, BlobStoreContext ctx,
BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
- throws CacheLoaderException {
+ throws CacheLoaderException {
super.init(cfg, cache, m);
this.cfg = (CloudCacheStoreConfig) cfg;
this.cache = cache;
@@ -130,9 +137,9 @@
// EnterpriseConfigurationModule, pass
// property overrides instead of Properties()
ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
- .getIdentity(), cfg.getPassword(), ImmutableSet.of(
- new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
- new Properties());
+ .getIdentity(), cfg.getPassword(), ImmutableSet.of(
+ new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
+ new Properties());
blobStore = ctx.getBlobStore();
asyncBlobStore = ctx.getAsyncBlobStore();
}
@@ -150,8 +157,8 @@
chosenLoc = idToLocation.get(loc);
if (chosenLoc == null) {
log.warn(
- String.format("Unable to use configured Cloud Service Location [%s]. Available locations for Cloud Service [%s] are %s",
- loc, cfg.getCloudService(), idToLocation.keySet()));
+ String.format("Unable to use configured Cloud Service Location [%s]. Available locations for Cloud Service [%s] are %s",
+ loc, cfg.getCloudService(), idToLocation.keySet()));
}
}
blobStore.createContainerInLocation(chosenLoc, containerName);
@@ -162,6 +169,7 @@
}
}
+
@Override
protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
@@ -224,10 +232,11 @@
return readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)), hash);
}
- private void purge() throws CacheLoaderException {
+ private void purge() {
long currentTime = System.currentTimeMillis();
- PageSet<? extends StorageMetadata> ps = blobStore.list(containerName, ListContainerOptions.Builder.withDetails());
+ PageSet<? extends StorageMetadata> ps = blobStore.list(containerName);
+ // TODO do we need to scroll through the PageSet?
for (StorageMetadata sm : ps) {
long lastExpirableEntry = readLastExpirableEntryFromMetadata(sm.getUserMetadata());
if (lastExpirableEntry < currentTime)
@@ -297,7 +306,7 @@
@Override
public void applyModifications(List<? extends Modification> modifications)
- throws CacheLoaderException {
+ throws CacheLoaderException {
List<Future<?>> futures = new LinkedList<Future<?>>();
asyncCommandFutures.set(futures);
@@ -318,7 +327,7 @@
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
exception = convertToCacheLoaderException("Caught exception in async process", ee
- .getCause());
+ .getCause());
}
if (exception != null)
throw exception;
@@ -342,13 +351,14 @@
try {
final byte[] payloadBuffer = marshaller.objectToByteBuffer(bucket);
- if (cfg.isCompress())
- blob.setPayload(compress(payloadBuffer));
- else
+ if (cfg.isCompress()) {
+ final byte[] compress = compress(payloadBuffer, blob);
+ blob.setPayload(compress);
+ } else
blob.setPayload(payloadBuffer);
if (earliestExpiryTime > -1) {
Map<String, String> md = Collections.singletonMap(EARLIEST_EXPIRY_TIME, String
- .valueOf(earliestExpiryTime));
+ .valueOf(earliestExpiryTime));
blob.getMetadata().setUserMetadata(md);
}
@@ -360,34 +370,11 @@
private Bucket readFromBlob(Blob blob, String bucketName) throws CacheLoaderException {
if (blob == null)
return null;
- BZip2CompressorInputStream is = null;
try {
Bucket bucket;
final InputStream content = blob.getContent();
if (cfg.isCompress()) {
- // TODO re-enable streaming
- // everything is copied in memory in order to avoid an unfixed UnexpectedEOF exception
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Streams.copy(content, bos);
- final byte[] byteArray = bos.toByteArray();
- ByteArrayInputStream bis = new ByteArrayInputStream(byteArray);
- is = new BZip2CompressorInputStream(bis);
- ByteArrayOutputStream bos2 = new ByteArrayOutputStream();
- try {
- Streams.copy(is, bos2);
- final byte[] byteArray2 = bos2.toByteArray();
-
- log.debug("Decompressed " + bucketName + " from " + byteArray.length + " -> "
- + byteArray2.length);
-
- bucket = (Bucket) marshaller.objectFromInputStream(new ByteArrayInputStream(
- byteArray2));
- } finally {
- is.close();
- bis.close();
- bos.close();
- bos2.close();
- }
+ bucket = uncompress(blob, bucketName, content);
} else
bucket = (Bucket) marshaller.objectFromInputStream(content);
@@ -401,25 +388,70 @@
}
}
- private byte[] compress(final byte[] payloadBuffer) throws IOException {
+ private Bucket uncompress(Blob blob, String bucketName, InputStream content) throws IOException, CacheLoaderException, ClassNotFoundException {
+ //TODO go back to fully streamed version and get rid of the byte buffers
+ BZip2CompressorInputStream is;
+ Bucket bucket;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ Streams.copy(content, bos);
+ final byte[] compressedByteArray = bos.toByteArray();
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressedByteArray);
+
+ is = new BZip2CompressorInputStream(bis);
+ ByteArrayOutputStream bos2 = new ByteArrayOutputStream();
+ Streams.copy(is, bos2);
+ final byte[] uncompressedByteArray = bos2.toByteArray();
+
+ byte[] md5FromStoredBlob = blob.getMetadata().getContentMD5();
+
+ byte[] hash = getMd5Digest(compressedByteArray);
+
+ if (!Arrays.equals(hash, md5FromStoredBlob))
+ throw new CacheLoaderException("MD5 hash failed when reading (transfer error) for entry " + bucketName);
+
+ is.close();
+ bis.close();
+ bos.close();
+ bos2.close();
+
+ bucket = (Bucket) marshaller
+ .objectFromInputStream(new ByteArrayInputStream(uncompressedByteArray));
+ return bucket;
+ }
+
+ private byte[] compress(final byte[] uncompressedByteArray, Blob blob) throws IOException {
+ //TODO go back to fully streamed version and get rid of the byte buffers
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- InputStream input = new ByteArrayInputStream(payloadBuffer);
+
+ InputStream input = new ByteArrayInputStream(uncompressedByteArray);
BZip2CompressorOutputStream output = new BZip2CompressorOutputStream(baos);
- try {
- Streams.copy(input, output);
- return baos.toByteArray();
- } finally {
- output.close();
- input.close();
- baos.close();
- }
+
+ Streams.copy(input, output);
+
+ output.close();
+ input.close();
+
+ final byte[] compressedByteArray = baos.toByteArray();
+
+ blob.getMetadata().setContentMD5(getMd5Digest(compressedByteArray));
+
+ baos.close();
+
+ return compressedByteArray;
}
private String encodeBucketName(String decodedName) {
final String name = (decodedName.startsWith("-")) ? decodedName.replace('-', 'A')
- : decodedName;
+ : decodedName;
if (cfg.isCompress())
return name + ".bz2";
return name;
}
+
+ private synchronized byte[] getMd5Digest(byte[] toDigest) {
+ md5.reset();
+ return md5.digest(toDigest);
+ }
}
More information about the infinispan-commits
mailing list