[infinispan-commits] Infinispan SVN: r1385 - in trunk/cachestore/cloud: src/main and 5 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Jan 18 06:49:39 EST 2010
Author: manik.surtani at jboss.com
Date: 2010-01-18 06:49:39 -0500 (Mon, 18 Jan 2010)
New Revision: 1385
Added:
trunk/cachestore/cloud/src/main/java/
trunk/cachestore/cloud/src/main/java/org/
trunk/cachestore/cloud/src/main/java/org/infinispan/
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java
Removed:
trunk/cachestore/cloud/src/main/scala/
Modified:
trunk/cachestore/cloud/pom.xml
Log:
Reverted implementation back to Java from Scala, due to poor Scaladoc -> Javadoc integration.
Modified: trunk/cachestore/cloud/pom.xml
===================================================================
--- trunk/cachestore/cloud/pom.xml 2010-01-16 17:35:57 UTC (rev 1384)
+++ trunk/cachestore/cloud/pom.xml 2010-01-18 11:49:39 UTC (rev 1385)
@@ -20,18 +20,7 @@
</properties>
<dependencies>
- <!-- this module is written in Scala rather than Java -->
<dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>${scala.version}</version>
- </dependency>
- <dependency>
- <groupId>org.scala-tools</groupId>
- <artifactId>javautils</artifactId>
- <version>2.7.4-0.1</version>
- </dependency>
- <dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-blobstore</artifactId>
<version>${version.jclouds}</version>
@@ -87,36 +76,8 @@
</repositories>
<build>
- <sourceDirectory>src/main/scala</sourceDirectory>
-
<plugins>
<plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <executions>
- <execution>
- <id>compile</id>
- <goals>
- <goal>compile</goal>
- </goals>
- <phase>compile</phase>
- </execution>
- <execution>
- <id>test-compile</id>
- <goals>
- <goal>testCompile</goal>
- </goals>
- <phase>test-compile</phase>
- </execution>
- <execution>
- <phase>process-resources</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.3</version>
Added: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java (rev 0)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-01-18 11:49:39 UTC (rev 1385)
@@ -0,0 +1,275 @@
+package org.infinispan.loaders.cloud;
+
+import com.google.common.collect.ImmutableSet;
+import org.infinispan.Cache;
+import org.infinispan.config.ConfigurationException;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.loaders.CacheLoaderConfig;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheStoreConfig;
+import org.infinispan.loaders.bucket.Bucket;
+import org.infinispan.loaders.bucket.BucketBasedCacheStore;
+import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.marshall.Marshaller;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.jclouds.blobstore.AsyncBlobStore;
+import org.jclouds.blobstore.BlobMap;
+import org.jclouds.blobstore.BlobStore;
+import org.jclouds.blobstore.BlobStoreContext;
+import org.jclouds.blobstore.BlobStoreContextFactory;
+import org.jclouds.blobstore.KeyNotFoundException;
+import org.jclouds.blobstore.domain.Blob;
+import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
+import org.jclouds.logging.log4j.config.Log4JLoggingModule;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+
+/**
+ * The CloudCacheStore implementation that utilizes <a href="http://code.google.com/p/jclouds">JClouds</a> to
+ * communicate with cloud storage providers such as <a href="http://aws.amazon.com/s3/">Amazon's S3</a>, <a
+ * href="http://www.rackspacecloud.com/cloud_hosting_products/files">Rackspace's Cloudfiles</a>, or any other such
+ * provider supported by JClouds.
+ * <p/>
+ * This cache store stores its buckets in the following format: <tt>http://{cloud-storage-provider}/{bucket}/{bucket_number}.bucket</tt>
+ * <p/>
+ *
+ * @author Manik Surtani
+ * @author Adrian Cole
+ * @since 4.0
+ */
+public class CloudCacheStore extends BucketBasedCacheStore {
+ private static final Log log = LogFactory.getLog(CloudCacheStore.class);
+ private final ThreadLocal<Set<Future<?>>> asyncCommandFutures = new ThreadLocal<Set<Future<?>>>();
+ private CloudCacheStoreConfig cfg;
+ private String containerName;
+ private BlobStoreContext ctx;
+ private BlobStore blobStore;
+ private AsyncBlobStore asyncBlobStore;
+ private boolean pollFutures = false;
+
+ @Override
+ public Class<? extends CacheStoreConfig> getConfigurationClass() {
+ return CloudCacheStoreConfig.class;
+ }
+
+ private String getThisContainerName() {
+ return cfg.getBucketPrefix() + "-" + cache.getName().toLowerCase();
+ }
+
+ @Override
+ protected boolean supportsMultiThreadedPurge() {
+ return true;
+ }
+
+ @Override
+ public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException {
+ this.cfg = (CloudCacheStoreConfig) cfg;
+ init(cfg, cache, m, null, null, null);
+ }
+
+ public void init(CacheLoaderConfig cfg, Cache<?, ?> cache, Marshaller m, BlobStoreContext<?, ?> ctx,
+ BlobStore blobStore, AsyncBlobStore asyncBlobStore) throws CacheLoaderException {
+ super.init(cfg, cache, m);
+ this.cfg = (CloudCacheStoreConfig) cfg;
+ this.cache = cache;
+ this.marshaller = m;
+ this.ctx = ctx;
+ this.blobStore = blobStore;
+ this.asyncBlobStore = asyncBlobStore;
+ }
+
+ @Override
+ public void start() throws CacheLoaderException {
+ super.start();
+ if (cfg.getCloudService() == null) throw new ConfigurationException("CloudService must be set!");
+ if (cfg.getIdentity() == null) throw new ConfigurationException("Identity must be set");
+ if (cfg.getPassword() == null) throw new ConfigurationException("Password must be set");
+ if (cfg.getBucketPrefix() == null) throw new ConfigurationException("CloudBucket must be set");
+ containerName = getThisContainerName();
+ try {
+ // add an executor as a constructor param to EnterpriseConfigurationModule, pass property overrides instead of Properties()
+ ctx = new BlobStoreContextFactory().createContext(cfg.getCloudService(), cfg.getIdentity(), cfg.getPassword(),
+ ImmutableSet.of(new EnterpriseConfigurationModule(), new Log4JLoggingModule()), new Properties());
+ blobStore = ctx.getBlobStore();
+ asyncBlobStore = ctx.getAsyncBlobStore();
+
+ // the "location" is not currently used.
+ if (!blobStore.containerExists(containerName)) blobStore.createContainerInLocation(null, containerName);
+ pollFutures = !cfg.getAsyncStoreConfig().isEnabled();
+ } catch (IOException ioe) {
+ throw new CacheLoaderException("Unable to create context", ioe);
+ }
+ }
+
+ protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
+ Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
+
+ for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
+ Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
+ if (bucket.removeExpiredEntries()) updateBucket(bucket);
+ result.addAll(bucket.getStoredEntries());
+ }
+ return result;
+ }
+
+ protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+ String source = null;
+ try {
+ source = (String) objectInput.readObject();
+ } catch (Exception e) {
+ throw convertToCacheLoaderException("Error while reading from stream", e);
+ }
+ if (containerName.equals(source)) {
+ log.info("Attempt to load the same cloud bucket ({0}) ignored", source);
+ } else {
+ // TODO implement stream handling. What's the JClouds API to "copy" one bucket to another?
+ }
+ }
+
+ protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+ try {
+ objectOutput.writeObject(containerName);
+ } catch (Exception e) {
+ throw convertToCacheLoaderException("Error while writing to stream", e);
+ }
+ }
+
+ protected void clearLockSafe() {
+ Set<Future<?>> futures = asyncCommandFutures.get();
+ if (futures == null) {
+ // is a sync call
+ blobStore.clearContainer(containerName);
+ } else {
+ // is an async call - invoke clear() on the container asynchronously and store the future in the 'futures' collection
+ futures.add(asyncBlobStore.clearContainer(containerName));
+ }
+ }
+
+ private CacheLoaderException convertToCacheLoaderException(String m, Throwable c) {
+ if (c instanceof CacheLoaderException) {
+ return (CacheLoaderException) c;
+ } else {
+ return new CacheLoaderException(m, c);
+ }
+ }
+
+ protected Bucket loadBucket(String hash) throws CacheLoaderException {
+ try {
+ return readFromBlob(blobStore.getBlob(containerName, hash), hash);
+ } catch (KeyNotFoundException e) {
+ return null;
+ }
+ }
+
+ private void purge(BlobMap blobMap) throws CacheLoaderException {
+ for (Map.Entry<String, Blob> entry : blobMap.entrySet()) {
+ Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
+ if (bucket.removeExpiredEntries()) updateBucket(bucket);
+ }
+ }
+
+ protected void purgeInternal() throws CacheLoaderException {
+ // TODO can expiry data be stored in a blob's metadata? More efficient purging that way. See https://jira.jboss.org/jira/browse/ISPN-334
+ if (!cfg.isLazyPurgingOnly()) {
+ acquireGlobalLock(false);
+ try {
+ final BlobMap blobMap = ctx.createBlobMap(containerName);
+ if (multiThreadedPurge) {
+ purgerService.execute(new Runnable() {
+ public void run() {
+ try {
+ purge(blobMap);
+ } catch (Exception e) {
+ log.warn("Problems purging", e);
+ }
+ }
+ });
+ } else {
+ purge(blobMap);
+ }
+ } finally {
+ releaseGlobalLock(false);
+ }
+ }
+ }
+
+ protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+ Blob blob = blobStore.newBlob(getBucketName(bucket));
+ writeToBlob(blob, bucket);
+
+ Set<Future<?>> futures = asyncCommandFutures.get();
+ if (futures == null) {
+ // is a sync call
+ blobStore.putBlob(containerName, blob);
+ } else {
+ // is an async call - invoke clear() on the container asynchronously and store the future in the 'futures' collection
+ futures.add(asyncBlobStore.putBlob(containerName, blob));
+ }
+ }
+
+ @Override
+ public void applyModifications(List<? extends Modification> modifications) throws CacheLoaderException {
+ Set<Future<?>> futures = new HashSet<Future<?>>();
+ asyncCommandFutures.set(futures);
+
+ try {
+ super.applyModifications(modifications);
+ if (pollFutures) {
+ CacheLoaderException exception = null;
+ try {
+ for (Future<?> f : asyncCommandFutures.get()) f.get();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException ee) {
+ exception = convertToCacheLoaderException("Caught exception in async process", ee.getCause());
+ }
+ if (exception != null) throw exception;
+ }
+ } finally {
+ asyncCommandFutures.remove();
+ }
+ }
+
+ protected void updateBucket(Bucket bucket) throws CacheLoaderException {
+ insertBucket(bucket);
+ }
+
+ private void writeToBlob(Blob blob, Bucket bucket) throws CacheLoaderException {
+ try {
+ blob.setPayload(
+ marshaller.objectToByteBuffer(bucket));
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ private Bucket readFromBlob(Blob blob, String bucketName) throws CacheLoaderException {
+ if (blob == null) return null;
+ try {
+ Bucket bucket = (Bucket) marshaller.objectFromInputStream(blob.getContent());
+ if (bucket != null) bucket.setBucketName(
+ bucketName);
+ return bucket;
+ } catch (Exception e) {
+ throw convertToCacheLoaderException("Unable to read blob", e);
+ }
+ }
+
+ private String getBucketName(Bucket bucket) {
+ log.warn("Bucket is {0}", bucket);
+ String bucketName = bucket.getBucketName();
+ if (bucketName.startsWith("-")) bucketName = bucketName.replace("-", "A");
+ return bucketName;
+ }
+}
Property changes on: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java (rev 0)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java 2010-01-18 11:49:39 UTC (rev 1385)
@@ -0,0 +1,174 @@
+package org.infinispan.loaders.cloud;
+
+import org.infinispan.loaders.LockSupportCacheStoreConfig;
+
+/**
+ * The cache store config bean for the {@link org.infinispan.loaders.cloud.CloudCacheStore}. This allows you to tune a number of characteristics
+ * of the {@link org.infinispan.loaders.cloud.CloudCacheStore}.
+ * <p/>
+ * <ul>
+ * <li><tt>identity</tt> - A String that identifies you to the cloud provider. For example, with AWS, this is your ACCESS KEY.</li>
+ * <li><tt>password</tt> - A String that is used to authenticate you with the cloud provider. For example, with AWS, this is your SECRET KEY.</li>
+ * <li><tt>bucketPrefix</tt> - A String that is prepended to generated buckets or containers on the cloud store. Buckets or containers are named {bucketPrefix}-{cacheName}.</li>
+ * <li><tt>proxyHost</tt> - The host name of a proxy to use. Optional, no proxy is used if this is un-set.</li>
+ * <li><tt>proxyPort</tt> - The port of a proxy to use. Optional, no proxy is used if this is un-set.</li>
+ * <li><tt>requestTimeout</tt> - A timeout to use when communicating with the cloud storage provider, in milliseconds. Defaults to 10000.</li>
+ * <li><tt>lazyPurgingOnly</tt> - If enabled, then expired entries are only purged on access, lazily, rather than by using the periodic eviction thread. Defaults to <tt>true</tt>.</li>
+ * <li><tt>cloudService</tt> - The cloud service to use. Supported values are <tt>s3</tt> (Amazon AWS), <tt>cloudfiles</tt> (Rackspace Cloud), <tt>azureblob</tt> (Microsoft Azure), and <tt>atmos</tt> (Atmos).</li>
+ * <li><tt>maxConnections</tt> - The maximum number of concurrent connections to make to the cloud provider. Defaults to 10000.</li>
+ * <li><tt>secure</tt> - Whether to use secure (SSL) connections or not. Defaults to <tt>true</tt>.</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class CloudCacheStoreConfig extends LockSupportCacheStoreConfig {
+   private String identity;
+   private String password;
+   private String bucketPrefix;
+   private String proxyHost;
+   private String proxyPort;
+   private long requestTimeout = 10000;
+   private boolean lazyPurgingOnly = true;
+   private String cloudService;
+   private int maxConnections = 10000;
+   private boolean secure = true;
+
+   /**
+    * Creates a configuration whose cache loader class is {@link CloudCacheStore}.
+    */
+   public CloudCacheStoreConfig() {
+      setCacheLoaderClassName(CloudCacheStore.class.getName());
+   }
+
+   // ---- plain bean accessors ----
+
+   public String getIdentity() {
+      return identity;
+   }
+
+   public void setIdentity(String identity) {
+      this.identity = identity;
+   }
+
+   public String getPassword() {
+      return password;
+   }
+
+   public void setPassword(String password) {
+      this.password = password;
+   }
+
+   public String getBucketPrefix() {
+      return bucketPrefix;
+   }
+
+   public void setBucketPrefix(String bucketPrefix) {
+      this.bucketPrefix = bucketPrefix;
+   }
+
+   public String getProxyHost() {
+      return proxyHost;
+   }
+
+   public void setProxyHost(String proxyHost) {
+      this.proxyHost = proxyHost;
+   }
+
+   public String getProxyPort() {
+      return proxyPort;
+   }
+
+   public void setProxyPort(String proxyPort) {
+      this.proxyPort = proxyPort;
+   }
+
+   public long getRequestTimeout() {
+      return requestTimeout;
+   }
+
+   public void setRequestTimeout(long requestTimeout) {
+      this.requestTimeout = requestTimeout;
+   }
+
+   public boolean isLazyPurgingOnly() {
+      return lazyPurgingOnly;
+   }
+
+   public void setLazyPurgingOnly(boolean lazyPurgingOnly) {
+      this.lazyPurgingOnly = lazyPurgingOnly;
+   }
+
+   public String getCloudService() {
+      return cloudService;
+   }
+
+   public void setCloudService(String cloudService) {
+      this.cloudService = cloudService;
+   }
+
+   public int getMaxConnections() {
+      return maxConnections;
+   }
+
+   public void setMaxConnections(int maxConnections) {
+      this.maxConnections = maxConnections;
+   }
+
+   public boolean isSecure() {
+      return secure;
+   }
+
+   public void setSecure(boolean secure) {
+      this.secure = secure;
+   }
+
+   // ---- value semantics ----
+
+   /** Null-safe string equality. */
+   private static boolean same(String a, String b) {
+      return a == null ? b == null : a.equals(b);
+   }
+
+   /** Null-safe string hash; <tt>null</tt> hashes to 0. */
+   private static int hashOf(String s) {
+      return s == null ? 0 : s.hashCode();
+   }
+
+   /**
+    * Two configurations are equal when they are of the same class, the superclass considers them equal, and all
+    * attributes declared here match.
+    */
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      if (!super.equals(o)) return false;
+
+      CloudCacheStoreConfig other = (CloudCacheStoreConfig) o;
+      return lazyPurgingOnly == other.lazyPurgingOnly
+            && maxConnections == other.maxConnections
+            && requestTimeout == other.requestTimeout
+            && secure == other.secure
+            && same(bucketPrefix, other.bucketPrefix)
+            && same(cloudService, other.cloudService)
+            && same(identity, other.identity)
+            && same(password, other.password)
+            && same(proxyHost, other.proxyHost)
+            && same(proxyPort, other.proxyPort);
+   }
+
+   /**
+    * Hash code over the superclass hash and every attribute declared here, consistent with {@link #equals(Object)}.
+    */
+   @Override
+   public int hashCode() {
+      int h = super.hashCode();
+      h = 31 * h + hashOf(identity);
+      h = 31 * h + hashOf(password);
+      h = 31 * h + hashOf(bucketPrefix);
+      h = 31 * h + hashOf(proxyHost);
+      h = 31 * h + hashOf(proxyPort);
+      h = 31 * h + (int) (requestTimeout ^ (requestTimeout >>> 32));
+      h = 31 * h + (lazyPurgingOnly ? 1 : 0);
+      h = 31 * h + hashOf(cloudService);
+      h = 31 * h + maxConnections;
+      h = 31 * h + (secure ? 1 : 0);
+      return h;
+   }
+
+   /**
+    * Human-readable dump of the configuration; the password is never included.
+    */
+   @Override
+   public String toString() {
+      StringBuilder sb = new StringBuilder("CloudCacheStoreConfig{");
+      sb.append("bucketPrefix='").append(bucketPrefix).append('\'');
+      sb.append(", identity='").append(identity).append('\'');
+      sb.append(", password=(hidden)");
+      sb.append(", proxyHost='").append(proxyHost).append('\'');
+      sb.append(", proxyPort='").append(proxyPort).append('\'');
+      sb.append(", requestTimeout=").append(requestTimeout);
+      sb.append(", lazyPurgingOnly=").append(lazyPurgingOnly);
+      sb.append(", cloudService='").append(cloudService).append('\'');
+      sb.append(", maxConnections=").append(maxConnections);
+      sb.append(", secure=").append(secure);
+      sb.append('}');
+      return sb.toString();
+   }
+}
\ No newline at end of file
Property changes on: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStoreConfig.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java (rev 0)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java 2010-01-18 11:49:39 UTC (rev 1385)
@@ -0,0 +1,20 @@
+package org.infinispan.loaders.cloud;
+
+import org.infinispan.loaders.CacheLoaderException;
+
+/**
+ * A {@link CacheLoaderException} subtype for failures to connect to the cloud storage provider.
+ */
+public class CloudConnectionException extends CacheLoaderException {
+   // Exceptions are Serializable; pin the stream version explicitly instead of relying on the compiler-derived one.
+   private static final long serialVersionUID = 1L;
+
+   /** Creates an exception with neither message nor cause. */
+   public CloudConnectionException() {
+   }
+
+   /**
+    * @param cause the underlying failure
+    */
+   public CloudConnectionException(Throwable cause) {
+      super(cause);
+   }
+
+   /**
+    * @param message description of the failure
+    */
+   public CloudConnectionException(String message) {
+      super(message);
+   }
+
+   /**
+    * @param message description of the failure
+    * @param cause   the underlying failure
+    */
+   public CloudConnectionException(String message, Throwable cause) {
+      super(message, cause);
+   }
+}
Property changes on: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudConnectionException.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list