[infinispan-commits] Infinispan SVN: r1555 - trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Feb 26 01:51:08 EST 2010


Author: pvdyck
Date: 2010-02-26 01:51:08 -0500 (Fri, 26 Feb 2010)
New Revision: 1555

Modified:
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
Log:
[ISPN-357] (Create a compressing Marshaller wrapper) Compression Implementation 

Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-02-25 17:42:57 UTC (rev 1554)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-02-26 06:51:08 UTC (rev 1555)
@@ -1,6 +1,9 @@
 package org.infinispan.loaders.cloud;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.HashSet;
@@ -11,6 +14,8 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.infinispan.Cache;
 import org.infinispan.config.ConfigurationException;
@@ -51,6 +56,7 @@
  * @since 4.0
  */
 public class CloudCacheStore extends BucketBasedCacheStore {
+   private static final int COMPRESSION_COPY_BYTEARRAY_SIZE = 1024;
    private static final Log log = LogFactory.getLog(CloudCacheStore.class);
    private final ThreadLocal<List<Future<?>>> asyncCommandFutures = new ThreadLocal<List<Future<?>>>();
    private CloudCacheStoreConfig cfg;
@@ -112,7 +118,8 @@
       containerName = getThisContainerName();
       try {
          if (constructInternalBlobstores) {
-            // add an executor as a constructor param to EnterpriseConfigurationModule, pass
+            // add an executor as a constructor param to
+            // EnterpriseConfigurationModule, pass
             // property overrides instead of Properties()
             ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg
                      .getIdentity(), cfg.getPassword(), ImmutableSet.of(
@@ -131,6 +138,7 @@
       }
    }
 
+   @Override
    protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
       Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
 
@@ -143,6 +151,7 @@
       return result;
    }
 
+   @Override
    protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
       String source;
       try {
@@ -153,10 +162,12 @@
       if (containerName.equals(source)) {
          log.info("Attempt to load the same cloud bucket ({0}) ignored", source);
       } else {
-         // TODO implement stream handling. What's the JClouds API to "copy" one bucket to another?
+         // TODO implement stream handling. What's the JClouds API to "copy"
+         // one bucket to another?
       }
    }
 
+   @Override
    protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
       try {
          objectOutput.writeObject(containerName);
@@ -165,13 +176,15 @@
       }
    }
 
+   @Override
    protected void clearLockSafe() {
       List<Future<?>> futures = asyncCommandFutures.get();
       if (futures == null) {
          // is a sync call
          blobStore.clearContainer(containerName);
       } else {
-         // is an async call - invoke clear() on the container asynchronously and store the future
+         // is an async call - invoke clear() on the container asynchronously
+         // and store the future
          // in the 'futures' collection
          futures.add(asyncBlobStore.clearContainer(containerName));
       }
@@ -185,6 +198,7 @@
       }
    }
 
+   @Override
    protected Bucket loadBucket(String hash) throws CacheLoaderException {
       return readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)), hash);
    }
@@ -197,8 +211,10 @@
       }
    }
 
+   @Override
    protected void purgeInternal() throws CacheLoaderException {
-      // TODO can expiry data be stored in a blob's metadata? More efficient purging that way. See
+      // TODO can expiry data be stored in a blob's metadata? More efficient
+      // purging that way. See
       // https://jira.jboss.org/jira/browse/ISPN-334
       if (!cfg.isLazyPurgingOnly()) {
          acquireGlobalLock(false);
@@ -223,6 +239,7 @@
       }
    }
 
+   @Override
    protected void insertBucket(Bucket bucket) throws CacheLoaderException {
       Blob blob = blobStore.newBlob(encodeBucketName(bucket.getBucketName()));
       writeToBlob(blob, bucket);
@@ -232,7 +249,8 @@
          // is a sync call
          blobStore.putBlob(containerName, blob);
       } else {
-         // is an async call - invoke clear() on the container asynchronously and store the future
+         // is an async call - invoke putBlob() on the container asynchronously
+         // and store the future
          // in the 'futures' collection
          futures.add(asyncBlobStore.putBlob(containerName, blob));
       }
