[infinispan-commits] Infinispan SVN: r1792 - trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud.

infinispan-commits at lists.jboss.org
Mon May 17 06:46:43 EDT 2010


Author: pvdyck
Date: 2010-05-17 06:46:42 -0400 (Mon, 17 May 2010)
New Revision: 1792

Modified:
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
Log:
[ISPN-409] (CloudCacheStore GZipInputStream generates EOFException)
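
The fix records an MD5 digest of the compressed payload on the blob at write
time (via blob.getMetadata().setContentMD5(...)) and verifies it at read time,
so a truncated or corrupted transfer is reported as a checksum failure instead
of surfacing as an EOFException inside the decompressor. The following is a
minimal, self-contained sketch of that verify-then-decompress pattern; it
assumes only commons-compress on the classpath, and the class and method names
are illustrative rather than the committed API:

   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.InputStream;
   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;
   import java.util.Arrays;

   import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
   import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;

   public class ChecksummedPayloadSketch {

      // compress a payload and return the compressed bytes; the caller
      // digests these bytes and stores the digest next to the payload
      static byte[] compress(byte[] payload) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         BZip2CompressorOutputStream bzip = new BZip2CompressorOutputStream(baos);
         try {
            bzip.write(payload);
         } finally {
            bzip.close(); // must close before reading baos, or the BZip2 trailer is missing
         }
         return baos.toByteArray();
      }

      static byte[] md5(byte[] bytes) throws NoSuchAlgorithmException {
         return MessageDigest.getInstance("MD5").digest(bytes);
      }

      // check the digest before decompressing: a corrupted payload fails with
      // a clear checksum error instead of an EOFException from the decompressor
      static byte[] verifyAndDecompress(byte[] compressed, byte[] expectedMd5)
            throws IOException, NoSuchAlgorithmException {
         if (!Arrays.equals(md5(compressed), expectedMd5))
            throw new IOException("MD5 mismatch: payload corrupted in transfer");
         InputStream in = new BZip2CompressorInputStream(new ByteArrayInputStream(compressed));
         try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            for (int n; (n = in.read(buf)) != -1; )
               out.write(buf, 0, n);
            return out.toByteArray();
         } finally {
            in.close();
         }
      }

      public static void main(String[] args) throws Exception {
         byte[] original = "bucket contents".getBytes("UTF-8");
         byte[] compressed = compress(original);
         System.out.println(Arrays.equals(original,
               verifyAndDecompress(compressed, md5(compressed)))); // prints true
      }
   }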

Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-05-13 19:59:28 UTC (rev 1791)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-05-17 10:46:42 UTC (rev 1792)
@@ -1,20 +1,7 @@
 package org.infinispan.loaders.cloud;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
 import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
@@ -39,22 +26,34 @@
 import org.jclouds.blobstore.domain.Blob;
 import org.jclouds.blobstore.domain.PageSet;
 import org.jclouds.blobstore.domain.StorageMetadata;
-import org.jclouds.blobstore.options.ListContainerOptions;
 import org.jclouds.domain.Location;
 import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
 import org.jclouds.logging.log4j.config.Log4JLoggingModule;
 
-import com.google.common.collect.ImmutableSet;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
- * The CloudCacheStore implementation that utilizes <a
- * href="http://code.google.com/p/jclouds">JClouds</a> to communicate with cloud storage providers
- * such as <a href="http://aws.amazon.com/s3/">Amazon's S3<a>, <a
- * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or
- * any other such provider supported by JClouds.
+ * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to
+ * communicate with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3</a>, <a
+ * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
+ * provider supported by JClouds.
  * <p/>
- * This file store stores stuff in the following format:
- * <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
+ * This cache store persists bucket data in the following format: <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}</tt>
  * <p/>
  *
  * @author Manik Surtani
@@ -63,7 +62,6 @@
  */
 @CacheLoaderMetadata(configurationClass = CloudCacheStoreConfig.class)
 public class CloudCacheStore extends BucketBasedCacheStore {
-   static final int COMPRESSION_COPY_BYTEARRAY_SIZE = 1024;
    private static final Log log = LogFactory.getLog(CloudCacheStore.class);
    final ThreadLocal<List<Future<?>>> asyncCommandFutures = new ThreadLocal<List<Future<?>>>();
    CloudCacheStoreConfig cfg;
@@ -74,7 +72,16 @@
    boolean pollFutures = false;
    boolean constructInternalBlobstores = true;
    protected static final String EARLIEST_EXPIRY_TIME = "metadata_eet";
+   private MessageDigest md5;
 
+   public CloudCacheStore() {
+      try {
+         md5 = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+         // MD5 is mandatory on every conformant JVM; fail fast here rather
+         // than leave the field null and NPE later in getMd5Digest()
+         throw new IllegalStateException("MD5 not available", e);
+      }
+   }
+
    @Override
    public Class<? extends CacheStoreConfig> getConfigurationClass() {
       return CloudCacheStoreConfig.class;
@@ -82,7 +89,7 @@
 
    private String getThisContainerName() {
       return cfg.getBucketPrefix() + "-"
-              + cache.getName().toLowerCase().replace("_", "").replace(".", "");
+            + cache.getName().toLowerCase().replace("_", "").replace(".", "");
    }
 
    @Override
@@ -92,14 +99,14 @@
 
    @Override
    public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m)
-           throws CacheLoaderException {
+         throws CacheLoaderException {
       this.cfg = (CloudCacheStoreConfig) cfg;
       init(cfg, cache, m, null, null, null, true);
    }
 
    public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m, BlobStoreContext ctx,
                     BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
-           throws CacheLoaderException {
+         throws CacheLoaderException {
       super.init(cfg, cache, m);
       this.cfg = (CloudCacheStoreConfig) cfg;
       this.cache = cache;
@@ -130,9 +137,9 @@
             // EnterpriseConfigurationModule, pass
             // property overrides instead of Properties()
             ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
-                    .getIdentity(), cfg.getPassword(), ImmutableSet.of(
-                    new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
-                    new Properties());
+                  .getIdentity(), cfg.getPassword(), ImmutableSet.of(
+                  new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
+                  new Properties());
             blobStore = ctx.getBlobStore();
             asyncBlobStore = ctx.getAsyncBlobStore();
          }
@@ -150,8 +157,8 @@
                chosenLoc = idToLocation.get(loc);
                if (chosenLoc == null) {
                   log.warn(
-                     String.format("Unable to use configured Cloud Service Location [%s].  Available locations for Cloud Service [%s] are %s",
-                     loc, cfg.getCloudService(), idToLocation.keySet()));
+                        String.format("Unable to use configured Cloud Service Location [%s].  Available locations for Cloud Service [%s] are %s",
+                                      loc, cfg.getCloudService(), idToLocation.keySet()));
                }
             }
             blobStore.createContainerInLocation(chosenLoc, containerName);
@@ -162,6 +169,7 @@
       }
    }
 
+
    @Override
    protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
       for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
@@ -224,10 +232,11 @@
       return readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)), hash);
    }
 
-   private void purge() throws CacheLoaderException {
+   private void purge() {
       long currentTime = System.currentTimeMillis();
-      PageSet<? extends StorageMetadata> ps = blobStore.list(containerName, ListContainerOptions.Builder.withDetails());
+      PageSet<? extends StorageMetadata> ps = blobStore.list(containerName);
 
+      // TODO do we need to scroll through the PageSet?
       for (StorageMetadata sm : ps) {
          long lastExpirableEntry = readLastExpirableEntryFromMetadata(sm.getUserMetadata());
          if (lastExpirableEntry < currentTime)
@@ -297,7 +306,7 @@
 
    @Override
    public void applyModifications(List<? extends Modification> modifications)
-           throws CacheLoaderException {
+         throws CacheLoaderException {
       List<Future<?>> futures = new LinkedList<Future<?>>();
       asyncCommandFutures.set(futures);
 
@@ -318,7 +327,7 @@
                Thread.currentThread().interrupt();
             } catch (ExecutionException ee) {
                exception = convertToCacheLoaderException("Caught exception in async process", ee
-                       .getCause());
+                     .getCause());
             }
             if (exception != null)
                throw exception;
@@ -342,13 +351,14 @@
 
       try {
          final byte[] payloadBuffer = marshaller.objectToByteBuffer(bucket);
-         if (cfg.isCompress())
-            blob.setPayload(compress(payloadBuffer));
-         else
+         if (cfg.isCompress()) {
+            final byte[] compress = compress(payloadBuffer, blob);
+            blob.setPayload(compress);
+         } else
             blob.setPayload(payloadBuffer);
          if (earliestExpiryTime > -1) {
             Map<String, String> md = Collections.singletonMap(EARLIEST_EXPIRY_TIME, String
-                    .valueOf(earliestExpiryTime));
+                  .valueOf(earliestExpiryTime));
             blob.getMetadata().setUserMetadata(md);
          }
 
@@ -360,34 +370,11 @@
    private Bucket readFromBlob(Blob blob, String bucketName) throws CacheLoaderException {
       if (blob == null)
          return null;
-      BZip2CompressorInputStream is = null;
       try {
          Bucket bucket;
          final InputStream content = blob.getContent();
          if (cfg.isCompress()) {
-            // TODO re-enable streaming
-            // everything is copied in memory in order to avoid an unfixed UnexpectedEOF exception
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            Streams.copy(content, bos);
-            final byte[] byteArray = bos.toByteArray();
-            ByteArrayInputStream bis = new ByteArrayInputStream(byteArray);
-            is = new BZip2CompressorInputStream(bis);
-            ByteArrayOutputStream bos2 = new ByteArrayOutputStream();
-            try {
-               Streams.copy(is, bos2);
-               final byte[] byteArray2 = bos2.toByteArray();
-
-               log.debug("Decompressed " + bucketName + " from " + byteArray.length + " -> "
-                       + byteArray2.length);
-
-               bucket = (Bucket) marshaller.objectFromInputStream(new ByteArrayInputStream(
-                       byteArray2));
-            } finally {
-               is.close();
-               bis.close();
-               bos.close();
-               bos2.close();
-            }
+            bucket = uncompress(blob, bucketName, content);
          } else
             bucket = (Bucket) marshaller.objectFromInputStream(content);
 
@@ -401,25 +388,70 @@
       }
    }
 
-   private byte[] compress(final byte[] payloadBuffer) throws IOException {
+   private Bucket uncompress(Blob blob, String bucketName, InputStream content) throws IOException, CacheLoaderException, ClassNotFoundException {
+      //TODO go back to fully streamed version and get rid of the byte buffers
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      Streams.copy(content, bos);
+      final byte[] compressedByteArray = bos.toByteArray();
+
+      // verify the digest recorded at write time *before* decompressing, so a
+      // truncated or corrupted transfer fails with a clear checksum error
+      // instead of an EOFException from the decompressor
+      byte[] md5FromStoredBlob = blob.getMetadata().getContentMD5();
+      if (!Arrays.equals(getMd5Digest(compressedByteArray), md5FromStoredBlob))
+         throw new CacheLoaderException("MD5 hash failed when reading (transfer error) for entry " + bucketName);
+
+      BZip2CompressorInputStream is =
+            new BZip2CompressorInputStream(new ByteArrayInputStream(compressedByteArray));
+      ByteArrayOutputStream bos2 = new ByteArrayOutputStream();
+      try {
+         Streams.copy(is, bos2);
+      } finally {
+         // close in a finally block so the stream cannot leak on error
+         is.close();
+      }
+      final byte[] uncompressedByteArray = bos2.toByteArray();
+
+      return (Bucket) marshaller
+            .objectFromInputStream(new ByteArrayInputStream(uncompressedByteArray));
+   }
+
+   private byte[] compress(final byte[] uncompressedByteArray, Blob blob) throws IOException {
+      //TODO go back to fully streamed version and get rid of the byte buffers
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      InputStream input = new ByteArrayInputStream(uncompressedByteArray);
+      BZip2CompressorOutputStream output = new BZip2CompressorOutputStream(baos);
+      try {
+         Streams.copy(input, output);
+      } finally {
+         // the compressor must be closed before baos is read, otherwise the
+         // BZip2 trailer is missing and later decompression hits an EOF
+         output.close();
+         input.close();
+      }
+
+      final byte[] compressedByteArray = baos.toByteArray();
+
+      // record the digest of the compressed payload so reads can detect
+      // truncated or corrupted transfers
+      blob.getMetadata().setContentMD5(getMd5Digest(compressedByteArray));
+
+      return compressedByteArray;
+   }
 
    private String encodeBucketName(String decodedName) {
       final String name = (decodedName.startsWith("-")) ? decodedName.replace('-', 'A')
-              : decodedName;
+            : decodedName;
       if (cfg.isCompress())
          return name + ".bz2";
       return name;
    }
+
+   private synchronized byte[] getMd5Digest(byte[] toDigest) {
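+      // MessageDigest instances are stateful and not thread-safe, so the
+      // single shared instance is guarded by this store's monitor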
+      md5.reset();
+      return md5.digest(toDigest);
+   }
 }
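
A note on the new getMd5Digest() helper: it is synchronized because the single
shared MessageDigest instance is stateful and not thread-safe, so every digest
computation serializes on the store's monitor. A lock-free alternative would be
one digest instance per thread; the class below is a sketch of that pattern,
illustrative only and not part of this commit:

   import java.security.MessageDigest;
   import java.security.NoSuchAlgorithmException;

   final class Md5PerThread {
      // one stateful MessageDigest per thread, so no synchronization is needed
      private static final ThreadLocal<MessageDigest> MD5 = new ThreadLocal<MessageDigest>() {
         @Override
         protected MessageDigest initialValue() {
            try {
               // every conformant JVM is required to ship an MD5 provider
               return MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
               throw new IllegalStateException("MD5 unavailable", e);
            }
         }
      };

      static byte[] digest(byte[] toDigest) {
         MessageDigest md = MD5.get();
         // digest() resets the instance when it completes, so this explicit
         // reset() is purely defensive
         md.reset();
         return md.digest(toDigest);
      }
   }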


