[infinispan-commits] Infinispan SVN: r1385 - in trunk/cachestore/cloud: src/main and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Jan 18 06:49:39 EST 2010


Author: manik.surtani at jboss.com
Date: 2010-01-18 06:49:39 -0500 (Mon, 18 Jan 2010)
New Revision: 1385

Added:
   trunk/cachestore/cloud/src/main/java/
   trunk/cachestore/cloud/src/main/java/org/
   trunk/cachestore/cloud/src/main/java/org/infinispan/
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java
Removed:
   trunk/cachestore/cloud/src/main/scala/
Modified:
   trunk/cachestore/cloud/pom.xml
Log:
Reverted implementation back to Java from Scala, due to poor Scaladoc -> Javadoc integration.

Modified: trunk/cachestore/cloud/pom.xml
===================================================================
--- trunk/cachestore/cloud/pom.xml	2010-01-16 17:35:57 UTC (rev 1384)
+++ trunk/cachestore/cloud/pom.xml	2010-01-18 11:49:39 UTC (rev 1385)
@@ -20,18 +20,7 @@
    </properties>
 
    <dependencies>
-      <!-- this module is written in Scala rather than Java -->
       <dependency>
-         <groupId>org.scala-lang</groupId>
-         <artifactId>scala-library</artifactId>
-         <version>${scala.version}</version>
-      </dependency>
-      <dependency>
-          <groupId>org.scala-tools</groupId>
-          <artifactId>javautils</artifactId>
-          <version>2.7.4-0.1</version>
-        </dependency>
-      <dependency>
          <groupId>org.jclouds</groupId>
          <artifactId>jclouds-blobstore</artifactId>
          <version>${version.jclouds}</version>
@@ -87,36 +76,8 @@
    </repositories>
 
    <build>
-      <sourceDirectory>src/main/scala</sourceDirectory>
-
       <plugins>
          <plugin>
-            <groupId>org.scala-tools</groupId>
-            <artifactId>maven-scala-plugin</artifactId>
-            <executions>
-               <execution>
-                  <id>compile</id>
-                  <goals>
-                     <goal>compile</goal>
-                  </goals>
-                  <phase>compile</phase>
-               </execution>
-               <execution>
-                  <id>test-compile</id>
-                  <goals>
-                     <goal>testCompile</goal>
-                  </goals>
-                  <phase>test-compile</phase>
-               </execution>
-               <execution>
-                  <phase>process-resources</phase>
-                  <goals>
-                     <goal>compile</goal>
-                  </goals>
-               </execution>
-            </executions>
-         </plugin>
-         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
             <version>2.4.3</version>

