[infinispan-commits] Infinispan SVN: r1715 - in trunk/cachestore/cloud: src/main/java/org/infinispan/loaders/cloud and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Apr 22 08:45:13 EDT 2010
Author: pvdyck
Date: 2010-04-22 08:45:11 -0400 (Thu, 22 Apr 2010)
New Revision: 1715
Modified:
trunk/cachestore/cloud/pom.xml
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
Log:
[ISPN-409] (CloudCacheStore GZipInputStream generates EOFException)
Modified: trunk/cachestore/cloud/pom.xml
===================================================================
--- trunk/cachestore/cloud/pom.xml 2010-04-22 10:50:35 UTC (rev 1714)
+++ trunk/cachestore/cloud/pom.xml 2010-04-22 12:45:11 UTC (rev 1715)
@@ -4,7 +4,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
- <groupId>org.infinispan</groupId>
+ <groupId>org.infinispan</groupId>
<artifactId>infinispan-cachestore-parent</artifactId>
<version>4.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
@@ -64,6 +64,13 @@
<scope>test</scope>
</dependency>
<!-- =========================================================== -->
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.0</version>
+ </dependency>
+
</dependencies>
<build>
Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-04-22 10:50:35 UTC (rev 1714)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-04-22 12:45:11 UTC (rev 1715)
@@ -1,6 +1,21 @@
package org.infinispan.loaders.cloud;
-import com.google.common.collect.ImmutableSet;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
import org.infinispan.container.entries.InternalCacheEntry;
@@ -14,6 +29,7 @@
import org.infinispan.marshall.Marshaller;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import org.jboss.util.stream.Streams;
import org.jclouds.blobstore.AsyncBlobStore;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
@@ -24,34 +40,19 @@
import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
+import com.google.common.collect.ImmutableSet;
/**
- * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to
- * communicate with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3<a>, <a
- * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
- * provider supported by JClouds.
+ * The CloudCacheStore implementation that utilizes <a
+ * href="http://code.google.com/p/jclouds">JClouds</a> to communicate with cloud storage providers
+ * such as <a href="http://aws.amazon.com/s3/">Amazon's S3</a>, <a
+ * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or
+ * any other such provider supported by JClouds.
* <p/>
- * This file store stores stuff in the following format: <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
+ * This file store stores stuff in the following format:
+ * <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
* <p/>
- *
+ *
* @author Manik Surtani
* @author Adrian Cole
* @since 4.0
@@ -77,7 +78,7 @@
private String getThisContainerName() {
return cfg.getBucketPrefix() + "-"
- + cache.getName().toLowerCase().replace("_", "").replace(".", "");
+ + cache.getName().toLowerCase().replace("_", "").replace(".", "");
}
@Override
@@ -87,14 +88,14 @@
@Override
public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m)
- throws CacheLoaderException {
+ throws CacheLoaderException {
this.cfg = (CloudCacheStoreConfig) cfg;
init(cfg, cache, m, null, null, null, true);
}
public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m, BlobStoreContext ctx,
- BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
- throws CacheLoaderException {
+ BlobStore blobStore, AsyncBlobStore asyncBlobStore, boolean constructInternalBlobstores)
+ throws CacheLoaderException {
super.init(cfg, cache, m);
this.cfg = (CloudCacheStoreConfig) cfg;
this.cache = cache;
@@ -125,9 +126,9 @@
// EnterpriseConfigurationModule, pass
// property overrides instead of Properties()
ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
- .getIdentity(), cfg.getPassword(), ImmutableSet.of(
- new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
- new Properties());
+ .getIdentity(), cfg.getPassword(), ImmutableSet.of(
+ new EnterpriseConfigurationModule(), new Log4JLoggingModule()),
+ new Properties());
blobStore = ctx.getBlobStore();
asyncBlobStore = ctx.getAsyncBlobStore();
}
@@ -145,8 +146,10 @@
protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
- if (bucket.removeExpiredEntries()) updateBucket(bucket);
- if (handler.handle(bucket)) break;
+ if (bucket.removeExpiredEntries())
+ updateBucket(bucket);
+ if (handler.handle(bucket))
+ break;
}
}
@@ -208,7 +211,8 @@
// TODO do we need to scroll through the PageSet?
for (StorageMetadata sm : ps) {
long lastExpirableEntry = readLastExpirableEntryFromMetadata(sm.getUserMetadata());
- if (lastExpirableEntry < currentTime) scanBlobForExpiredEntries(sm.getName());
+ if (lastExpirableEntry < currentTime)
+ scanBlobForExpiredEntries(sm.getName());
}
}
@@ -216,7 +220,8 @@
Blob blob = blobStore.getBlob(containerName, blobName);
try {
Bucket bucket = readFromBlob(blob, blobName);
- if (bucket.removeExpiredEntries()) updateBucket(bucket);
+ if (bucket.removeExpiredEntries())
+ updateBucket(bucket);
} catch (CacheLoaderException e) {
log.warn("Unable to read blob at {0}", blobName, e);
}
@@ -225,7 +230,8 @@
private long readLastExpirableEntryFromMetadata(Map<String, String> metadata) {
String eet = metadata.get(EARLIEST_EXPIRY_TIME);
long eetLong = -1;
- if (eet != null) eetLong = Long.parseLong(eet);
+ if (eet != null)
+ eetLong = Long.parseLong(eet);
return eetLong;
}
@@ -272,7 +278,7 @@
@Override
public void applyModifications(List<? extends Modification> modifications)
- throws CacheLoaderException {
+ throws CacheLoaderException {
List<Future<?>> futures = new LinkedList<Future<?>>();
asyncCommandFutures.set(futures);
@@ -293,7 +299,7 @@
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
exception = convertToCacheLoaderException("Caught exception in async process", ee
- .getCause());
+ .getCause());
}
if (exception != null)
throw exception;
@@ -308,8 +314,10 @@
for (InternalCacheEntry e : bucket.getEntries().values()) {
long t = e.getExpiryTime();
if (t != -1) {
- if (earliestExpiryTime == -1) earliestExpiryTime = t;
- else earliestExpiryTime = Math.min(earliestExpiryTime, t);
+ if (earliestExpiryTime == -1)
+ earliestExpiryTime = t;
+ else
+ earliestExpiryTime = Math.min(earliestExpiryTime, t);
}
}
@@ -320,7 +328,8 @@
else
blob.setPayload(payloadBuffer);
if (earliestExpiryTime > -1) {
- Map<String, String> md = Collections.singletonMap(EARLIEST_EXPIRY_TIME, String.valueOf(earliestExpiryTime));
+ Map<String, String> md = Collections.singletonMap(EARLIEST_EXPIRY_TIME, String
+ .valueOf(earliestExpiryTime));
blob.getMetadata().setUserMetadata(md);
}
@@ -332,14 +341,37 @@
private Bucket readFromBlob(Blob blob, String bucketName) throws CacheLoaderException {
if (blob == null)
return null;
+ BZip2CompressorInputStream is = null;
try {
Bucket bucket;
- if (cfg.isCompress())
- bucket = (Bucket) marshaller.objectFromInputStream(new GZIPInputStream(blob
- .getContent()));
- else
- bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
+ final InputStream content = blob.getContent();
+ if (cfg.isCompress()) {
+ // TODO re-enable streaming
+ // everything is copied in memory in order to avoid an unfixed UnexpectedEOF exception
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Streams.copy(content, bos);
+ final byte[] byteArray = bos.toByteArray();
+ ByteArrayInputStream bis = new ByteArrayInputStream(byteArray);
+ is = new BZip2CompressorInputStream(bis);
+ ByteArrayOutputStream bos2 = new ByteArrayOutputStream();
+ try {
+ Streams.copy(is, bos2);
+ final byte[] byteArray2 = bos2.toByteArray();
+ log.debug("Decompressed " + bucketName + " from " + byteArray.length + " -> "
+ + byteArray2.length);
+
+ bucket = (Bucket) marshaller.objectFromInputStream(new ByteArrayInputStream(
+ byteArray2));
+ } finally {
+ is.close();
+ bis.close();
+ bos.close();
+ bos2.close();
+ }
+ } else
+ bucket = (Bucket) marshaller.objectFromInputStream(content);
+
if (bucket != null)
bucket.setBucketName(bucketName);
return bucket;
@@ -353,24 +385,22 @@
private byte[] compress(final byte[] payloadBuffer) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
InputStream input = new ByteArrayInputStream(payloadBuffer);
- GZIPOutputStream output = new GZIPOutputStream(baos);
- byte[] buf = new byte[COMPRESSION_COPY_BYTEARRAY_SIZE];
-
- int bytesRead = input.read(buf);
- while (bytesRead != -1) {
- output.write(buf, 0, bytesRead);
- bytesRead = input.read(buf);
+ BZip2CompressorOutputStream output = new BZip2CompressorOutputStream(baos);
+ try {
+ Streams.copy(input, output);
+ return baos.toByteArray();
+ } finally {
+ output.close();
+ input.close();
+ baos.close();
}
- input.close();
- output.close();
- return baos.toByteArray();
}
private String encodeBucketName(String decodedName) {
final String name = (decodedName.startsWith("-")) ? decodedName.replace('-', 'A')
- : decodedName;
+ : decodedName;
if (cfg.isCompress())
- return name + ".gz";
+ return name + ".bz2";
return name;
}
}
More information about the infinispan-commits
mailing list