[infinispan-commits] Infinispan SVN: r1351 - in trunk: cachestore/cloud and 8 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jan 7 14:20:20 EST 2010


Author: manik.surtani at jboss.com
Date: 2010-01-07 14:20:20 -0500 (Thu, 07 Jan 2010)
New Revision: 1351

Added:
   trunk/cachestore/cloud/src/main/scala/
   trunk/cachestore/cloud/src/main/scala/org/
   trunk/cachestore/cloud/src/main/scala/org/infinispan/
   trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/
   trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/
   trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/CloudCacheStore.scala
Removed:
   trunk/cachestore/cloud/src/main/java/
Modified:
   trunk/cachestore/cloud/pom.xml
   trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreFunctionalIntegrationTest.java
   trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java
   trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
   trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java
   trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java
   trunk/pom.xml
Log:
CloudCacheStore reenabled
CloudCacheStore ported to Scala

Modified: trunk/cachestore/cloud/pom.xml
===================================================================
--- trunk/cachestore/cloud/pom.xml	2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/cachestore/cloud/pom.xml	2010-01-07 19:20:20 UTC (rev 1351)
@@ -20,7 +20,18 @@
    </properties>
 
    <dependencies>
+      <!-- this module is written in Scala rather than Java -->
       <dependency>
