[infinispan-commits] Infinispan SVN: r1555 - trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Feb 26 01:51:08 EST 2010
Author: pvdyck
Date: 2010-02-26 01:51:08 -0500 (Fri, 26 Feb 2010)
New Revision: 1555
Modified:
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
Log:
[ISPN-357] (Create a compressing Marshaller wrapper) Compression Implementation
Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-02-25 17:42:57 UTC (rev 1554)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-02-26 06:51:08 UTC (rev 1555)
@@ -1,6 +1,9 @@
package org.infinispan.loaders.cloud;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashSet;
@@ -11,6 +14,8 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
@@ -51,6 +56,7 @@
* @since 4.0
*/
public class CloudCacheStore extends BucketBasedCacheStore {
+ private static final int COMPRESSION_COPY_BYTEARRAY_SIZE = 1024;
private static final Log log = LogFactory.getLog(CloudCacheStore.class);
private final ThreadLocal<List<Future<?>>> asyncCommandFutures = new ThreadLocal<List<Future<?>>>();
private CloudCacheStoreConfig cfg;
@@ -112,7 +118,8 @@
containerName = getThisContainerName();
try {
if (constructInternalBlobstores) {
- // add an executor as a constructor param to EnterpriseConfigurationModule, pass
+ // add an executor as a constructor param to
+ // EnterpriseConfigurationModule, pass
// property overrides instead of Properties()
ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
.getIdentity(), cfg.getPassword(), ImmutableSet.of(
@@ -131,6 +138,7 @@
}
}
+ @Override
protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
@@ -143,6 +151,7 @@
return result;
}
+ @Override
protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
String source;
try {
@@ -153,10 +162,12 @@
if (containerName.equals(source)) {
log.info("Attempt to load the same cloud bucket ({0}) ignored", source);
} else {
- // TODO implement stream handling. What's the JClouds API to "copy" one bucket to another?
+ // TODO implement stream handling. What's the JClouds API to "copy"
+ // one bucket to another?
}
}
+ @Override
protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
try {
objectOutput.writeObject(containerName);
@@ -165,13 +176,15 @@
}
}
+ @Override
protected void clearLockSafe() {
List<Future<?>> futures = asyncCommandFutures.get();
if (futures == null) {
// is a sync call
blobStore.clearContainer(containerName);
} else {
- // is an async call - invoke clear() on the container asynchronously and store the future
+ // is an async call - invoke clear() on the container asynchronously
+ // and store the future
// in the 'futures' collection
futures.add(asyncBlobStore.clearContainer(containerName));
}
@@ -185,6 +198,7 @@
}
}
+ @Override
protected Bucket loadBucket(String hash) throws CacheLoaderException {
return readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)), hash);
}
@@ -197,8 +211,10 @@
}
}
+ @Override
protected void purgeInternal() throws CacheLoaderException {
- // TODO can expiry data be stored in a blob's metadata? More efficient purging that way. See
+ // TODO can expiry data be stored in a blob's metadata? More efficient
+ // purging that way. See
// https://jira.jboss.org/jira/browse/ISPN-334
if (!cfg.isLazyPurgingOnly()) {
acquireGlobalLock(false);
@@ -223,6 +239,7 @@
}
}
+ @Override
protected void insertBucket(Bucket bucket) throws CacheLoaderException {
Blob blob = blobStore.newBlob(encodeBucketName(bucket.getBucketName()));
writeToBlob(blob, bucket);
@@ -232,7 +249,8 @@
// is a sync call
blobStore.putBlob(containerName, blob);
} else {
- // is an async call - invoke clear() on the container asynchronously and store the future
+ // is an async call - invoke putBlob() on the container asynchronously
+ // and store the future
// in the 'futures' collection
futures.add(asyncBlobStore.putBlob(containerName, blob));
}
@@ -250,10 +268,12 @@
CacheLoaderException exception = null;
try {
futures = asyncCommandFutures.get();
- if (log.isTraceEnabled()) log.trace("Futures, in order: {0}", futures);
+ if (log.isTraceEnabled())
+ log.trace("Futures, in order: {0}", futures);
for (Future<?> f : futures) {
Object o = f.get();
- if (log.isTraceEnabled()) log.trace("Future {0} returned {1}", f, o);
+ if (log.isTraceEnabled())
+ log.trace("Future {0} returned {1}", f, o);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
@@ -269,13 +289,19 @@
}
}
+ @Override
protected void updateBucket(Bucket bucket) throws CacheLoaderException {
insertBucket(bucket);
}
private void writeToBlob(Blob blob, Bucket bucket) throws CacheLoaderException {
+
try {
- blob.setPayload(marshaller.objectToByteBuffer(bucket));
+ final byte[] payloadBuffer = marshaller.objectToByteBuffer(bucket);
+ if (cfg.isCompress())
+ blob.setPayload(compress(payloadBuffer));
+ else
+ blob.setPayload(payloadBuffer);
} catch (IOException e) {
throw new CacheLoaderException(e);
}
@@ -285,16 +311,44 @@
if (blob == null)
return null;
try {
- Bucket bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
+ Bucket bucket;
+ if (cfg.isCompress())
+ bucket = (Bucket) marshaller.objectFromInputStream(new GZIPInputStream(blob
+ .getContent()));
+ else
+ bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
+
if (bucket != null)
bucket.setBucketName(bucketName);
return bucket;
- } catch (Exception e) {
+ } catch (ClassNotFoundException e) {
throw convertToCacheLoaderException("Unable to read blob", e);
+ } catch (IOException e) {
+ throw convertToCacheLoaderException("Class loading issue", e);
}
}
+ private byte[] compress(final byte[] payloadBuffer) throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ InputStream input = new ByteArrayInputStream(payloadBuffer);
+ GZIPOutputStream output = new GZIPOutputStream(baos);
+ byte[] buf = new byte[COMPRESSION_COPY_BYTEARRAY_SIZE];
+
+ int bytesRead = input.read(buf);
+ while (bytesRead != -1) {
+ output.write(buf, 0, bytesRead);
+ bytesRead = input.read(buf);
+ }
+ input.close();
+ output.close();
+ return baos.toByteArray();
+ }
+
private String encodeBucketName(String decodedName) {
- return (decodedName.startsWith("-")) ? decodedName.replace('-', 'A') : decodedName;
+ final String name = (decodedName.startsWith("-")) ? decodedName.replace('-', 'A')
+ : decodedName;
+ if (cfg.isCompress())
+ return name + ".gz";
+ return name;
}
}
Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java 2010-02-25 17:42:57 UTC (rev 1554)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java 2010-02-26 06:51:08 UTC (rev 1555)
@@ -3,23 +3,38 @@
import org.infinispan.loaders.LockSupportCacheStoreConfig;
/**
- * The cache store config bean for the {@link org.infinispan.loaders.cloud.CloudCacheStore}. This allows you to tune a number of characteristics
- * of the {@link org.infinispan.loaders.cloud.CloudCacheStore}.
+ * The cache store config bean for the {@link org.infinispan.loaders.cloud.CloudCacheStore}. This
+ * allows you to tune a number of characteristics of the
+ * {@link org.infinispan.loaders.cloud.CloudCacheStore}.
* <p/>
- * <ul>
- * <li><tt>identity</tt> - A String that identifies you to the cloud provider. For example. with AWS, this is your ACCESS KEY.</li>
- * <li><tt>password</tt> - A String that is used to authenticate you with the cloud provider. For example. with AWS, this is your SECRET KEY.</li>
- * <li><tt>bucketPrefix</tt> - A String that is prepended to generated buckets or containers on the cloud store. Buckets or containers are named {bucketPrefix}-{cacheName}.</li>
- * <li><tt>proxyHost</tt> - The host name of a proxy to use. Optional, no proxy is used if this is un-set.</li>
- * <li><tt>proxyPort</tt> - The port of a proxy to use. Optional, no proxy is used if this is un-set.</li>
- * <li><tt>requestTimeout</tt> - A timeout to use when communicating with the cloud storage provider, in milliseconds. Defaults to 10000.</li>
- * <li><tt>lazyPurgingOnly</tt> - If enabled, then expired entries are only purged on access, lazily, rather than by using the periodic eviction thread. Defaults to <tt>true</tt>.</li>
- * <li><tt>cloudService</tt> - The cloud service to use. Supported values are <tt>s3</tt> (Amazon AWS), <tt>cloudfiles</tt> (Rackspace Cloud), <tt>azureblob</tt> (Microsoft Azure), and <tt>atmos</tt> (Atmos Online Storage Service).</li>
- * <li><tt>maxConnections</tt> - The maximum number of concurrent connections to make to the cloud provider. Defaults to 10.</li>
- * <li><tt>secure</tt> - Whether to use secure (SSL) connections or not. Defaults to <tt>true</tt>.</li>
- * <li><tt>cloudServiceLocation</tt> - the data center to use. Note that this is specific to the cloud provider in question. E.g., Amazon's S3 service supports storage buckets in several different locations. Valid strings for S3, for example, are <a href="http://github.com/jclouds/jclouds/blob/master/aws/core/src/main/java/org/jclouds/aws/domain/Region.java">here</a>. Optional, and defaults to <tt>DEFAULT</tt>.</li>
+ * <ul>
+ * <li><tt>identity</tt> - A String that identifies you to the cloud provider. For example, with
+ * AWS, this is your ACCESS KEY.</li>
+ * <li><tt>password</tt> - A String that is used to authenticate you with the cloud provider. For
+ * example, with AWS, this is your SECRET KEY.</li>
+ * <li><tt>bucketPrefix</tt> - A String that is prepended to generated buckets or containers on the
+ * cloud store. Buckets or containers are named {bucketPrefix}-{cacheName}.</li>
+ * <li><tt>proxyHost</tt> - The host name of a proxy to use. Optional, no proxy is used if this is
+ * un-set.</li>
+ * <li><tt>proxyPort</tt> - The port of a proxy to use. Optional, no proxy is used if this is
+ * un-set.</li>
+ * <li><tt>requestTimeout</tt> - A timeout to use when communicating with the cloud storage
+ * provider, in milliseconds. Defaults to 10000.</li>
+ * <li><tt>lazyPurgingOnly</tt> - If enabled, then expired entries are only purged on access,
+ * lazily, rather than by using the periodic eviction thread. Defaults to <tt>true</tt>.</li>
+ * <li><tt>cloudService</tt> - The cloud service to use. Supported values are <tt>s3</tt> (Amazon
+ * AWS), <tt>cloudfiles</tt> (Rackspace Cloud), <tt>azureblob</tt> (Microsoft Azure), and
+ * <tt>atmos</tt> (Atmos Online Storage Service).</li>
+ * <li><tt>maxConnections</tt> - The maximum number of concurrent connections to make to the cloud
+ * provider. Defaults to 10.</li>
+ * <li><tt>secure</tt> - Whether to use secure (SSL) connections or not. Defaults to <tt>true</tt>.</li>
+ * <li><tt>compress</tt> - Whether to compress stored data. Defaults to <tt>true</tt>.</li>
+ * <li><tt>cloudServiceLocation</tt> - the data center to use. Note that this is specific to the
+ * cloud provider in question. E.g., Amazon's S3 service supports storage buckets in several
+ * different locations. Valid strings for S3, for example, are <a href="http://github.com/jclouds/jclouds/blob/master/aws/core/src/main/java/org/jclouds/aws/domain/Region.java"
+ * >here</a>. Optional, and defaults to <tt>DEFAULT</tt>.</li>
* </ul>
- *
+ *
* @author Manik Surtani
* @since 4.0
*/
@@ -34,7 +49,9 @@
private String cloudService;
private int maxConnections = 10000;
private boolean secure = true;
- private String cloudServiceLocation = "DEFAULT";
+ private boolean compress = true;
+
+ private final String cloudServiceLocation = "DEFAULT";
private static final long serialVersionUID = -9011054600279256849L;
public CloudCacheStoreConfig() {
@@ -121,26 +138,52 @@
this.secure = secure;
}
+ public boolean isCompress() {
+ return compress;
+ }
+
+ public void setCompress(boolean compress) {
+ this.compress = compress;
+ }
+
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- if (!super.equals(o)) return false;
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ if (!super.equals(o))
+ return false;
CloudCacheStoreConfig that = (CloudCacheStoreConfig) o;
- if (lazyPurgingOnly != that.lazyPurgingOnly) return false;
- if (maxConnections != that.maxConnections) return false;
- if (requestTimeout != that.requestTimeout) return false;
- if (secure != that.secure) return false;
- if (bucketPrefix != null ? !bucketPrefix.equals(that.bucketPrefix) : that.bucketPrefix != null) return false;
- if (cloudService != null ? !cloudService.equals(that.cloudService) : that.cloudService != null) return false;
- if (cloudServiceLocation != null ? !cloudServiceLocation.equals(that.cloudServiceLocation) : that.cloudServiceLocation != null)
+ if (lazyPurgingOnly != that.lazyPurgingOnly)
return false;
- if (identity != null ? !identity.equals(that.identity) : that.identity != null) return false;
- if (password != null ? !password.equals(that.password) : that.password != null) return false;
- if (proxyHost != null ? !proxyHost.equals(that.proxyHost) : that.proxyHost != null) return false;
- if (proxyPort != null ? !proxyPort.equals(that.proxyPort) : that.proxyPort != null) return false;
+ if (maxConnections != that.maxConnections)
+ return false;
+ if (requestTimeout != that.requestTimeout)
+ return false;
+ if (secure != that.secure)
+ return false;
+ if (compress != that.compress)
+ return false;
+ if (bucketPrefix != null ? !bucketPrefix.equals(that.bucketPrefix)
+ : that.bucketPrefix != null)
+ return false;
+ if (cloudService != null ? !cloudService.equals(that.cloudService)
+ : that.cloudService != null)
+ return false;
+ if (cloudServiceLocation != null ? !cloudServiceLocation.equals(that.cloudServiceLocation)
+ : that.cloudServiceLocation != null)
+ return false;
+ if (identity != null ? !identity.equals(that.identity) : that.identity != null)
+ return false;
+ if (password != null ? !password.equals(that.password) : that.password != null)
+ return false;
+ if (proxyHost != null ? !proxyHost.equals(that.proxyHost) : that.proxyHost != null)
+ return false;
+ if (proxyPort != null ? !proxyPort.equals(that.proxyPort) : that.proxyPort != null)
+ return false;
return true;
}
@@ -158,25 +201,19 @@
result = 31 * result + (cloudService != null ? cloudService.hashCode() : 0);
result = 31 * result + maxConnections;
result = 31 * result + (secure ? 1 : 0);
+ result = 31 * result + (compress ? 1 : 0);
result = 31 * result + (cloudServiceLocation != null ? cloudServiceLocation.hashCode() : 0);
return result;
}
@Override
public String toString() {
- return "CloudCacheStoreConfig{" +
- "bucketPrefix='" + bucketPrefix + '\'' +
- ", identity='" + identity + '\'' +
- ", password='" + password + '\'' +
- ", proxyHost='" + proxyHost + '\'' +
- ", proxyPort='" + proxyPort + '\'' +
- ", requestTimeout=" + requestTimeout +
- ", lazyPurgingOnly=" + lazyPurgingOnly +
- ", cloudService='" + cloudService + '\'' +
- ", maxConnections=" + maxConnections +
- ", secure=" + secure +
- ", cloudServiceLocation='" + cloudServiceLocation + '\'' +
- '}';
+ return "CloudCacheStoreConfig{" + "bucketPrefix='" + bucketPrefix + '\'' + ", identity='"
+ + identity + '\'' + ", password='" + password + '\'' + ", proxyHost='" + proxyHost
+ + '\'' + ", proxyPort='" + proxyPort + '\'' + ", requestTimeout=" + requestTimeout
+ + ", lazyPurgingOnly=" + lazyPurgingOnly + ", cloudService='" + cloudService + '\''
+ + ", maxConnections=" + maxConnections + ", secure=" + secure + ", compress="
+ + compress + ", cloudServiceLocation='" + cloudServiceLocation + '\'' + '}';
}
public String getCloudServiceLocation() {
More information about the infinispan-commits
mailing list