@@ -250,10 +268,12 @@
             CacheLoaderException exception = null;
             try {
                futures = asyncCommandFutures.get();
-               if (log.isTraceEnabled()) log.trace("Futures, in order: {0}", futures);
+               if (log.isTraceEnabled())
+                  log.trace("Futures, in order: {0}", futures);
                for (Future<?> f : futures) {
                   Object o = f.get();
-                  if (log.isTraceEnabled()) log.trace("Future {0} returned {1}", f, o);
+                  if (log.isTraceEnabled())
+                     log.trace("Future {0} returned {1}", f, o);
                }
             } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
@@ -269,13 +289,19 @@
       }
    }
 
+   @Override
    protected void updateBucket(Bucket bucket) throws CacheLoaderException {
       insertBucket(bucket);
    }
 
    private void writeToBlob(Blob blob, Bucket bucket) throws CacheLoaderException {
+
       try {
-         blob.setPayload(marshaller.objectToByteBuffer(bucket));
+         final byte[] payloadBuffer = marshaller.objectToByteBuffer(bucket);
+         if (cfg.isCompress())
+            blob.setPayload(compress(payloadBuffer));
+         else
+            blob.setPayload(payloadBuffer);
       } catch (IOException e) {
          throw new CacheLoaderException(e);
       }
@@ -285,16 +311,44 @@
       if (blob == null)
          return null;
       try {
-         Bucket bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
+         Bucket bucket;
+         if (cfg.isCompress())
+            bucket = (Bucket) marshaller.objectFromInputStream(new GZIPInputStream(blob
+                     .getContent()));
+         else
+            bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
+
          if (bucket != null)
             bucket.setBucketName(bucketName);
          return bucket;
-      } catch (Exception e) {
+      } catch (ClassNotFoundException e) {
          throw convertToCacheLoaderException("Unable to read blob", e);
+      } catch (IOException e) {
+         throw convertToCacheLoaderException("Class loading issue", e);
       }
    }
 
