[jbosscache-commits] JBoss Cache SVN: r7881 - in core/branches/flat: src/main/java/org/horizon/loader and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Sat Mar 7 22:01:49 EST 2009


Author: adriancole
Date: 2009-03-07 22:01:49 -0500 (Sat, 07 Mar 2009)
New Revision: 7881

Added:
   core/branches/flat/src/main/java/org/horizon/loader/s3/
   core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
   core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
   core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java
   core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java
   core/branches/flat/src/test/java/org/horizon/loader/s3/
   core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java
   core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java
Modified:
   core/branches/flat/pom.xml
Log:
added s3 cache store

Modified: core/branches/flat/pom.xml
===================================================================
--- core/branches/flat/pom.xml	2009-03-07 18:58:23 UTC (rev 7880)
+++ core/branches/flat/pom.xml	2009-03-08 03:01:49 UTC (rev 7881)
@@ -75,11 +75,11 @@
       </dependency>
 
       <dependency>
-         <groupId>net.noderunner</groupId>
-         <artifactId>amazon-s3</artifactId>
-         <version>1.0.0.0</version>
+         <groupId>net.java.dev.jets3t</groupId>
+         <artifactId>jets3t</artifactId>
+         <version>0.6.1</version>
          <optional>true</optional>
-      </dependency>
+      </dependency> 
 
       <dependency>
          <groupId>log4j</groupId>
@@ -130,6 +130,14 @@
          <version>2.5</version>
          <scope>test</scope>
       </dependency>
+
+      <dependency>
+         <groupId>commons-io</groupId>
+         <artifactId>commons-io</artifactId>
+         <version>1.4</version>
+         <scope>test</scope>
+      </dependency>
+
       <!-- 5.8 is needed for proper parallel test execution -->
       <dependency>
          <groupId>org.testng</groupId>
@@ -242,8 +250,9 @@
       </repository>
       <!-- For Amazon S3 artifacts -->
       <repository>
-         <id>e-xml.sourceforge.net</id>
-         <url>http://e-xml.sourceforge.net/maven2/repository</url>
+        <name>jets3t</name>
+        <id>jets3t</id>
+        <url>http://jets3t.s3.amazonaws.com/maven2</url>
       </repository>
       <!-- For Sleepycat -->
       <repository>