Added: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	                        (rev 0)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-01-18 11:49:39 UTC (rev 1385)
@@ -0,0 +1,275 @@
+package org.infinispan.loaders.cloud;
+
+import com.google.common.collect.ImmutableSet;
+import org.infinispan.Cache;
+import org.infinispan.config.ConfigurationException;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.loaders.CacheLoaderConfig;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheStoreConfig;
+import org.infinispan.loaders.bucket.Bucket;
+import org.infinispan.loaders.bucket.BucketBasedCacheStore;
+import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.marshall.Marshaller;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.jclouds.blobstore.AsyncBlobStore;
+import org.jclouds.blobstore.BlobMap;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.BlobStoreContextFactory;
+import org.jclouds.blobstore.KeyNotFoundException;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
+import org.jclouds.logging.log4j.config.Log4JLoggingModule;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+
+/**
+ * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to
+ * communicate with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3<a>, <a
+ * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
+ * provider supported by JClouds.
+ * <p/>
+ * This file store stores stuff in the following format: <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
+ * <p/>
+ *
+ * @author Manik Surtani
+ * @author Adrian Cole
+ * @since 4.0
+ */
+public class CloudCacheStore extends BucketBasedCacheStore {
+   private static final Log log = LogFactory.getLog(CloudCacheStore.class);
+   private final ThreadLocal<Set<Future<?>>> asyncCommandFutures = new ThreadLocal<Set<Future<?>>>();
+   private CloudCacheStoreConfig cfg;
+   private String containerName;
+   private BlobStoreContext ctx;
+   private BlobStore blobStore;
+   private AsyncBlobStore asyncBlobStore;
+   private boolean pollFutures = false;
+
+   @Override
+   public Class<? extends CacheStoreConfig> getConfigurationClass() {
+      return CloudCacheStoreConfig.class;
+   }
+
+   private String getThisContainerName() {
+      return cfg.getBucketPrefix() + "-" + cache.getName().toLowerCase();
+   }
+
+   @Override
+   protected boolean supportsMultiThreadedPurge() {
+      return true;
+   }
+
+   @Override
+   public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException {
+      this.cfg = (CloudCacheStoreConfig) cfg;
+      init(cfg, cache, m, null, null, null);
+   }
+
+   public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m, BlobStoreContext<?, ?> ctx,
+                    BlobStore blobStore, AsyncBlobStore asyncBlobStore) throws CacheLoaderException {
+      super.init(cfg, cache, m);
+      this.cfg = (CloudCacheStoreConfig) cfg;
+      this.cache = cache;
+      this.marshaller = m;
+      this.ctx = ctx;
+      this.blobStore = blobStore;
+      this.asyncBlobStore = asyncBlobStore;
+   }
+
+   @Override
+   public void start() throws CacheLoaderException {
+      super.start();
+      if (cfg.getCloudService() == null) throw new ConfigurationException("CloudService must be set!");
+      if (cfg.getIdentity() == null) throw new ConfigurationException("Identity must be set");
+      if (cfg.getPassword() == null) throw new ConfigurationException("Password must be set");
+      if (cfg.getBucketPrefix() == null) throw new ConfigurationException("CloudBucket must be set");
+      containerName = getThisContainerName();
+      try {
+         // add an executor as a constructor param to EnterpriseConfigurationModule, pass property overrides instead of Properties()
+         ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg.getIdentity(), cfg.getPassword(),
+                                                           ImmutableSet.of(new EnterpriseConfigurationModule(), new Log4JLoggingModule()), new Properties());
+         blobStore = ctx.getBlobStore();
+         asyncBlobStore = ctx.getAsyncBlobStore();
+
+         // the "location" is not currently used.
+         if (!blobStore.containerExists(containerName)) blobStore.createContainerInLocation(null, containerName);
+         pollFutures = !cfg.getAsyncStoreConfig().isEnabled();
+      } catch (IOException ioe) {
+         throw new CacheLoaderException("Unable to create context", ioe);
+      }
+   }
+
+   protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
+      Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
+
+      for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
+         Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
+         if (bucket.removeExpiredEntries()) updateBucket(bucket);
+         result.addAll(bucket.getStoredEntries());
+      }
+      return result;
+   }
+
+   protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+      String source = null;
+      try {
+         source = (String) objectInput.readObject();
+      } catch (Exception e) {
+         throw convertToCacheLoaderException("Error while reading from stream", e);
+      }
+      if (containerName.equals(source)) {
+         log.info("Attempt to load the same cloud bucket ({0}) ignored", source);
+      } else {
+         // TODO implement stream handling.   What's the JClouds API to "copy" one bucket to another?
+      }
+   }
+
+   protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+      try {
+         objectOutput.writeObject(containerName);
+      } catch (Exception e) {
+         throw convertToCacheLoaderException("Error while writing to stream", e);
+      }
+   }
+
+   protected void clearLockSafe() {
+      Set<Future<?>> futures = asyncCommandFutures.get();
+      if (futures == null) {
+         // is a sync call
+         blobStore.clearContainer(containerName);
+      } else {
+         // is an async call - invoke clear() on the container asynchronously and store the future in the 'futures' collection
+         futures.add(asyncBlobStore.clearContainer(containerName));
+      }
+   }
+
+   private CacheLoaderException convertToCacheLoaderException(String m, Throwable c) {
+      if (c instanceof CacheLoaderException) {
+         return (CacheLoaderException) c;
+      } else {
+         return new CacheLoaderException(m, c);
+      }
+   }
+
+   protected Bucket loadBucket(String hash) throws CacheLoaderException {
+      try {
+         return readFromBlob(blobStore.getBlob(containerName, hash), hash);
+      } catch (KeyNotFoundException e) {
+         return null;
+      }
+   }
+
+   private void purge(BlobMap blobMap) throws CacheLoaderException {
+      for (Map.Entry<String, Blob> entry : blobMap.entrySet()) {
+         Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
+         if (bucket.removeExpiredEntries()) updateBucket(bucket);
+      }
+   }
+
+   protected void purgeInternal() throws CacheLoaderException {
+      // TODO can expiry data be stored in a blob's metadata?  More efficient purging that way.  See https://jira.jboss.org/jira/browse/ISPN-334
+      if (!cfg.isLazyPurgingOnly()) {
+         acquireGlobalLock(false);
+         try {
+            final BlobMap blobMap = ctx.createBlobMap(containerName);
+            if (multiThreadedPurge) {
+               purgerService.execute(new Runnable() {
+                  public void run() {
+                     try {
+                        purge(blobMap);
+                     } catch (Exception e) {
+                        log.warn("Problems purging", e);
+                     }
+                  }
+               });
+            } else {
+               purge(blobMap);
+            }
+         } finally {
+            releaseGlobalLock(false);
+         }
+      }
+   }
+
+   protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+      Blob blob = blobStore.newBlob(getBucketName(bucket));
+      writeToBlob(blob, bucket);
+
+      Set<Future<?>> futures = asyncCommandFutures.get();
+      if (futures == null) {
+         // is a sync call
+         blobStore.putBlob(containerName, blob);
+      } else {
+         // is an async call - invoke clear() on the container asynchronously and store the future in the 'futures' collection
+         futures.add(asyncBlobStore.putBlob(containerName, blob));
+      }
+   }
+
+   @Override
+   public void applyModifications(List<? extends Modification> modifications) throws CacheLoaderException {
+      Set<Future<?>> futures = new HashSet<Future<?>>();
+      asyncCommandFutures.set(futures);
+
+      try {
+         super.applyModifications(modifications);
+         if (pollFutures) {
+            CacheLoaderException exception = null;
+            try {
+               for (Future<?> f : asyncCommandFutures.get()) f.get();
+            } catch (InterruptedException ie) {
+               Thread.currentThread().interrupt();
+            } catch (ExecutionException ee) {
+               exception = convertToCacheLoaderException("Caught exception in async process", ee.getCause());
+            }
+            if (exception != null) throw exception;
+         }
+      } finally {
+         asyncCommandFutures.remove();
+      }
+   }
+
+   protected void updateBucket(Bucket bucket) throws CacheLoaderException {
+      insertBucket(bucket);
+   }
+
+   private void writeToBlob(Blob blob, Bucket bucket) throws CacheLoaderException {
+      try {
+         blob.setPayload(
+               marshaller.objectToByteBuffer(bucket));
+      } catch (IOException e) {
+         throw new CacheLoaderException(e);
+      }
+   }
+
+   private Bucket readFromBlob(Blob blob, String bucketName) throws CacheLoaderException {
+      if (blob == null) return null;
+      try {
+         Bucket bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
+         if (bucket != null) bucket.setBucketName(
+               bucketName);
+         return bucket;
+      } catch (Exception e) {
+         throw convertToCacheLoaderException("Unable to read blob", e);
+      }
+   }
+
+   private String getBucketName(Bucket bucket) {
+      log.warn("Bucket is {0}", bucket);
+      String bucketName = bucket.getBucketName();
+      if (bucketName.startsWith("-")) bucketName = bucketName.replace("-", "A");
+      return bucketName;
+   }
+}