+   /** Returns the GZIP-compressed form of {@code payloadBuffer}. */
+   private byte[] compress(final byte[] payloadBuffer) throws IOException {
+      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      final GZIPOutputStream output = new GZIPOutputStream(baos,
+               COMPRESSION_COPY_BYTEARRAY_SIZE);
+      try {
+         // the payload is already in memory, so one bulk write replaces a chunked copy
+         output.write(payloadBuffer);
+      } finally {
+         // always close: this finishes the GZIP stream and runs even if the write fails
+         output.close();
+      }
+      // read the buffer only after close(), once the GZIP trailer has been written
+      return baos.toByteArray();
+   }
+
    private String encodeBucketName(String decodedName) {
-      return (decodedName.startsWith("-")) ? decodedName.replace('-', 'A') : decodedName;
+      final String name = (decodedName.startsWith("-")) ? decodedName.replace('-', 'A')
+               : decodedName;
+      if (cfg.isCompress())
+         return name + ".gz";
+      return name;
    }
 }

Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java	2010-02-25 17:42:57 UTC (rev 1554)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java	2010-02-26 06:51:08 UTC (rev 1555)
@@ -3,23 +3,38 @@
 import org.infinispan.loaders.LockSupportCacheStoreConfig;
 
 /**
- * The cache store config bean for the {@link org.infinispan.loaders.cloud.CloudCacheStore}.  This allows you to tune a number of characteristics
- * of the {@link org.infinispan.loaders.cloud.CloudCacheStore}.
+ * The cache store config bean for the {@link org.infinispan.loaders.cloud.CloudCacheStore}. This
+ * allows you to tune a number of characteristics of the
+ * {@link org.infinispan.loaders.cloud.CloudCacheStore}.
  * <p/>
- *  <ul>
- * <li><tt>identity</tt> - A String that identifies you to the cloud provider.  For example. with AWS, this is your ACCESS KEY.</li>
- * <li><tt>password</tt> - A String that is used to authenticate you with the cloud provider.  For example. with AWS, this is your SECRET KEY.</li>
- * <li><tt>bucketPrefix</tt> - A String that is prepended to generated buckets or containers on the cloud store.  Buckets or containers are named {bucketPrefix}-{cacheName}.</li>
- * <li><tt>proxyHost</tt> - The host name of a proxy to use.  Optional, no proxy is used if this is un-set.</li>
- * <li><tt>proxyPort</tt> - The port of a proxy to use.  Optional, no proxy is used if this is un-set.</li>
- * <li><tt>requestTimeout</tt> - A timeout to use when communicating with the cloud storage provider, in milliseconds.  Defaults to 10000.</li>
- * <li><tt>lazyPurgingOnly</tt> - If enabled, then expired entries are only purged on access, lazily, rather than by using the periodic eviction thread.  Defaults to <tt>true</tt>.</li>
- * <li><tt>cloudService</tt> - The cloud service to use.  Supported values are <tt>s3</tt> (Amazon AWS), <tt>cloudfiles</tt> (Rackspace Cloud), <tt>azureblob</tt> (Microsoft Azure), and <tt>atmos</tt> (Atmos Online Storage Service).</li>
- * <li><tt>maxConnections</tt> - The maximum number of concurrent connections to make to the cloud provider.  Defaults to 10.</li>
- * <li><tt>secure</tt> - Whether to use secure (SSL) connections or not.  Defaults to <tt>true</tt>.</li>
- * <li><tt>cloudServiceLocation</tt> - the data center to use.  Note that this is specific to the cloud provider in question.  E.g., Amazon's S3 service supports storage buckets in several different locations.  Valid strings for S3, for example, are <a href="http://github.com/jclouds/jclouds/blob/master/aws/core/src/main/java/org/jclouds/aws/domain/Region.java">here</a>.  Optional, and defaults to <tt>DEFAULT</tt>.</li>
+ * <ul>
+ * <li><tt>identity</tt> - A String that identifies you to the cloud provider. For example, with
+ * AWS, this is your ACCESS KEY.</li>
+ * <li><tt>password</tt> - A String that is used to authenticate you with the cloud provider. For
+ * example, with AWS, this is your SECRET KEY.</li>
+ * <li><tt>bucketPrefix</tt> - A String that is prepended to generated buckets or containers on the
+ * cloud store. Buckets or containers are named {bucketPrefix}-{cacheName}.</li>
+ * <li><tt>proxyHost</tt> - The host name of a proxy to use. Optional, no proxy is used if this is
+ * un-set.</li>
+ * <li><tt>proxyPort</tt> - The port of a proxy to use. Optional, no proxy is used if this is
+ * un-set.</li>
+ * <li><tt>requestTimeout</tt> - A timeout to use when communicating with the cloud storage
+ * provider, in milliseconds. Defaults to 10000.</li>
+ * <li><tt>lazyPurgingOnly</tt> - If enabled, then expired entries are only purged on access,
+ * lazily, rather than by using the periodic eviction thread. Defaults to <tt>true</tt>.</li>
+ * <li><tt>cloudService</tt> - The cloud service to use. Supported values are <tt>s3</tt> (Amazon
+ * AWS), <tt>cloudfiles</tt> (Rackspace Cloud), <tt>azureblob</tt> (Microsoft Azure), and
+ * <tt>atmos</tt> (Atmos Online Storage Service).</li>
+ * <li><tt>maxConnections</tt> - The maximum number of concurrent connections to make to the cloud
+ * provider. Defaults to 10.</li>
+ * <li><tt>secure</tt> - Whether to use secure (SSL) connections or not. Defaults to <tt>true</tt>.</li>
+ * <li><tt>compress</tt> - Whether to compress stored data. Defaults to <tt>true</tt>.</li>
+ * <li><tt>cloudServiceLocation</tt> - the data center to use. Note that this is specific to the
+ * cloud provider in question. E.g., Amazon's S3 service supports storage buckets in several
+ * different locations. Valid strings for S3, for example, are <a href="http://github.com/jclouds/jclouds/blob/master/aws/core/src/main/java/org/jclouds/aws/domain/Region.java"
+ * >here</a>. Optional, and defaults to <tt>DEFAULT</tt>.</li>
  * </ul>
- *
+ * 
  * @author Manik Surtani
  * @since 4.0
  */
@@ -34,7 +49,9 @@
    private String cloudService;
    private int maxConnections = 10000;
    private boolean secure = true;
-   private String cloudServiceLocation = "DEFAULT";
+   private boolean compress = true;
+
+   private final String cloudServiceLocation = "DEFAULT";
    private static final long serialVersionUID = -9011054600279256849L;
 
    public CloudCacheStoreConfig() {
@@ -121,26 +138,52 @@
       this.secure = secure;
    }
 
+   public boolean isCompress() {
+      return compress;
+   }
+
+   public void setCompress(boolean compress) {
+      this.compress = compress;
+   }
+
    @Override
    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-      if (!super.equals(o)) return false;