Added: core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java	2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,135 @@
+package org.horizon.loader.s3;
+
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.multithread.S3ServiceSimpleMulti;
+import org.jets3t.service.security.AWSCredentials;
+
+/**
+ * A {@link org.jets3t.service.S3Service jets3t} implementation of {@link S3Connection}.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class Jets3tS3Connection implements S3Connection {
+   private S3Service s3Service;
+   private S3ServiceSimpleMulti s3MultiService;
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see RestS3Service#RestS3Service(org.jets3t.service.security.AWSCredentials)
+    * @see S3ServiceSimpleMulti#S3ServiceSimpleMulti(org.jets3t.service.S3Service)
+    */
+   public void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException {
+      AWSCredentials awsCredentials =
+            new AWSCredentials(awsAccessKey, awsSecretKey);
+      s3Service = new RestS3Service(awsCredentials);
+      s3MultiService = new S3ServiceSimpleMulti(s3Service);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see S3Object#S3Object(String)
+    */
+   public S3Object createObject(String key) {
+      return new S3Object(key);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see org.jets3t.service.S3Service#deleteObject(org.jets3t.service.model.S3Bucket, String)
+    */
+   public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException{
+      s3Service.deleteObject(bucket, objectKey);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see org.jets3t.service.S3Service#getBucket(String)
+    * @see org.jets3t.service.S3Service#createBucket(String)
+    */
+   public S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException {
+      /* version 0.7.0 supports the following
+         return s3Service.getOrCreateBucket(config.getBucket()); */
+      if (s3Service.isBucketAccessible(bucketName)) {
+         return s3Service.getBucket(bucketName);
+      } else {
+         return s3Service.createBucket(bucketName);
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see org.jets3t.service.S3Service#listObjects(org.jets3t.service.model.S3Bucket)
+    */
+   public S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException {
+      return s3Service.listObjects(bucket);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see S3ServiceSimpleMulti#copyObjects(String, String, String[], org.jets3t.service.model.S3Object[], boolean)
+    */
+   public void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName, String destinationBucketName) throws S3ServiceException {
+      S3Object[] destinationObjects = new S3Object[keys.length];
+      int i = 0;
+      for (String key : keys) {
+         destinationObjects[i++] = createObject(key);
+      }
+      s3MultiService.copyObjects(sourceBucketName, destinationBucketName, keys, destinationObjects, false);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see Jets3tS3Connection#getAllObjectsInBucketWithoutTheirData(org.jets3t.service.model.S3Bucket)
+    * @see S3ServiceSimpleMulti#deleteObjects(org.jets3t.service.model.S3Bucket, org.jets3t.service.model.S3Object[])
+    */
+   public void removeAllObjectsFromBucket(S3Bucket bucket) throws S3ServiceException{
+      S3Object[] objects = getAllObjectsInBucketWithoutTheirData(bucket);
+      s3MultiService.deleteObjects(bucket, objects);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see S3Service#deleteBucket(S3Bucket)
+    */
+   public void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException {
+      s3Service.deleteBucket(bucket);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see S3Service#getObject(org.jets3t.service.model.S3Bucket, String)
+    */
+   public S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+      try {
+         return s3Service.getObject(bucket, objectKey);
+      } catch (S3ServiceException e) {
+         if (e.getS3ErrorCode() != null && e.getS3ErrorCode().equals("NoSuchKey")) {
+            return null;
+         }
+         throw e;
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see S3Service#putObject(org.jets3t.service.model.S3Bucket, org.jets3t.service.model.S3Object)
+    */
+   public S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException {
+      return s3Service.putObject(bucket, object);
+   }
+}
\ No newline at end of file

Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java	2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,208 @@
+package org.horizon.loader.s3;
+
+import org.horizon.Cache;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.bucket.Bucket;
+import org.horizon.loader.bucket.BucketBasedCacheStore;
+import org.horizon.loader.file.FileCacheStore;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.utils.ServiceUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link org.jets3t.service.S3Service jets3t} implementation of a {@link org.horizon.loader.bucket.BucketBasedCacheStore}.
+ * This cache store stores its data in the following format: <tt>http://s3.amazon.com/{bucket}/bucket_number.bucket</tt>
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class S3CacheStore extends BucketBasedCacheStore {
+
+   private static final Log log = LogFactory.getLog(FileCacheStore.class);
+
+   private S3CacheStoreConfig config;
+   private S3Bucket rootS3Bucket;
+
+   Cache cache;
+   Marshaller marshaller;
+
+   private S3Connection s3Connection;
+
+   public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+      return S3CacheStoreConfig.class;
+   }
+
+   /**
+    * {@inheritDoc} This initializes the internal <tt>s3Connection</tt> as an implementation of {@link
+    * Jets3tS3Connection}
+    */
+   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+      init(config, cache, m, new Jets3tS3Connection());
+   }
+
+   public void init(CacheLoaderConfig config, Cache cache, Marshaller m, S3Connection s3Connection) {
+      super.init(config, cache, m);
+      this.config = (S3CacheStoreConfig) config;
+      this.cache = cache;
+      this.marshaller = m;
+      this.s3Connection = s3Connection;
+   }
+
+
+   public void start() throws CacheLoaderException {
+      super.start();
+
+      String awsAccessKey = config.getAwsAccessKey();
+      if (awsAccessKey == null)
+         throw new IllegalArgumentException("awsAccessKey must be set");
+      String awsSecretKey = config.getAwsSecretKey();
+      if (awsSecretKey == null)
+         throw new IllegalArgumentException("awsSecretKey must be set");
+      String s3Bucket = config.getBucket();
+      if (s3Bucket == null)
+         throw new IllegalArgumentException("s3Bucket must be set");
+
+      try {
+         s3Connection.connect(awsAccessKey, awsSecretKey);
+         rootS3Bucket = s3Connection.getOrCreateBucket(s3Bucket);
+      } catch (S3ServiceException e) {
+         throw convertToCacheLoaderException("error opening s3 service", e);
+      }
+   }
+
+   protected Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException {
+      Set<StoredEntry> result = new HashSet<StoredEntry>();
+      try {
+         for (S3Object s3Object : s3Connection.getAllObjectsInBucketWithoutTheirData(rootS3Bucket)) {
+            Bucket bucket = loadBucket(s3Object);
+            if (bucket != null) {
+               if (bucket.removeExpiredEntries()) {
+                  saveBucket(bucket);
+               }
+               result.addAll(bucket.getStoredEntries());
+            }
+         }
+      } catch (S3ServiceException e) {
+         throw convertToCacheLoaderException("Error while loading entries", e);
+      }
+      return result;
+   }
+
+   protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+      try {
+         S3Bucket source = (S3Bucket) objectInput.readObject();
+         if (rootS3Bucket.getName().equals(source.getName())) {
+            log.info("Attempt to load the same s3 bucket ignored");
+         } else {
+            S3Object[] sourceObjects = s3Connection.getAllObjectsInBucketWithoutTheirData(source);
+            String[] sourceKeys = new String[sourceObjects.length];
+
+            int i = 0;
+            for (S3Object sourceObject : sourceObjects) {
+               sourceKeys[i++] = sourceObject.getKey();
+            }
+            s3Connection.copyObjectsFromOneBucketToAnother(sourceKeys, source.getName(), rootS3Bucket.getName());
+         }
+         loadAll();
+      } catch (Exception e) {
+         throw convertToCacheLoaderException("Error while reading from stream", e);
+      }
+   }
+
+   protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+      try {
+         objectOutput.writeObject(rootS3Bucket);
+      } catch (IOException e) {
+         throw convertToCacheLoaderException("Error while writing to stream", e);
+      }
+   }
+
+   protected void clearLockSafe() throws CacheLoaderException {
+      try {
+         s3Connection.removeAllObjectsFromBucket(rootS3Bucket);
+      } catch (S3ServiceException caught) {
+         throw convertToCacheLoaderException("error recreating bucket " + config.getBucket(), caught);
+      }
+   }
+
+   CacheLoaderException convertToCacheLoaderException(String message, Exception caught) {
+      return (caught instanceof CacheLoaderException) ? (CacheLoaderException) caught :
+            new CacheLoaderException(message, caught);
+   }
+
+   protected void purgeInternal() throws CacheLoaderException {
+      loadAll();
+   }
+
+   protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
+      return loadBucket(s3Connection.createObject(bucketName));
+   }
+
+   protected Bucket loadBucket(S3Object s3Object) throws CacheLoaderException {
+      Bucket bucket = null;
+      InputStream is = null;
+      ObjectInputStream ois = null;
+      String key = s3Object.getKey();
+      try {
+         // it is possible that the S3Object above only holds details.  Try to fetch, if this is the case
+         if (s3Object.getDataInputStream() == null) {
+            s3Object = s3Connection.getObjectInBucket(key, rootS3Bucket);
+         }
+
+         // it is possible that the object never existed. in this case, fall out.
+         if (s3Object != null && s3Object.getDataInputStream() != null) {
+            is = s3Object.getDataInputStream();
+            ois = new ObjectInputStream(is);
+            bucket = (Bucket) ois.readObject();
+            s3Object.closeDataInputStream();
+            bucket.setBucketName(s3Object.getKey());
+         }
+      } catch (Exception e) {
+         throw convertToCacheLoaderException("Error while reading from object: " + key, e);
+      } finally {
+         safeClose(ois);
+         safeClose(is);
+      }
+      return bucket;
+   }
+
+   protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+      saveBucket(bucket);
+   }
+
+   public final void saveBucket(Bucket b) throws CacheLoaderException {
+      try {
+         if (b.getEntries().isEmpty()) {
+            s3Connection.removeObjectFromBucket(b.getBucketName(), rootS3Bucket);
+         } else {
+            ByteArrayInputStream dataIS = new ByteArrayInputStream(
+                  marshaller.objectToByteBuffer(b));
+            byte[] md5Hash = ServiceUtils.computeMD5Hash(dataIS);
+            dataIS.reset();
+            S3Object s3Object = s3Connection.createObject(b.getBucketName());
+            s3Object.setDataInputStream(dataIS);
+            s3Object.setContentLength(dataIS.available());
+            s3Object.setMd5Hash(md5Hash);
+            s3Connection.putObjectIntoBucket(s3Object, rootS3Bucket);
+         }
+      } catch (Exception ex) {
+         throw convertToCacheLoaderException("Exception while saving bucket " + b, ex);
+      }
+   }
+
+}

Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java	2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,51 @@
+package org.horizon.loader.s3;
+
+import org.horizon.loader.LockSupportCacheStoreConfig;
+
+/**
+ * Configures {@link org.horizon.loader.s3.S3CacheStore}.  This allows you to tune a number of characteristics of the
+ * {@link S3CacheStore}.
+ * <p/>
+ * <ul> <li><tt>awsAccessKey</tt> - identifies you as the party responsible for s3 requests.  This is required and there
+ * is no default.</li> <li><tt>awsSecretKey</tt> - used to authenticate you as the owner of <tt>awsAccessKey</tt>.  This
+ * is required and there is no default.</li> <li><tt>bucket</tt> - the name of the s3 bucket used to store cache data.
+ * This is required and there is no default.</li> </ul>
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class S3CacheStoreConfig extends LockSupportCacheStoreConfig {
+   private String awsAccessKey;
+   private String awsSecretKey;
+   private String bucket;
+
+   public S3CacheStoreConfig() {
+      setCacheLoaderClassName(S3CacheStore.class.getName());
+   }
+
+   public String getAwsAccessKey() {
+      return awsAccessKey;
+   }
+
+   public void setAwsAccessKey(String awsAccessKey) {
+      this.awsAccessKey = awsAccessKey;
+   }
+
+   public String getAwsSecretKey() {
+      return awsSecretKey;
+   }
+
+   public void setAwsSecretKey(String awsSecretKey) {
+      this.awsSecretKey = awsSecretKey;
+   }
+
+   public String getBucket() {
+      return bucket;
+   }
+
+   public void setBucket(String bucket) {
+      this.bucket = bucket;
+   }
+
+
+}

Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java	2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,35 @@
+package org.horizon.loader.s3;
+
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+
+/**
+ * This interface defines the interactions between the {@link S3CacheStore} and Amazon S3.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public interface S3Connection {
+
+   void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException;
+
+   S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException;
+
+   void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException;
+
+   S3Object createObject(String key);
+
+   S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException;
+
+   S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException;
+
+   S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException;
+
+   public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException;
+
+   void removeAllObjectsFromBucket(S3Bucket rootS3Bucket) throws S3ServiceException;
+
+   void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName, String destinationBucketName) throws S3ServiceException;
+
+}
\ No newline at end of file

Added: core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java	2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,104 @@
+package org.horizon.loader.s3;
+
+import org.apache.commons.io.IOUtils;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class MockS3Connection implements S3Connection {
+   private Map<String, S3Bucket> nameToS3Bucket = new ConcurrentHashMap<String, S3Bucket>();
+   private Map<String, Map<String, S3Object>> bucketToContents = new ConcurrentHashMap<String, Map<String, S3Object>>();
+
+   public synchronized S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException {
+      S3Bucket bucket = nameToS3Bucket.get(bucketName);
+      if (bucket == null) {
+         bucket = new S3Bucket(bucketName);
+         nameToS3Bucket.put(bucketName, bucket);
+         bucketToContents.put(bucketName, new ConcurrentHashMap<String, S3Object>());
+      }
+      return bucket;
+   }
+
+   public S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException {
+      Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+      return contents.values().toArray(new S3Object[]{});
+   }
+
+   public void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName, String destinationBucketName) throws S3ServiceException {
+      Map<String, S3Object> source = bucketToContents.get(sourceBucketName);
+      Map<String, S3Object> destination = bucketToContents.get(destinationBucketName);
+      for (int i = 0; i < keys.length; i++) {
+         destination.put(keys[i], source.get(keys[i]));
+      }
+   }
+
+   public void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException {
+      nameToS3Bucket.remove(bucket.getName());
+      bucketToContents.remove(bucket.getName());
+   }
+
+   public S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+      Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+      return contents.get(objectKey);
+   }
+
+   public S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException {
+      Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+      contents.put(object.getKey(), object);
+      return object;
+   }
+
+   public void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException {
+      // ignore
+   }
+
+   public S3Object createObject(String key) {
+      return new MockS3Object(key);
+   }
+
+   public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+      Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+      contents.remove(objectKey);
+   }
+
+   public void removeAllObjectsFromBucket(S3Bucket bucket) throws S3ServiceException {
+      Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+      contents.clear();
+   }
+
+   class MockS3Object extends S3Object {
+
+      byte[] buff;
+
+      public MockS3Object(String key) {
+         super(key);
+      }
+
+      @Override
+      public void setDataInputStream(InputStream inputStream) {
+         try {
+            buff = IOUtils.toByteArray(inputStream);
+         } catch (IOException e) {
+            throw new RuntimeException(e);
+         }
+      }
+
+      @Override
+      public InputStream getDataInputStream() throws S3ServiceException {
+         return (buff != null) ? new ByteArrayInputStream(buff) : null;
+      }
+   }
+
+}
+

Copied: core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java (from rev 7868, core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java	2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,172 @@
+package org.horizon.loader.s3;
+
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.jets3t.service.model.S3Bucket;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", testName = "loader.s3.S3CacheStoreIntegrationTest")
+public class S3CacheStoreIntegrationTest extends BaseCacheStoreTest {
+   private String csBucket;
+   private String cs2Bucket;
+   private String accessKey;
+   private String secretKey;
+   private S3Connection s3Connection;
+
+   @BeforeTest(enabled = false)
+   public void initRealConnection() {
+      csBucket = "your-favorite-bucket-doesnt-need-to-exist-but-if-it-does-this-test-will-nuke-it";
+      cs2Bucket = csBucket + "2";
+      accessKey = "youraccesskey";
+      secretKey = "yoursecretkey";
+      s3Connection = new Jets3tS3Connection();
+   }
+
+   @BeforeTest(enabled = true)
+   public void initMockConnection() {
+      csBucket = "horizontesting";
+      cs2Bucket = csBucket + "2";
+      accessKey = "dummyaccess";
+      secretKey = "dummysecret";
+      s3Connection = new MockS3Connection();
+   }
+
+   @AfterTest
+   public void removeS3Buckets() throws Exception {
+      s3Connection.removeBucketIfEmpty(new S3Bucket(csBucket));
+      s3Connection.removeBucketIfEmpty(new S3Bucket(cs2Bucket));
+      s3Connection = null;
+   }
+
+   protected CacheStore createCacheStore() throws CacheLoaderException {
+      return createAndStartCacheStore(csBucket);
+   }
+
+   protected CacheStore createAnotherCacheStore() throws CacheLoaderException {
+      return createAndStartCacheStore(cs2Bucket);
+   }
+
+   private CacheStore createAndStartCacheStore(String bucket) throws CacheLoaderException {
+      S3CacheStore cs = new S3CacheStore();
+      S3CacheStoreConfig cfg = new S3CacheStoreConfig();
+      cfg.setBucket(bucket);
+      cfg.setAwsAccessKey(accessKey);
+      cfg.setAwsSecretKey(secretKey);
+      cfg.setPurgeSynchronously(true); // for more accurate unit testing
+      cs.init(cfg, getCache(), getMarshaller(), s3Connection);
+      cs.start();
+      return cs;
+   }
+
+   /*  Changes below are needed to support testing of multiple cache stores */
+
+   protected CacheStore cs2;
+
+   @BeforeMethod
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      cs2 = createAnotherCacheStore();
+   }
+
+
+   @AfterMethod
+   @Override
+   public void tearDown() throws CacheLoaderException {
+      super.tearDown();
+      if (cs2 != null) {
+         cs2.clear();
+         cs2.stop();
+      }
+      cs2 = null;
+   }
+
+
+   @Override
+   public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
+      cs.store(new StoredEntry("k1", "v1", -1, -1));
+      cs.store(new StoredEntry("k2", "v2", -1, -1));
+      cs.store(new StoredEntry("k3", "v3", -1, -1));
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      ObjectOutputStream oos = new ObjectOutputStream(out);
+      cs.toStream(new UnclosableObjectOutputStream(oos));
+      oos.flush();
+      oos.close();
+      out.close();
+      cs2.clear();
+      ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()));
+      cs2.fromStream(new UnclosableObjectInputStream(ois));
+
+      Set<StoredEntry> set = cs2.loadAll();
+
+      assert set.size() == 3;
+      Set expected = new HashSet();
+      expected.add("k1");
+      expected.add("k2");
+      expected.add("k3");
+      for (StoredEntry se : set) assert expected.remove(se.getKey());
+      assert expected.isEmpty();
+   }
+
+   @Override
+   public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
+      cs.store(new StoredEntry("k1", "v1", -1, -1));
+      cs.store(new StoredEntry("k2", "v2", -1, -1));
+      cs.store(new StoredEntry("k3", "v3", -1, -1));
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+      byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
+      out.write(dummyStartBytes);
+      ObjectOutputStream oos = new ObjectOutputStream(out);
+      cs.toStream(new UnclosableObjectOutputStream(oos));
+      oos.flush();
+      oos.close();
+      out.write(dummyEndBytes);
+      out.close();
+      cs2.clear();
+
+      // first pop the start bytes
+      byte[] dummy = new byte[8];
+      ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+      int bytesRead = in.read(dummy, 0, 8);
+      assert bytesRead == 8;
+      for (int i = 1; i < 9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
+      cs2.fromStream(new UnclosableObjectInputStream(new ObjectInputStream(in)));
+      bytesRead = in.read(dummy, 0, 8);
+      assert bytesRead == 8;
+      for (int i = 8; i > 0; i--) assert dummy[8 - i] == i : "Start byte stream corrupted!";
+
+      Set<StoredEntry> set = cs2.loadAll();
+
+      assert set.size() == 3;
+      Set expected = new HashSet();
+      expected.add("k1");
+      expected.add("k2");
+      expected.add("k3");
+      for (StoredEntry se : set) assert expected.remove(se.getKey());
+      assert expected.isEmpty();
+   }
+}
\ No newline at end of file




More information about the jbosscache-commits mailing list