[infinispan-commits] Infinispan SVN: r1351 - in trunk: cachestore/cloud and 8 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Jan 7 14:20:20 EST 2010
Author: manik.surtani at jboss.com
Date: 2010-01-07 14:20:20 -0500 (Thu, 07 Jan 2010)
New Revision: 1351
Added:
trunk/cachestore/cloud/src/main/scala/
trunk/cachestore/cloud/src/main/scala/org/
trunk/cachestore/cloud/src/main/scala/org/infinispan/
trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/
trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/
trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/CloudCacheStore.scala
Removed:
trunk/cachestore/cloud/src/main/java/
Modified:
trunk/cachestore/cloud/pom.xml
trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreFunctionalIntegrationTest.java
trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java
trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java
trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java
trunk/pom.xml
Log:
CloudCacheStore reenabled
CloudCacheStore ported to Scala
Modified: trunk/cachestore/cloud/pom.xml
===================================================================
--- trunk/cachestore/cloud/pom.xml 2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/cachestore/cloud/pom.xml 2010-01-07 19:20:20 UTC (rev 1351)
@@ -20,7 +20,18 @@
</properties>
<dependencies>
+ <!-- this module is written in Scala rather than Java -->
<dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>javautils</artifactId>
+ <version>2.7.4-0.1</version>
+ </dependency>
+ <dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-blobstore</artifactId>
<version>${version.jclouds}</version>
@@ -84,9 +95,39 @@
</snapshots>
</repository>
</repositories>
+
<build>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+
<plugins>
<plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>test-compile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.3</version>
Added: trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/CloudCacheStore.scala
===================================================================
--- trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/CloudCacheStore.scala (rev 0)
+++ trunk/cachestore/cloud/src/main/scala/org/infinispan/loaders/cloud/CloudCacheStore.scala 2010-01-07 19:20:20 UTC (rev 1351)
@@ -0,0 +1,283 @@
+package org.infinispan.loaders.cloud
+
+import org.infinispan.util.logging.LogFactory
+import org.infinispan.Cache
+import org.infinispan.marshall.Marshaller
+import org.infinispan.config.ConfigurationException
+import org.infinispan.util.Util
+import org.jclouds.http.HttpUtils
+import java.util.{HashSet, Properties}
+import org.infinispan.container.entries.InternalCacheEntry
+import org.infinispan.loaders.bucket.{Bucket, BucketBasedCacheStore}
+import org.infinispan.loaders.modifications.Modification
+import java.util.concurrent.{ExecutionException, Future}
+import org.jclouds.blobstore.domain.Blob
+import java.io.{IOException, ObjectOutput, ObjectInput}
+import scala.collection._
+import reflect.BeanProperty
+import org.infinispan.loaders.{LockSupportCacheStoreConfig, CacheLoaderConfig, CacheLoaderException}
+import org.jclouds.blobstore._
+import org.scala_tools.javautils.Imports._
+
+/**
+ * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to communicate
+ * with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3<a>,
+ * <a href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
+ * provider supported by JClouds.
+ * <p />
+ * This file store stores stuff in the following format:
+ * <tt>http:// { cloud-storage-provider } / { bucket } / { bucket_number }.bucket</tt>
+ * <p />
+ * @author Manik Surtani
+ * @since 4.0
+ */
+
+class CloudCacheStore extends BucketBasedCacheStore {
+ val log = LogFactory.getLog(classOf[CloudCacheStore])
+ val asyncCommandFutures = new ThreadLocal[mutable.Set[Future[Any]]]()
+ var cfg: CloudCacheStoreConfig = null
+ var containerName: String = null
+ var ctx: BlobStoreContext[Any, Any] = null
+ var blobStore: BlobStore = null
+ var asyncBlobStore: AsyncBlobStore = null
+ var pollFutures = false
+
+ override def getConfigurationClass() = classOf[CloudCacheStoreConfig]
+
+ def getThisContainerName() = cfg.bucketPrefix + "-" + cache.getName.toLowerCase
+
+ override def supportsMultiThreadedPurge() = true
+
+ override def init(cfg: CacheLoaderConfig, cache: Cache[_, _], m: Marshaller) {
+ this.cfg = cfg.asInstanceOf[CloudCacheStoreConfig]
+ init(cfg, cache, m, null, null, null)
+ }
+
+ def init(cfg: CacheLoaderConfig, cache: Cache[_, _], m: Marshaller, ctx: BlobStoreContext[Any, Any], blobStore: BlobStore, asyncBlobStore: AsyncBlobStore) {
+ super.init(cfg, cache, m)
+ this.cfg = cfg.asInstanceOf[CloudCacheStoreConfig]
+ this.cache = cache
+ this.marshaller = m
+ this.ctx = ctx
+ this.blobStore = blobStore
+ this.asyncBlobStore = asyncBlobStore
+ }
+
+ override def start() {
+ super.start
+ if (cfg.cloudService == null) throw new ConfigurationException("CloudService must be set!")
+ if (cfg.identity == null) throw new ConfigurationException("Identity must be set")
+ if (cfg.password == null) throw new ConfigurationException("Password must be set")
+ if (cfg.bucketPrefix == null) throw new ConfigurationException("CloudBucket must be set")
+ containerName = getThisContainerName
+ ctx = createBlobStoreContext(cfg)
+ val bs = ctx.getBlobStore
+ if (!bs.containerExists(containerName)) bs.createContainer(containerName)
+ blobStore = ctx.getBlobStore
+ asyncBlobStore = ctx.getAsyncBlobStore
+ pollFutures = !(cfg.getAsyncStoreConfig.isEnabled.booleanValue)
+ }
+
+ def createBlobStoreContext(cfg: CloudCacheStoreConfig) = {
+ val properties = new Properties()
+ val is = Util loadResourceAsStream "jclouds.properties"
+ if (is != null) {
+ try {
+ properties load is
+ } catch {
+ case e => log.error("Unable to load contents from jclouds.properties", e)
+ }
+ }
+ val factory = new BlobStoreContextFactory(properties)
+
+ // Need a URI in blobstore://account:key@service/container/path
+ // TODO find a better way to create this context! Unnecessary construction of a URI only for it to be broken up again into components from within JClouds
+ factory.createContext(HttpUtils.createUri("blobstore://" + cfg.identity +
+ ":" + cfg.password +
+ "@" + cfg.cloudService + "/")).asInstanceOf[BlobStoreContext[Any, Any]]
+ }
+
+ def loadAllLockSafe() = {
+ val result = new HashSet[InternalCacheEntry]()
+ // TODO change this to use a blobStore
+ val values = ctx.createBlobMap(containerName).values
+ for (blob <- values) {
+ val bucket = readFromBlob(blob)
+ if (bucket.removeExpiredEntries) updateBucket(bucket)
+ result addAll bucket.getStoredEntries
+ }
+ result
+ }
+
+ def fromStreamLockSafe(objectInput: ObjectInput) {
+ var source: String = null
+ try {
+ source = objectInput.readObject.asInstanceOf[String]
+ } catch {
+ case e => throw convertToCacheLoaderException("Error while reading from stream", e)
+ }
+ if (getThisContainerName equals source) {
+ log.info("Attempt to load the same cloud bucket ({0}) ignored", source)
+ } else {
+ // TODO implement stream handling. What's the JClouds API to "copy" one bucket to another?
+ }
+ }
+
+ def toStreamLockSafe(objectOutput: ObjectOutput) {
+ try {
+ objectOutput writeObject getThisContainerName
+ } catch {
+ case e => throw convertToCacheLoaderException("Error while writing to stream", e)
+ }
+ }
+
+ def clearLockSafe() {
+ val futures: mutable.Set[Future[Any]] = asyncCommandFutures.get
+ if (futures == null) {
+ // is a sync call
+ blobStore clearContainer containerName
+ } else {
+ // is an async call - invoke clear() on the container asynchronously and store the future in the 'futures' collection
+ futures += (asyncBlobStore clearContainer containerName).asInstanceOf[Future[Any]]
+ }
+ }
+
+ def convertToCacheLoaderException(m: String, c: Throwable) = {
+ if (c.isInstanceOf[CacheLoaderException]) {
+ c.asInstanceOf[CacheLoaderException]
+ } else {
+ new CacheLoaderException(m, c)
+ }
+ }
+
+ def loadBucket(hash: String) = readFromBlob(blobStore.getBlob(containerName, hash))
+
+ def purge(blobMap: BlobMap) {
+ for (blob <- blobMap.values) {
+ val bucket = readFromBlob(blob)
+ if (bucket.removeExpiredEntries) updateBucket(bucket)
+ }
+ }
+
+ def purgeInternal() {
+ // TODO can expiry data be stored in a blob's metadata? More efficient purging that way.
+ if (!cfg.lazyPurgingOnly) {
+ acquireGlobalLock(false)
+ try {
+ val blobMap = ctx createBlobMap containerName
+ if (multiThreadedPurge) {
+ purgerService.execute(new Runnable() {
+ def run() {
+ try {
+ purge(blobMap)
+ } catch {
+ case e => log.warn("Problems purging", e)
+ }
+ }
+ })
+ } else {
+ purge(blobMap)
+ }
+ } finally {
+ releaseGlobalLock(false)
+ }
+ }
+ }
+
+ def insertBucket(bucket: Bucket) {
+ val blob = blobStore.newBlob(getBucketName(bucket))
+ writeToBlob(blob, bucket)
+
+ val futures: mutable.Set[Future[Any]] = asyncCommandFutures.get();
+ if (futures == null) {
+ // is a sync call
+ blobStore.putBlob(containerName, blob)
+ } else {
+ // is an async call - invoke clear() on the container asynchronously and store the future in the 'futures' collection
+ futures += (asyncBlobStore.putBlob(containerName, blob)).asInstanceOf[Future[Any]]
+ }
+ }
+
+ override def applyModifications(mods: java.util.List[_ <: Modification]) {
+ val futures = mutable.Set[Future[Any]]()
+ asyncCommandFutures set futures
+ try {
+ super.applyModifications(mods)
+ if (pollFutures) {
+ var exception: CacheLoaderException = null
+ try {
+ for (f <- asyncCommandFutures.get) f.get
+ } catch {
+ case ie: InterruptedException => Thread.currentThread.interrupt
+ case ee: ExecutionException => {
+ if (ee.getCause.isInstanceOf[CacheLoaderException])
+ exception = ee.getCause.asInstanceOf[CacheLoaderException]
+ else
+ exception = new CacheLoaderException(ee.getCause)
+ }
+ }
+ if (exception != null) throw exception
+ }
+ } finally {
+ asyncCommandFutures.remove
+ }
+ }
+
+ def updateBucket(bucket: Bucket) {insertBucket(bucket)}
+
+ def writeToBlob(blob: Blob, bucket: Bucket) {
+ try {
+ blob setPayload marshaller.objectToByteBuffer(bucket)
+ } catch {
+ case e: IOException => throw new CacheLoaderException(e)
+ }
+ }
+
+ def readFromBlob(blob: Blob): Bucket = {
+ if (blob == null) return null
+ try {
+ return marshaller.objectFromInputStream(blob.getContent).asInstanceOf[Bucket]
+ } catch {
+ case e => throw new CacheLoaderException(e)
+ }
+ }
+
+ def getBucketName(bucket: Bucket) = {
+ val bucketName = bucket.getBucketName
+ if (bucketName startsWith "-")
+ bucketName replace ("-", "A")
+ else
+ bucketName
+ }
+}
+
+/**
+ * The cache store config bean for this cache store implementation
+ * @author Manik Surtani
+ * @since 4.0
+ */
+// much more concise and expressive than a Java counterpart!
+class CloudCacheStoreConfig(
+ @BeanProperty var identity: String,
+ @BeanProperty var password: String,
+ @BeanProperty var bucketPrefix: String,
+ @BeanProperty var proxyHost: String,
+ @BeanProperty var proxyPort: String,
+ @BeanProperty var requestTimeout: Long,
+ @BeanProperty var lazyPurgingOnly: Boolean,
+ @BeanProperty var cloudService: String,
+ @BeanProperty var maxConnections: Int,
+ @BeanProperty var secure: Boolean
+ ) extends LockSupportCacheStoreConfig {
+ setCacheLoaderClassName(classOf[CloudCacheStore].getName)
+
+ def this() = this (null, null, null, null, null, 10000, true, null, 3, true)
+}
+
+class CloudConnectionException(m: String, c: Throwable) extends CacheLoaderException(m, c) {
+ def this() = this ("", null)
+
+ def this(m: String) = this (m, null)
+
+ def this(c: Throwable) = this ("", c)
+}
\ No newline at end of file
Modified: trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreFunctionalIntegrationTest.java
===================================================================
--- trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreFunctionalIntegrationTest.java 2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreFunctionalIntegrationTest.java 2010-01-07 19:20:20 UTC (rev 1351)
@@ -32,7 +32,7 @@
public class CloudCacheStoreFunctionalIntegrationTest extends BaseCacheStoreFunctionalTest {
private String proxyHost;
- private int proxyPort = -1;
+ private String proxyPort = "-1";
private int maxConnections = 20;
private boolean isSecure = false;
private String csBucket;
@@ -40,10 +40,20 @@
private String secretKey;
private String cs;
- private static final String sysUsername = System.getProperty("infinispan.jclouds.username");
- private static final String sysPassword = System.getProperty("infinispan.jclouds.password");
- private static final String sysService = System.getProperty("infinispan.jclouds.service");
+// private static final String sysUsername = System.getProperty("infinispan.jclouds.username");
+// private static final String sysPassword = System.getProperty("infinispan.jclouds.password");
+// private static final String sysService = System.getProperty("infinispan.jclouds.service");
+ private static final String sysUsername = "manik";
+ private static final String sysPassword = "7391796c16e20c7b9e3eddbf997bc972";
+ private static final String sysService = "cloudfiles";
+
+
+// private static final String sysUsername = "AKIAJOFQDJPJL7K4H2RQ";
+// private static final String sysPassword = "rfOop3ZKoaUVxUK9zCT7WOXsSL7R2sRtbmJtmHNS";
+// private static final String sysService = "s3";
+
+
@BeforeTest
@Parameters({"infinispan.jclouds.username", "infinispan.jclouds.password", "infinispan.jclouds.service"})
protected void setUpClient(@Optional String JcloudsUsername,
Modified: trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java
===================================================================
--- trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java 2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/cachestore/cloud/src/test/java/org/infinispan/loaders/cloud/CloudCacheStoreIntegrationTest.java 2010-01-07 19:20:20 UTC (rev 1351)
@@ -34,7 +34,7 @@
public class CloudCacheStoreIntegrationTest extends BaseCacheStoreTest {
private String proxyHost;
- private int proxyPort = -1;
+ private String proxyPort = "-1";
private int maxConnections = 20;
private boolean isSecure = false;
private String csBucket;
Modified: trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java 2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/core/src/main/java/org/infinispan/loaders/AbstractCacheStore.java 2010-01-07 19:20:20 UTC (rev 1351)
@@ -38,7 +38,7 @@
private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(0);
protected boolean multiThreadedPurge = false;
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException{
+ public void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException{
this.config = (AbstractCacheStoreConfig) config;
this.marshaller = m;
if (config == null) throw new IllegalStateException("Null config!!!");
Modified: trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java 2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java 2010-01-07 19:20:20 UTC (rev 1351)
@@ -23,7 +23,7 @@
* selecting where refer to state in storage, for example, a different database table name.
* @param m marshaller to use when loading state from a stream, if supported by the implementation.
*/
- void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException;
+ void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException;
/**
* Loads an entry mapped to by a given key. Should return null if the entry does not exist. Expired entries are not
Modified: trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java 2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java 2010-01-07 19:20:20 UTC (rev 1351)
@@ -34,7 +34,7 @@
private long globalLockTimeoutMillis;
private LockSupportCacheStoreConfig config;
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException {
+ public void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException {
super.init(config, cache, m);
this.config = (LockSupportCacheStoreConfig) config;
}
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2010-01-07 19:19:23 UTC (rev 1350)
+++ trunk/pom.xml 2010-01-07 19:20:20 UTC (rev 1351)
@@ -26,7 +26,7 @@
<module>lucene-directory</module>
<module>cachestore</module>
<module>cachestore/bdbje</module>
-<!-- <module>cachestore/cloud</module> -->
+ <module>cachestore/cloud</module>
<module>cachestore/jdbc</module>
<module>cachestore/jdbm</module>
<module>server/rest</module>
More information about the infinispan-commits
mailing list