+      if (this == o)
+         return true;
+      if (o == null || getClass() != o.getClass())
+         return false;
+      if (!super.equals(o))
+         return false;
 
       CloudCacheStoreConfig that = (CloudCacheStoreConfig) o;
 
-      if (lazyPurgingOnly != that.lazyPurgingOnly) return false;
-      if (maxConnections != that.maxConnections) return false;
-      if (requestTimeout != that.requestTimeout) return false;
-      if (secure != that.secure) return false;
-      if (bucketPrefix != null ? !bucketPrefix.equals(that.bucketPrefix) : that.bucketPrefix != null) return false;
-      if (cloudService != null ? !cloudService.equals(that.cloudService) : that.cloudService != null) return false;
-      if (cloudServiceLocation != null ? !cloudServiceLocation.equals(that.cloudServiceLocation) : that.cloudServiceLocation != null)
+      if (lazyPurgingOnly != that.lazyPurgingOnly)
          return false;
-      if (identity != null ? !identity.equals(that.identity) : that.identity != null) return false;
-      if (password != null ? !password.equals(that.password) : that.password != null) return false;
-      if (proxyHost != null ? !proxyHost.equals(that.proxyHost) : that.proxyHost != null) return false;
-      if (proxyPort != null ? !proxyPort.equals(that.proxyPort) : that.proxyPort != null) return false;
+      if (maxConnections != that.maxConnections)
+         return false;
+      if (requestTimeout != that.requestTimeout)
+         return false;
+      if (secure != that.secure)
+         return false;
+      if (compress != that.compress)
+         return false;
+      if (bucketPrefix != null ? !bucketPrefix.equals(that.bucketPrefix)
+               : that.bucketPrefix != null)
+         return false;
+      if (cloudService != null ? !cloudService.equals(that.cloudService)
+               : that.cloudService != null)
+         return false;
+      if (cloudServiceLocation != null ? !cloudServiceLocation.equals(that.cloudServiceLocation)
+               : that.cloudServiceLocation != null)
+         return false;
+      if (identity != null ? !identity.equals(that.identity) : that.identity != null)
+         return false;
+      if (password != null ? !password.equals(that.password) : that.password != null)
+         return false;
+      if (proxyHost != null ? !proxyHost.equals(that.proxyHost) : that.proxyHost != null)
+         return false;
+      if (proxyPort != null ? !proxyPort.equals(that.proxyPort) : that.proxyPort != null)
+         return false;
 
       return true;
    }
@@ -158,25 +201,19 @@
       result = 31 * result + (cloudService != null ? cloudService.hashCode() : 0);
       result = 31 * result + maxConnections;
       result = 31 * result + (secure ? 1 : 0);
+      result = 31 * result + (compress ? 1 : 0);
       result = 31 * result + (cloudServiceLocation != null ? cloudServiceLocation.hashCode() : 0);
       return result;
    }
 
    @Override
    public String toString() {
-      return "CloudCacheStoreConfig{" +
-            "bucketPrefix='" + bucketPrefix + '\'' +
-            ", identity='" + identity + '\'' +
-            ", password='" + password + '\'' +
-            ", proxyHost='" + proxyHost + '\'' +
-            ", proxyPort='" + proxyPort + '\'' +
-            ", requestTimeout=" + requestTimeout +
-            ", lazyPurgingOnly=" + lazyPurgingOnly +
-            ", cloudService='" + cloudService + '\'' +
-            ", maxConnections=" + maxConnections +
-            ", secure=" + secure +
-            ", cloudServiceLocation='" + cloudServiceLocation + '\'' +
-            '}';
+      return "CloudCacheStoreConfig{" + "bucketPrefix='" + bucketPrefix + '\'' + ", identity='"
+               + identity + '\'' + ", password='" + password + '\'' + ", proxyHost='" + proxyHost
+               + '\'' + ", proxyPort='" + proxyPort + '\'' + ", requestTimeout=" + requestTimeout
+               + ", lazyPurgingOnly=" + lazyPurgingOnly + ", cloudService='" + cloudService + '\''
+               + ", maxConnections=" + maxConnections + ", secure=" + secure + ", compress="
+               + compress + ", cloudServiceLocation='" + cloudServiceLocation + '\'' + '}';
    }
 
    public String getCloudServiceLocation() {



More information about the infinispan-commits mailing list