Property changes on: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java	                        (rev 0)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java	2010-01-18 11:49:39 UTC (rev 1385)
@@ -0,0 +1,174 @@
+package org.infinispan.loaders.cloud;
+
+import org.infinispan.loaders.LockSupportCacheStoreConfig;
+
+/**
+ * The cache store config bean for the {@link org.infinispan.loaders.cloud.CloudCacheStore}.  This allows you to tune a number of characteristics
+ * of the {@link org.infinispan.loaders.cloud.CloudCacheStore}.
+ * <p/>
+ *  <ul>
+ * <li><tt>identity</tt> - A String that identifies you to the cloud provider.  For example. with AWS, this is your ACCESS KEY.</li>
+ * <li><tt>password</tt> - A String that is used to authenticate you with the cloud provider.  For example. with AWS, this is your SECRET KEY.</li>
+ * <li><tt>bucketPrefix</tt> - A String that is prepended to generated buckets or containers on the cloud store.  Buckets or containers are named {bucketPrefix}-{cacheName}.</li>
+ * <li><tt>proxyHost</tt> - The host name of a proxy to use.  Optional, no proxy is used if this is un-set.</li>
+ * <li><tt>proxyPort</tt> - The port of a proxy to use.  Optional, no proxy is used if this is un-set.</li>
+ * <li><tt>requestTimeout</tt> - A timeout to use when communicating with the cloud storage provider, in milliseconds.  Defaults to 10000.</li>
+ * <li><tt>lazyPurgingOnly</tt> - If enabled, then expired entries are only purged on access, lazily, rather than by using the periodic eviction thread.  Defaults to <tt>true</tt>.</li>
+ * <li><tt>cloudService</tt> - The cloud service to use.  Supported values are <tt>s3</tt> (Amazon AWS), <tt>cloudfiles</tt> (Rackspace Cloud), <tt>azureblob</tt> (Microsoft Azure), and <tt>atmos</tt> (Atmos).</li>
+ * <li><tt>maxConnections</tt> - The maximum number of concurrent connections to make to the cloud provider.  Defaults to 10.</li>
+ * <li><tt>secure</tt> - Whether to use secure (SSL) connections or not.  Defaults to <tt>true</tt>.</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class CloudCacheStoreConfig extends LockSupportCacheStoreConfig {
+   private String identity;
+   private String password;
+   private String bucketPrefix;
+   private String proxyHost;
+   private String proxyPort;
+   private long requestTimeout = 10000;
+   private boolean lazyPurgingOnly = true;
+   private String cloudService;
+   private int maxConnections = 10000;
+   private boolean secure = true;
+
+   public CloudCacheStoreConfig() {
+      setCacheLoaderClassName(CloudCacheStore.class.getName());
+   }
+
+   public String getBucketPrefix() {
+      return bucketPrefix;
+   }
+
+   public void setBucketPrefix(String bucketPrefix) {
+      this.bucketPrefix = bucketPrefix;
+   }
+
+   public String getCloudService() {
+      return cloudService;
+   }
+
+   public void setCloudService(String cloudService) {
+      this.cloudService = cloudService;
+   }
+
+   public String getIdentity() {
+      return identity;
+   }
+
+   public void setIdentity(String identity) {
+      this.identity = identity;
+   }
+
+   public boolean isLazyPurgingOnly() {
+      return lazyPurgingOnly;
+   }
+
+   public void setLazyPurgingOnly(boolean lazyPurgingOnly) {
+      this.lazyPurgingOnly = lazyPurgingOnly;
+   }
+
+   public int getMaxConnections() {
+      return maxConnections;
+   }
+
+   public void setMaxConnections(int maxConnections) {
+      this.maxConnections = maxConnections;
+   }
+
+   public String getPassword() {
+      return password;
+   }
+
+   public void setPassword(String password) {
+      this.password = password;
+   }
+
+   public String getProxyHost() {
+      return proxyHost;
+   }
+
+   public void setProxyHost(String proxyHost) {
+      this.proxyHost = proxyHost;
+   }
+
+   public String getProxyPort() {
+      return proxyPort;
+   }
+
+   public void setProxyPort(String proxyPort) {
+      this.proxyPort = proxyPort;
+   }
+
+   public long getRequestTimeout() {
+      return requestTimeout;
+   }
+
+   public void setRequestTimeout(long requestTimeout) {
+      this.requestTimeout = requestTimeout;
+   }
+
+   public boolean isSecure() {
+      return secure;
+   }
+
+   public void setSecure(boolean secure) {
+      this.secure = secure;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      if (!super.equals(o)) return false;
+
+      CloudCacheStoreConfig that = (CloudCacheStoreConfig) o;
+
+      if (lazyPurgingOnly != that.lazyPurgingOnly) return false;
+      if (maxConnections != that.maxConnections) return false;
+      if (requestTimeout != that.requestTimeout) return false;
+      if (secure != that.secure) return false;
+      if (bucketPrefix != null ? !bucketPrefix.equals(that.bucketPrefix) : that.bucketPrefix != null) return false;
+      if (cloudService != null ? !cloudService.equals(that.cloudService) : that.cloudService != null) return false;
+      if (identity != null ? !identity.equals(that.identity) : that.identity != null) return false;
+      if (password != null ? !password.equals(that.password) : that.password != null) return false;
+      if (proxyHost != null ? !proxyHost.equals(that.proxyHost) : that.proxyHost != null) return false;
+      if (proxyPort != null ? !proxyPort.equals(that.proxyPort) : that.proxyPort != null) return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      int result = super.hashCode();
+      result = 31 * result + (identity != null ? identity.hashCode() : 0);
+      result = 31 * result + (password != null ? password.hashCode() : 0);
+      result = 31 * result + (bucketPrefix != null ? bucketPrefix.hashCode() : 0);
+      result = 31 * result + (proxyHost != null ? proxyHost.hashCode() : 0);
+      result = 31 * result + (proxyPort != null ? proxyPort.hashCode() : 0);
+      result = 31 * result + (int) (requestTimeout ^ (requestTimeout >>> 32));
+      result = 31 * result + (lazyPurgingOnly ? 1 : 0);
+      result = 31 * result + (cloudService != null ? cloudService.hashCode() : 0);
+      result = 31 * result + maxConnections;
+      result = 31 * result + (secure ? 1 : 0);
+      return result;
+   }
+
+   @Override
+   public String toString() {
+      return "CloudCacheStoreConfig{" +
+            "bucketPrefix='" + bucketPrefix + '\'' +
+            ", identity='" + identity + '\'' +
+            ", password=(hidden)" +
+            ", proxyHost='" + proxyHost + '\'' +
+            ", proxyPort='" + proxyPort + '\'' +
+            ", requestTimeout=" + requestTimeout +
+            ", lazyPurgingOnly=" + lazyPurgingOnly +
+            ", cloudService='" + cloudService + '\'' +
+            ", maxConnections=" + maxConnections +
+            ", secure=" + secure +
+            '}';
+   }
+}
\ No newline at end of file


Property changes on: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java	                        (rev 0)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java	2010-01-18 11:49:39 UTC (rev 1385)
@@ -0,0 +1,20 @@
+package org.infinispan.loaders.cloud;
+
+import org.infinispan.loaders.CacheLoaderException;
+
+public class CloudConnectionException extends CacheLoaderException {
+   public CloudConnectionException() {
+   }
+
+   public CloudConnectionException(Throwable cause) {
+      super(cause);
+   }
+
+   public CloudConnectionException(String message) {
+      super(message);
+   }
+
+   public CloudConnectionException(String message, Throwable cause) {
+      super(message, cause);
+   }
+}


Property changes on: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the infinispan-commits mailing list