[infinispan-commits] Infinispan SVN: r1715 - in trunk/cachestore/cloud: src/main/java/org/infinispan/loaders/cloud and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Apr 22 08:45:13 EDT 2010


Author: pvdyck
Date: 2010-04-22 08:45:11 -0400 (Thu, 22 Apr 2010)
New Revision: 1715

Modified:
   trunk/cachestore/cloud/pom.xml
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
Log:
[ISPN-409] (CloudCacheStore GZipInputStream generates EOFException)

Modified: trunk/cachestore/cloud/pom.xml
===================================================================
--- trunk/cachestore/cloud/pom.xml	2010-04-22 10:50:35 UTC (rev 1714)
+++ trunk/cachestore/cloud/pom.xml	2010-04-22 12:45:11 UTC (rev 1715)
@@ -4,7 +4,7 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
-      <groupId>org.infinispan</groupId>
+      <groupId>org.infinispan</groupId> 
       <artifactId>infinispan-cachestore-parent</artifactId>
       <version>4.1.0-SNAPSHOT</version>
       <relativePath>../pom.xml</relativePath>
@@ -64,6 +64,13 @@
          <scope>test</scope>
       </dependency>
       <!-- =========================================================== -->
+      
+      <dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-compress</artifactId>
+			<version>1.0</version>
+	  </dependency>
+      
    </dependencies>
 
    <build>

Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-04-22 10:50:35 UTC (rev 1714)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-04-22 12:45:11 UTC (rev 1715)
@@ -1,6 +1,21 @@
 package org.infinispan.loaders.cloud;
 
-import com.google.common.collect.ImmutableSet;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.infinispan.Cache;
 import org.infinispan.config.ConfigurationException;
 import org.infinispan.container.entries.InternalCacheEntry;
@@ -14,6 +29,7 @@
 import org.infinispan.marshall.Marshaller;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
+import org.jboss.util.stream.Streams;
 import org.jclouds.blobstore.AsyncBlobStore;
 import org.jclouds.blobstore.BlobStore;
 import org.jclouds.blobstore.BlobStoreContext;
@@ -24,34 +40,19 @@
 import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
 import org.jclouds.logging.log4j.config.Log4JLoggingModule;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