+         <groupId>org.scala-lang</groupId>
+         <artifactId>scala-library</artifactId>
+         <version>${scala.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.scala-tools</groupId>
+          <artifactId>javautils</artifactId>
+          <version>2.7.4-0.1</version>
+        </dependency>
+      <dependency>
          <groupId>org.jclouds</groupId>
          <artifactId>jclouds-blobstore</artifactId>
          <version>${version.jclouds}</version>
@@ -84,9 +95,39 @@
          </snapshots>
       </repository>
    </repositories>
+
    <build>
+      <sourceDirectory>src/main/scala</sourceDirectory>
+      <testSourceDirectory>src/test/scala</testSourceDirectory>
+
       <plugins>
          <plugin>
+            <groupId>org.scala-tools</groupId>
+            <artifactId>maven-scala-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>compile</id>
+                  <goals>
+                     <goal>compile</goal>
+                  </goals>
+                  <phase>compile</phase>
+               </execution>
+               <execution>
+                  <id>test-compile</id>
+                  <goals>
+                     <goal>testCompile</goal>
+                  </goals>
+                  <phase>test-compile</phase>
+               </execution>
+               <execution>
+                  <phase>process-resources</phase>
+                  <goals>
+                     <goal>compile</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
             <version>2.4.3</version>

Added: trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/CloudCacheStore.scala
===================================================================
--- trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/CloudCacheStore.scala	                        (rev 0)
+++ trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/CloudCacheStore.scala	2010-01-07 19:20:20 UTC (rev 1351)
@@ -0,0 +1,330 @@
+package org.infinispan.loaders.cloud
+
+import org.infinispan.util.logging.LogFactory
+import org.infinispan.Cache
+import org.infinispan.marshall.Marshaller
+import org.infinispan.config.ConfigurationException
+import org.infinispan.util.Util
+import org.jclouds.http.HttpUtils
+import java.util.{HashSet, Properties}
+import org.infinispan.container.entries.InternalCacheEntry
+import org.infinispan.loaders.bucket.{Bucket, BucketBasedCacheStore}
+import org.infinispan.loaders.modifications.Modification
+import java.util.concurrent.{ExecutionException, Future}
+import org.jclouds.blobstore.domain.Blob
+import java.io.{IOException, ObjectOutput, ObjectInput}
+import scala.collection._
+import reflect.BeanProperty
+import org.infinispan.loaders.{LockSupportCacheStoreConfig, CacheLoaderConfig, CacheLoaderException}
+import org.jclouds.blobstore._
+import org.scala_tools.javautils.Imports._
+
+/**
+ * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to communicate
+ * with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3</a>,
+ * <a href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
+ * provider supported by JClouds.
+ * <p />
+ * This file store stores stuff in the following format:
+ * <tt>http://    { cloud-storage-provider } /    { bucket } /    { bucket_number }.bucket</tt>
+ * <p />
+ * Writes made while applyModifications is running on the calling thread go through the asynchronous blob store and
+ * their futures are collected per thread; all other writes use the synchronous blob store.
+ * <p />
+ * @author Manik Surtani
+ * @since 4.0
+ */
+
+class CloudCacheStore extends BucketBasedCacheStore {
+   val log = LogFactory.getLog(classOf[CloudCacheStore])
+   // Futures of the asynchronous blob store calls issued by the current thread; set only inside applyModifications
+   val asyncCommandFutures = new ThreadLocal[mutable.Set[Future[Any]]]()
+   var cfg: CloudCacheStoreConfig = null
+   var containerName: String = null
+   var ctx: BlobStoreContext[Any, Any] = null
+   var blobStore: BlobStore = null
+   var asyncBlobStore: AsyncBlobStore = null
+   // When true, applyModifications waits for every collected future before returning
+   var pollFutures = false
+
+   override def getConfigurationClass() = classOf[CloudCacheStoreConfig]
+
+   /** Container name for this cache: the configured bucket prefix, a dash, and the lower-cased cache name. */
+   def getThisContainerName() = cfg.bucketPrefix + "-" + cache.getName.toLowerCase
+
+   override def supportsMultiThreadedPurge() = true
+
+   /** Initialises the store without a pre-built JClouds context or blob stores. */
+   override def init(cfg: CacheLoaderConfig, cache: Cache[_, _], m: Marshaller) {
+      this.cfg = cfg.asInstanceOf[CloudCacheStoreConfig]
+      init(cfg, cache, m, null, null, null)
+   }
+
+   /**
+    * Initialises the store, optionally with a pre-built JClouds context and blob stores.  Note that start()
+    * assigns ctx, blobStore and asyncBlobStore again from the context it creates.
+    */
+   def init(cfg: CacheLoaderConfig, cache: Cache[_, _], m: Marshaller, ctx: BlobStoreContext[Any, Any], blobStore: BlobStore, asyncBlobStore: AsyncBlobStore) {
+      super.init(cfg, cache, m)
+      this.cfg = cfg.asInstanceOf[CloudCacheStoreConfig]
+      this.cache = cache
+      this.marshaller = m
+      this.ctx = ctx
+      this.blobStore = blobStore
+      this.asyncBlobStore = asyncBlobStore
+   }
+
+   /**
+    * Validates the mandatory configuration, creates the blob store context and creates this cache's container if it
+    * does not exist yet.
+    * @throws ConfigurationException if the cloud service, identity, password or bucket prefix is not set
+    */
+   override def start() {
+      super.start
+      if (cfg.cloudService == null) throw new ConfigurationException("CloudService must be set!")
+      if (cfg.identity == null) throw new ConfigurationException("Identity must be set")
+      if (cfg.password == null) throw new ConfigurationException("Password must be set")
+      if (cfg.bucketPrefix == null) throw new ConfigurationException("CloudBucket must be set")
+      containerName = getThisContainerName
+      ctx = createBlobStoreContext(cfg)
+      blobStore = ctx.getBlobStore
+      if (!blobStore.containerExists(containerName)) blobStore.createContainer(containerName)
+      asyncBlobStore = ctx.getAsyncBlobStore
+      // applyModifications waits for its futures only when the async store configuration is not enabled
+      pollFutures = !(cfg.getAsyncStoreConfig.isEnabled.booleanValue)
+   }
+
+   /**
+    * Builds a JClouds blob store context for the configured service and credentials.  Properties found in an optional
+    * jclouds.properties resource are passed to the context factory.
+    */
+   def createBlobStoreContext(cfg: CloudCacheStoreConfig) = {
+      val properties = new Properties()
+      val is = Util loadResourceAsStream "jclouds.properties"
+      if (is != null) {
+         try {
+            properties load is
+         } catch {
+            case e => log.error("Unable to load contents from jclouds.properties", e)
+         } finally {
+            // release the resource stream once its contents have been read (it used to be left open)
+            try {
+               is.close()
+            } catch {
+               case e: IOException => log.warn("Unable to close jclouds.properties", e)
+            }
+         }
+      }
+      val factory = new BlobStoreContextFactory(properties)
+
+      // Need a URI in blobstore://account:key@service/container/path
+      // TODO find a better way to create this context!  Unnecessary construction of a URI only for it to be broken up again into components from within JClouds
+      factory.createContext(HttpUtils.createUri("blobstore://" + cfg.identity +
+            ":" + cfg.password +
+            "@" + cfg.cloudService + "/")).asInstanceOf[BlobStoreContext[Any, Any]]
+   }
+
+   /** Collects the stored entries of every bucket in the container, re-storing each bucket for which removeExpiredEntries returns true. */
+   def loadAllLockSafe() = {
+      val result = new HashSet[InternalCacheEntry]()
+      // TODO change this to use a blobStore
+      val values = ctx.createBlobMap(containerName).values
+      for (blob <- values) {
+         val bucket = readFromBlob(blob)
+         if (bucket.removeExpiredEntries) updateBucket(bucket)
+         result addAll bucket.getStoredEntries
+      }
+      result
+   }
+
+   /** Reads the source container name from the stream; copying from a different container is not implemented yet. */
+   def fromStreamLockSafe(objectInput: ObjectInput) {
+      var source: String = null
+      try {
+         source = objectInput.readObject.asInstanceOf[String]
+      } catch {
+         case e => throw convertToCacheLoaderException("Error while reading from stream", e)
+      }
+      if (getThisContainerName equals source) {
+         log.info("Attempt to load the same cloud bucket ({0}) ignored", source)
+      } else {
+         // TODO implement stream handling.   What's the JClouds API to "copy" one bucket to another?
+      }
+   }
+
+   /** Writes this store's container name to the stream. */
+   def toStreamLockSafe(objectOutput: ObjectOutput) {
+      try {
+         objectOutput writeObject getThisContainerName
+      } catch {
+         case e => throw convertToCacheLoaderException("Error while writing to stream", e)
+      }
+   }
+
+   /** Clears the container, asynchronously when called from within applyModifications. */
+   def clearLockSafe() {
+      val futures: mutable.Set[Future[Any]] = asyncCommandFutures.get
+      if (futures == null) {
+         // is a sync call
+         blobStore clearContainer containerName
+      } else {
+         // is an async call - invoke clear() on the container asynchronously and store the future in the 'futures' collection
+         futures += (asyncBlobStore clearContainer containerName).asInstanceOf[Future[Any]]
+      }
+   }
+
+   /** Returns c itself if it already is a CacheLoaderException, otherwise wraps it in a new one with message m. */
+   def convertToCacheLoaderException(m: String, c: Throwable) = c match {
+      case cle: CacheLoaderException => cle
+      case _ => new CacheLoaderException(m, c)
+   }
+
+   /**
+    * Loads the bucket stored for the given hash; returns null when the blob store returns a null blob.  The hash is
+    * encoded exactly as insertBucket encodes bucket names, otherwise names starting with "-" could never be found again.
+    */
+   def loadBucket(hash: String) = readFromBlob(blobStore.getBlob(containerName, encodeBucketName(hash)))
+
+   /** Reads every bucket in the blob map and re-stores each one for which removeExpiredEntries returns true. */
+   def purge(blobMap: BlobMap) {
+      for (blob <- blobMap.values) {
+         val bucket = readFromBlob(blob)
+         if (bucket.removeExpiredEntries) updateBucket(bucket)
+      }
+   }
+
+   /** Purges the container's buckets unless the store is configured for lazy purging only. */
+   def purgeInternal() {
+      // TODO can expiry data be stored in a blob's metadata?  More efficient purging that way.
+      if (!cfg.lazyPurgingOnly) {
+         acquireGlobalLock(false)
+         try {
+            val blobMap = ctx createBlobMap containerName
+            if (multiThreadedPurge) {
+               // NOTE(review): the finally block below releases the global lock right after the task is submitted, so the purge itself may run without it - confirm this is intended
+               purgerService.execute(new Runnable() {
+                  def run() {
+                     try {
+                        purge(blobMap)
+                     } catch {
+                        case e => log.warn("Problems purging", e)
+                     }
+                  }
+               })
+            } else {
+               purge(blobMap)
+            }
+         } finally {
+            releaseGlobalLock(false)
+         }
+      }
+   }
+
+   /** Stores the bucket as a blob, asynchronously when called from within applyModifications. */
+   def insertBucket(bucket: Bucket) {
+      val blob = blobStore.newBlob(getBucketName(bucket))
+      writeToBlob(blob, bucket)
+
+      val futures: mutable.Set[Future[Any]] = asyncCommandFutures.get
+      if (futures == null) {
+         // is a sync call
+         blobStore.putBlob(containerName, blob)
+      } else {
+         // is an async call - put the blob asynchronously and store the future in the 'futures' collection
+         futures += (asyncBlobStore.putBlob(containerName, blob)).asInstanceOf[Future[Any]]
+      }
+   }
+
+   /**
+    * Applies the modifications with blob store calls made asynchronously and, if pollFutures is set, waits for the
+    * collected futures to complete.
+    * @throws CacheLoaderException if one of the awaited calls failed
+    */
+   override def applyModifications(mods: java.util.List[_ <: Modification]) {
+      val futures = mutable.Set[Future[Any]]()
+      asyncCommandFutures set futures
+      try {
+         super.applyModifications(mods)
+         if (pollFutures) {
+            var exception: CacheLoaderException = null
+            try {
+               for (f <- asyncCommandFutures.get) f.get
+            } catch {
+               case ie: InterruptedException => Thread.currentThread.interrupt
+               case ee: ExecutionException => {
+                  if (ee.getCause.isInstanceOf[CacheLoaderException])
+                     exception = ee.getCause.asInstanceOf[CacheLoaderException]
+                  else
+                     exception = new CacheLoaderException(ee.getCause)
+               }
+            }
+            if (exception != null) throw exception
+         }
+      } finally {
+         asyncCommandFutures.remove
+      }
+   }
+
+   /** Updating a bucket simply stores it again under the same blob name. */
+   def updateBucket(bucket: Bucket) {insertBucket(bucket)}
+
+   /** Marshals the bucket into the blob's payload, wrapping an IOException in a CacheLoaderException. */
+   def writeToBlob(blob: Blob, bucket: Bucket) {
+      try {
+         blob setPayload marshaller.objectToByteBuffer(bucket)
+      } catch {
+         case e: IOException => throw new CacheLoaderException(e)
+      }
+   }
+
+   /**
+    * Unmarshals a bucket from the blob's content; returns null for a null blob.
+    * NOTE(review): callers pass the result to updateBucket, which needs bucket.getBucketName - confirm the name survives marshalling.
+    */
+   def readFromBlob(blob: Blob): Bucket = {
+      if (blob == null) return null
+      try {
+         marshaller.objectFromInputStream(blob.getContent).asInstanceOf[Bucket]
+      } catch {
+         case e => throw new CacheLoaderException(e)
+      }
+   }
+
+   /** Blob name under which the given bucket is stored. */
+   def getBucketName(bucket: Bucket) = encodeBucketName(bucket.getBucketName)
+
+   /** Replaces every "-" with "A" in names that start with "-"; all other names are returned unchanged. */
+   private def encodeBucketName(name: String): String =
+      if (name.startsWith("-")) name.replace("-", "A") else name
+}
+
+/**
+ * Configuration bean for CloudCacheStore; every constructor argument is exposed as a read/write bean property.
+ * Constructing an instance also registers CloudCacheStore as the cache loader class name.
+ * @author Manik Surtani
+ * @since 4.0
+ */
+class CloudCacheStoreConfig(
+      @BeanProperty var identity: String,
+      @BeanProperty var password: String,
+      @BeanProperty var bucketPrefix: String,
+      @BeanProperty var proxyHost: String,
+      @BeanProperty var proxyPort: String,
+      @BeanProperty var requestTimeout: Long,
+      @BeanProperty var lazyPurgingOnly: Boolean,
+      @BeanProperty var cloudService: String,
+      @BeanProperty var maxConnections: Int,
+      @BeanProperty var secure: Boolean) extends LockSupportCacheStoreConfig {
+
+   this.setCacheLoaderClassName(classOf[CloudCacheStore].getName)
+   // no-arg variant: String properties stay null, timeout 10000, lazy purging only, 3 connections, secure
+   def this() = this(null, null, null, null, null, 10000L, true, null, 3, true)
+}
+
+/** CacheLoaderException subtype; an omitted message defaults to "" and an omitted cause to null. */
+class CloudConnectionException(m: String, c: Throwable) extends CacheLoaderException(m, c) {
+   def this(m: String) = this(m, null)
+   def this(c: Throwable) = this("", c)
+   // neither message nor cause supplied
+   def this() = this("", null)
+}
\ No newline at end of file

Modified: trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreFunctionalIntegrationTest.java
===================================================================
--- trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreFunctionalIntegrationTest.java	2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreFunctionalIntegrationTest.java	2010-01-07 19:20:20 UTC (rev 1351)
@@ -32,7 +32,7 @@
 public class CloudCacheStoreFunctionalIntegrationTest extends BaseCacheStoreFunctionalTest {
 
    private String proxyHost;
-   private int proxyPort = -1;
+   private String proxyPort = "-1";
    private int maxConnections = 20;
    private boolean isSecure = false;
    private String csBucket;
@@ -40,10 +40,20 @@
    private String secretKey;
    private String cs;
 
-   private static final String sysUsername = System.getProperty("infinispan.jclouds.username");
-   private static final String sysPassword = System.getProperty("infinispan.jclouds.password");
-   private static final String sysService = System.getProperty("infinispan.jclouds.service");
+//   private static final String sysUsername = System.getProperty("infinispan.jclouds.username");
+//   private static final String sysPassword = System.getProperty("infinispan.jclouds.password");
+//   private static final String sysService = System.getProperty("infinispan.jclouds.service");
 
+   // Cloud credentials are supplied at run time through system properties (for example
+   // -Dinfinispan.jclouds.service=s3 or cloudfiles) so that no account name, API key or secret key is
+   // ever committed to source control.
+   private static final String sysUsername = System.getProperty("infinispan.jclouds.username");
+   private static final String sysPassword = System.getProperty("infinispan.jclouds.password");
+   private static final String sysService = System.getProperty("infinispan.jclouds.service");
+
+
+
+
    @BeforeTest
    @Parameters({"infinispan.jclouds.username", "infinispan.jclouds.password", "infinispan.jclouds.service"})
    protected void setUpClient(@Optional String JcloudsUsername,

Modified: trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java
===================================================================
--- trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java	2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java	2010-01-07 19:20:20 UTC (rev 1351)
@@ -34,7 +34,7 @@
 public class CloudCacheStoreIntegrationTest extends BaseCacheStoreTest {
 
    private String proxyHost;
-   private int proxyPort = -1;
+   private String proxyPort = "-1";
    private int maxConnections = 20;
    private boolean isSecure = false;
    private String csBucket;

Modified: trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java	2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java	2010-01-07 19:20:20 UTC (rev 1351)
@@ -38,7 +38,7 @@
    private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(0);
    protected boolean multiThreadedPurge = false;
 
-   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException{
+   public void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException{
       this.config = (AbstractCacheStoreConfig) config;
       this.marshaller = m;
       if (config == null) throw new IllegalStateException("Null config!!!");

Modified: trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java	2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java	2010-01-07 19:20:20 UTC (rev 1351)
@@ -23,7 +23,7 @@
     *               selecting where refer to state in storage, for example, a different database table name.
     * @param m      marshaller to use when loading state from a stream, if supported by the implementation.
     */
-   void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException;
+   void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException;
 
    /**
     * Loads an entry mapped to by a given key.  Should return null if the entry does not exist.  Expired entries are not

Modified: trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java	2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java	2010-01-07 19:20:20 UTC (rev 1351)
@@ -34,7 +34,7 @@
    private long globalLockTimeoutMillis;
    private LockSupportCacheStoreConfig config;
 
-   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException {
+   public void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException {
       super.init(config, cache, m);
       this.config = (LockSupportCacheStoreConfig) config;
    }

Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml	2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/pom.xml	2010-01-07 19:20:20 UTC (rev 1351)
@@ -26,7 +26,7 @@
       <module>lucene-directory</module>
       <module>cachestore</module>
       <module>cachestore/bdbje</module>
-<!--      <module>cachestore/cloud</module> -->
+      <module>cachestore/cloud</module>
       <module>cachestore/jdbc</module>
       <module>cachestore/jdbm</module>
       <module>server/rest</module>



More information about the infinispan-commits mailing list