+import com.google.common.collect.ImmutableSet;
 
 /**
- * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to
- * communicate with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3<a>, <a
- * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
- * provider supported by JClouds.
+ * The CloudCacheStore implementation that utilizes <a
+ * href="http://code.google.com/p/jclouds">JClouds</a> to communicate with cloud storage providers
+ * such as <a href="http://aws.amazon.com/s3/">Amazon's S3</a>, <a
+ * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or
+ * any other such provider supported by JClouds.
  * <p/>
- * This file store stores stuff in the following format: <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
+ * This file store stores stuff in the following format:
+ * <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
  * <p/>
- *
+ * 
  * @author Manik Surtani
  * @author Adrian Cole
  * @since 4.0
@@ -77,7 +78,7 @@
 
    private String getThisContainerName() {
       return cfg.getBucketPrefix() + "-"
-            + cache.getName().toLowerCase().replace("_", "").replace(".", "");
+               + cache.getName().toLowerCase().replace("_", "").replace(".", "");
    }
 
    @Override
@@ -87,14 +88,14 @@
 
    @Override
    public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m)
-         throws CacheLoaderException {
+            throws CacheLoaderException {
       this.cfg = (CloudCacheStoreConfig) cfg;
       init(cfg, cache, m, null, null, null, true);
    }
 
    public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m, BlobStoreContext ctx,
-                    BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
-         throws CacheLoaderException {
+            BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
+            throws CacheLoaderException {
       super.init(cfg, cache, m);
       this.cfg = (CloudCacheStoreConfig) cfg;
       this.cache = cache;
@@ -125,9 +126,9 @@
             // EnterpriseConfigurationModule, pass
             // property overrides instead of Properties()
             ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
-                  .getIdentity(), cfg.getPassword(), ImmutableSet.of(
-                  new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
-                                                              new Properties());
+                     .getIdentity(), cfg.getPassword(), ImmutableSet.of(
+                     new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
+                     new Properties());
             blobStore = ctx.getBlobStore();
             asyncBlobStore = ctx.getAsyncBlobStore();
          }
@@ -145,8 +146,10 @@
    protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
       for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
          Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
-         if (bucket.removeExpiredEntries()) updateBucket(bucket);
-         if (handler.handle(bucket)) break;
+         if (bucket.removeExpiredEntries())
+            updateBucket(bucket);
+         if (handler.handle(bucket))
+            break;
       }
    }
 
@@ -208,7 +211,8 @@
       // TODO do we need to scroll through the PageSet?
       for (StorageMetadata sm : ps) {
          long lastExpirableEntry = readLastExpirableEntryFromMetadata(sm.getUserMetadata());
-         if (lastExpirableEntry < currentTime) scanBlobForExpiredEntries(sm.getName());
+         if (lastExpirableEntry < currentTime)
+            scanBlobForExpiredEntries(sm.getName());
       }
    }
 
@@ -216,7 +220,8 @@
       Blob blob = blobStore.getBlob(containerName, blobName);
       try {
          Bucket bucket = readFromBlob(blob, blobName);
-         if (bucket.removeExpiredEntries()) updateBucket(bucket);
+         if (bucket.removeExpiredEntries())
+            updateBucket(bucket);
       } catch (CacheLoaderException e) {
          log.warn("Unable to read blob at {0}", blobName, e);
       }
@@ -225,7 +230,8 @@
    private long readLastExpirableEntryFromMetadata(Map<String, String> metadata) {
       String eet = metadata.get(EARLIEST_EXPIRY_TIME);
       long eetLong = -1;
-      if (eet != null) eetLong = Long.parseLong(eet);
+      if (eet != null)
+         eetLong = Long.parseLong(eet);
       return eetLong;
    }
 
@@ -272,7 +278,7 @@
 
    @Override
    public void applyModifications(List<? extends Modification> modifications)
-         throws CacheLoaderException {
+            throws CacheLoaderException {
       List<Future<?>> futures = new LinkedList<Future<?>>();
       asyncCommandFutures.set(futures);
 
@@ -293,7 +299,7 @@
                Thread.currentThread().interrupt();
             } catch (ExecutionException ee) {
                exception = convertToCacheLoaderException("Caught exception in async process", ee
-                     .getCause());
+                        .getCause());
             }
             if (exception != null)
                throw exception;
@@ -308,8 +314,10 @@
       for (InternalCacheEntry e : bucket.getEntries().values()) {
          long t = e.getExpiryTime();
          if (t != -1) {
-            if (earliestExpiryTime == -1) earliestExpiryTime = t;
-            else earliestExpiryTime = Math.min(earliestExpiryTime, t);
+            if (earliestExpiryTime == -1)
+               earliestExpiryTime = t;
+            else
+               earliestExpiryTime = Math.min(earliestExpiryTime, t);
          }
       }
 
@@ -320,7 +328,8 @@
          else
             blob.setPayload(payloadBuffer);
          if (earliestExpiryTime > -1) {
-            Map<String, String> md = Collections.singletonMap(EARLIEST_EXPIRY_TIME, String.valueOf(earliestExpiryTime));
+            Map<String, String> md = Collections.singletonMap(EARLIEST_EXPIRY_TIME, String
+                     .valueOf(earliestExpiryTime));
             blob.getMetadata().setUserMetadata(md);
          }
 
@@ -332,14 +341,37 @@
    private Bucket readFromBlob(Blob blob, String bucketName) throws CacheLoaderException {
       if (blob == null)
          return null;
+      BZip2CompressorInputStream is = null;
       try {
          Bucket bucket;
-         if (cfg.isCompress())
-            bucket = (Bucket) marshaller.objectFromInputStream(new GZIPInputStream(blob
-                  .getContent()));
-         else
-            bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
+         final InputStream content = blob.getContent();
+         if (cfg.isCompress()) {
+            // TODO re-enable streaming
+            // the whole blob is buffered in memory to work around an unresolved EOFException (ISPN-409)
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            Streams.copy(content, bos);
+            final byte[] byteArray = bos.toByteArray();
+            ByteArrayInputStream bis = new ByteArrayInputStream(byteArray);
+            is = new BZip2CompressorInputStream(bis);
+            ByteArrayOutputStream bos2 = new ByteArrayOutputStream();
+            try {
+               Streams.copy(is, bos2);
+               final byte[] byteArray2 = bos2.toByteArray();
 
+               log.debug("Decompressed " + bucketName + " from " + byteArray.length + " -> "
+                        + byteArray2.length);
+
+               bucket = (Bucket) marshaller.objectFromInputStream(new ByteArrayInputStream(
+                        byteArray2));
+            } finally {
+               is.close();
+               bis.close();
+               bos.close();
+               bos2.close();
+            }
+         } else
+            bucket = (Bucket) marshaller.objectFromInputStream(content);
+
          if (bucket != null)
             bucket.setBucketName(bucketName);
          return bucket;
@@ -353,24 +385,22 @@
    private byte[] compress(final byte[] payloadBuffer) throws IOException {
       final ByteArrayOutputStream baos = new ByteArrayOutputStream();
       InputStream input = new ByteArrayInputStream(payloadBuffer);
-      GZIPOutputStream output = new GZIPOutputStream(baos);
-      byte[] buf = new byte[COMPRESSION_COPY_BYTEARRAY_SIZE];
-
-      int bytesRead = input.read(buf);
-      while (bytesRead != -1) {
-         output.write(buf, 0, bytesRead);
-         bytesRead = input.read(buf);
+      BZip2CompressorOutputStream output = new BZip2CompressorOutputStream(baos);
+      try {
+         Streams.copy(input, output);
+         return baos.toByteArray();
+      } finally {
+         output.close();
+         input.close();
+         baos.close();
       }
-      input.close();
-      output.close();
-      return baos.toByteArray();
    }
 
    private String encodeBucketName(String decodedName) {
       final String name = (decodedName.startsWith("-")) ? decodedName.replace('-', 'A')
-            : decodedName;
+               : decodedName;
       if (cfg.isCompress())
-         return name + ".gz";
+         return name + ".bz2";
       return name;
    }
 }



More information about the infinispan-commits mailing list