Author: adriancole
Date: 2009-03-07 22:01:49 -0500 (Sat, 07 Mar 2009)
New Revision: 7881
Added:
core/branches/flat/src/main/java/org/horizon/loader/s3/
core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java
core/branches/flat/src/test/java/org/horizon/loader/s3/
core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java
core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java
Modified:
core/branches/flat/pom.xml
Log:
added s3 cache store
Modified: core/branches/flat/pom.xml
===================================================================
--- core/branches/flat/pom.xml 2009-03-07 18:58:23 UTC (rev 7880)
+++ core/branches/flat/pom.xml 2009-03-08 03:01:49 UTC (rev 7881)
@@ -75,11 +75,11 @@
</dependency>
<dependency>
- <groupId>net.noderunner</groupId>
- <artifactId>amazon-s3</artifactId>
- <version>1.0.0.0</version>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ <version>0.6.1</version>
<optional>true</optional>
- </dependency>
+ </dependency>
<dependency>
<groupId>log4j</groupId>
@@ -130,6 +130,14 @@
<version>2.5</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.4</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- 5.8 is needed for proper parallel test execution -->
<dependency>
<groupId>org.testng</groupId>
@@ -242,8 +250,9 @@
</repository>
<!-- For Amazon S3 artifacts -->
<repository>
- <id>e-xml.sourceforge.net</id>
- <url>http://e-xml.sourceforge.net/maven2/repository</url>
+ <name>jets3t</name>
+ <id>jets3t</id>
+ <url>http://jets3t.s3.amazonaws.com/maven2</url>
</repository>
<!-- For Sleepycat -->
<repository>
Added: core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java 2009-03-08
03:01:49 UTC (rev 7881)
@@ -0,0 +1,135 @@
+package org.horizon.loader.s3;
+
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.multithread.S3ServiceSimpleMulti;
+import org.jets3t.service.security.AWSCredentials;
+
+/**
+ * {@link S3Connection} backed by the {@link org.jets3t.service.S3Service jets3t} library.
+ * <p/>
+ * Single-object calls are delegated to an {@link S3Service}; bulk copy and delete calls are delegated to an
+ * {@link S3ServiceSimpleMulti} wrapping that same service. Both delegates are only assigned in
+ * {@link #connect(String, String)}; until then they are <tt>null</tt>.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class Jets3tS3Connection implements S3Connection {
+   private S3Service s3Service;
+   private S3ServiceSimpleMulti s3MultiService;
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * Builds a {@link RestS3Service} from the given key pair and wraps it in an {@link S3ServiceSimpleMulti}.
+    *
+    * @see RestS3Service#RestS3Service(org.jets3t.service.security.AWSCredentials)
+    * @see S3ServiceSimpleMulti#S3ServiceSimpleMulti(org.jets3t.service.S3Service)
+    */
+   public void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException {
+      s3Service = new RestS3Service(new AWSCredentials(awsAccessKey, awsSecretKey));
+      s3MultiService = new S3ServiceSimpleMulti(s3Service);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * The returned object carries only the key; nothing is sent to S3.
+    *
+    * @see S3Object#S3Object(String)
+    */
+   public S3Object createObject(String key) {
+      return new S3Object(key);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see org.jets3t.service.S3Service#deleteObject(org.jets3t.service.model.S3Bucket, String)
+    */
+   public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+      s3Service.deleteObject(bucket, objectKey);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * Returns the existing bucket when the service reports it as accessible, otherwise creates it.
+    *
+    * @see org.jets3t.service.S3Service#isBucketAccessible(String)
+    * @see org.jets3t.service.S3Service#getBucket(String)
+    * @see org.jets3t.service.S3Service#createBucket(String)
+    */
+   public S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException {
+      // Per the original author's note, jets3t 0.7.0 offers S3Service.getOrCreateBucket(String), which
+      // could replace this check-then-act sequence after an upgrade.
+      return s3Service.isBucketAccessible(bucketName)
+            ? s3Service.getBucket(bucketName)
+            : s3Service.createBucket(bucketName);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see org.jets3t.service.S3Service#listObjects(org.jets3t.service.model.S3Bucket)
+    */
+   public S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException {
+      return s3Service.listObjects(bucket);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * One destination object is created per key, so every object keeps its key in the destination bucket.
+    *
+    * @see S3ServiceSimpleMulti#copyObjects(String, String, String[], org.jets3t.service.model.S3Object[], boolean)
+    */
+   public void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName,
+                                                 String destinationBucketName) throws S3ServiceException {
+      S3Object[] targets = new S3Object[keys.length];
+      for (int index = 0; index < keys.length; index++) {
+         targets[index] = createObject(keys[index]);
+      }
+      s3MultiService.copyObjects(sourceBucketName, destinationBucketName, keys, targets, false);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * Lists the bucket first and then deletes everything that was listed.
+    *
+    * @see Jets3tS3Connection#getAllObjectsInBucketWithoutTheirData(org.jets3t.service.model.S3Bucket)
+    * @see S3ServiceSimpleMulti#deleteObjects(org.jets3t.service.model.S3Bucket, org.jets3t.service.model.S3Object[])
+    */
+   public void removeAllObjectsFromBucket(S3Bucket bucket) throws S3ServiceException {
+      S3Object[] listed = getAllObjectsInBucketWithoutTheirData(bucket);
+      s3MultiService.deleteObjects(bucket, listed);
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see S3Service#deleteBucket(S3Bucket)
+    */
+   public void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException {
+      s3Service.deleteBucket(bucket);
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * A service failure whose S3 error code is <tt>NoSuchKey</tt> is translated into a <tt>null</tt> return; any
+    * other failure is rethrown unchanged.
+    *
+    * @see S3Service#getObject(org.jets3t.service.model.S3Bucket, String)
+    */
+   public S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+      try {
+         return s3Service.getObject(bucket, objectKey);
+      } catch (S3ServiceException failure) {
+         // Constant-first equals also covers a missing (null) error code.
+         if ("NoSuchKey".equals(failure.getS3ErrorCode())) {
+            return null;
+         }
+         throw failure;
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    *
+    * @see S3Service#putObject(org.jets3t.service.model.S3Bucket, org.jets3t.service.model.S3Object)
+    */
+   public S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException {
+      return s3Service.putObject(bucket, object);
+   }
+}
\ No newline at end of file
Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
(rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java 2009-03-08
03:01:49 UTC (rev 7881)
@@ -0,0 +1,208 @@
+package org.horizon.loader.s3;
+
+import org.horizon.Cache;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.bucket.Bucket;
+import org.horizon.loader.bucket.BucketBasedCacheStore;
+import org.horizon.loader.file.FileCacheStore;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.utils.ServiceUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link org.jets3t.service.S3Service jets3t} implementation of a
+ * {@link org.horizon.loader.bucket.BucketBasedCacheStore}.
+ * <p/>
+ * Every {@link Bucket} is saved as one S3 object, keyed by the bucket's name, inside the S3 bucket named by
+ * {@link S3CacheStoreConfig#getBucket()}. This store stores stuff in the following format:
+ * <tt>http://s3.amazon.com/{bucket}/bucket_number.bucket</tt>
+ * <p/>
+ * All traffic to S3 goes through an {@link S3Connection}, which can be swapped for a test double via
+ * {@link #init(CacheLoaderConfig, Cache, Marshaller, S3Connection)}.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class S3CacheStore extends BucketBasedCacheStore {
+
+   // Fixed: the logger was created for FileCacheStore, so messages were filed under the wrong category.
+   private static final Log log = LogFactory.getLog(S3CacheStore.class);
+
+   private S3CacheStoreConfig config;
+   private S3Bucket rootS3Bucket;
+
+   Cache cache;
+   Marshaller marshaller;
+
+   private S3Connection s3Connection;
+
+   /**
+    * @return the configuration class understood by this store, {@link S3CacheStoreConfig}
+    */
+   public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+      return S3CacheStoreConfig.class;
+   }
+
+   /**
+    * {@inheritDoc} This initializes the internal <tt>s3Connection</tt> as an implementation of
+    * {@link Jets3tS3Connection}
+    */
+   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+      init(config, cache, m, new Jets3tS3Connection());
+   }
+
+   /**
+    * Initializes the store with an explicit connection; nothing is contacted until {@link #start()}.
+    *
+    * @param config       must be an {@link S3CacheStoreConfig}; it is cast without a check
+    * @param cache        the cache this store belongs to
+    * @param m            marshaller used to serialize buckets in {@link #saveBucket(Bucket)}
+    * @param s3Connection the connection used for every S3 operation
+    */
+   public void init(CacheLoaderConfig config, Cache cache, Marshaller m, S3Connection s3Connection) {
+      super.init(config, cache, m);
+      this.config = (S3CacheStoreConfig) config;
+      this.cache = cache;
+      this.marshaller = m;
+      this.s3Connection = s3Connection;
+   }
+
+   /**
+    * Validates the configuration, connects to S3 and obtains (creating if necessary) the configured bucket.
+    *
+    * @throws IllegalArgumentException if the access key, the secret key or the bucket name is not configured
+    * @throws CacheLoaderException     if connecting or obtaining the bucket fails
+    */
+   public void start() throws CacheLoaderException {
+      super.start();
+
+      String awsAccessKey = config.getAwsAccessKey();
+      if (awsAccessKey == null) {
+         throw new IllegalArgumentException("awsAccessKey must be set");
+      }
+      String awsSecretKey = config.getAwsSecretKey();
+      if (awsSecretKey == null) {
+         throw new IllegalArgumentException("awsSecretKey must be set");
+      }
+      String s3Bucket = config.getBucket();
+      if (s3Bucket == null) {
+         throw new IllegalArgumentException("s3Bucket must be set");
+      }
+
+      try {
+         s3Connection.connect(awsAccessKey, awsSecretKey);
+         rootS3Bucket = s3Connection.getOrCreateBucket(s3Bucket);
+      } catch (S3ServiceException e) {
+         throw convertToCacheLoaderException("error opening s3 service", e);
+      }
+   }
+
+   /**
+    * Loads every bucket stored in the root S3 bucket and collects its entries. A bucket from which expired
+    * entries were removed is written back before its remaining entries are added to the result.
+    */
+   protected Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException {
+      Set<StoredEntry> result = new HashSet<StoredEntry>();
+      try {
+         for (S3Object s3Object : s3Connection.getAllObjectsInBucketWithoutTheirData(rootS3Bucket)) {
+            Bucket bucket = loadBucket(s3Object);
+            if (bucket != null) {
+               if (bucket.removeExpiredEntries()) {
+                  saveBucket(bucket);
+               }
+               result.addAll(bucket.getStoredEntries());
+            }
+         }
+      } catch (S3ServiceException e) {
+         throw convertToCacheLoaderException("Error while loading entries", e);
+      }
+      return result;
+   }
+
+   /**
+    * Reads the {@link S3Bucket} written by {@link #toStreamLockSafe(ObjectOutput)} and copies all of its objects
+    * into this store's bucket. The stream carries only the bucket descriptor; the data is copied through the
+    * {@link S3Connection}. If the source is this store's own bucket, the copy is skipped.
+    */
+   protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+      try {
+         S3Bucket source = (S3Bucket) objectInput.readObject();
+         if (rootS3Bucket.getName().equals(source.getName())) {
+            log.info("Attempt to load the same s3 bucket ignored");
+         } else {
+            S3Object[] sourceObjects = s3Connection.getAllObjectsInBucketWithoutTheirData(source);
+            String[] sourceKeys = new String[sourceObjects.length];
+
+            int i = 0;
+            for (S3Object sourceObject : sourceObjects) {
+               sourceKeys[i++] = sourceObject.getKey();
+            }
+            s3Connection.copyObjectsFromOneBucketToAnother(sourceKeys, source.getName(), rootS3Bucket.getName());
+         }
+         // The result is discarded; loadAllLockSafe writes back buckets that lost expired entries.
+         loadAll();
+      } catch (Exception e) {
+         throw convertToCacheLoaderException("Error while reading from stream", e);
+      }
+   }
+
+   /**
+    * Writes this store's {@link S3Bucket} descriptor (not the stored data) to the stream.
+    */
+   protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+      try {
+         objectOutput.writeObject(rootS3Bucket);
+      } catch (IOException e) {
+         throw convertToCacheLoaderException("Error while writing to stream", e);
+      }
+   }
+
+   /**
+    * Removes every object from this store's S3 bucket; the bucket itself is kept.
+    */
+   protected void clearLockSafe() throws CacheLoaderException {
+      try {
+         s3Connection.removeAllObjectsFromBucket(rootS3Bucket);
+      } catch (S3ServiceException caught) {
+         // Fixed: the message used to say "recreating", but the bucket is only emptied here.
+         throw convertToCacheLoaderException("error clearing bucket " + config.getBucket(), caught);
+      }
+   }
+
+   /**
+    * Returns <tt>caught</tt> itself when it already is a {@link CacheLoaderException}; otherwise wraps it in a
+    * new one carrying <tt>message</tt>, preserving the cause.
+    */
+   CacheLoaderException convertToCacheLoaderException(String message, Exception caught) {
+      return (caught instanceof CacheLoaderException) ? (CacheLoaderException) caught :
+            new CacheLoaderException(message, caught);
+   }
+
+   /**
+    * Purges by loading everything: {@link #loadAllLockSafe()} writes back buckets whose expired entries were
+    * removed.
+    */
+   protected void purgeInternal() throws CacheLoaderException {
+      loadAll();
+   }
+
+   /**
+    * Loads the bucket stored under the given name.
+    *
+    * @return the bucket, or <tt>null</tt> if no such object exists
+    */
+   protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
+      return loadBucket(s3Connection.createObject(bucketName));
+   }
+
+   /**
+    * Deserializes the {@link Bucket} held by the given S3 object, fetching the object's data first when the
+    * object only carries details.
+    *
+    * @return the bucket, named after the object's key, or <tt>null</tt> if the object does not exist
+    */
+   protected Bucket loadBucket(S3Object s3Object) throws CacheLoaderException {
+      Bucket bucket = null;
+      InputStream is = null;
+      ObjectInputStream ois = null;
+      String key = s3Object.getKey();
+      try {
+         // it is possible that the S3Object above only holds details. Try to fetch, if this is the case
+         if (s3Object.getDataInputStream() == null) {
+            s3Object = s3Connection.getObjectInBucket(key, rootS3Bucket);
+         }
+
+         // it is possible that the object never existed. in this case, fall out.
+         if (s3Object != null && s3Object.getDataInputStream() != null) {
+            is = s3Object.getDataInputStream();
+            // NOTE(review): buckets are written with the Marshaller in saveBucket but read back with a plain
+            // ObjectInputStream; this presumably only works while the marshaller emits standard Java
+            // serialization -- confirm.
+            ois = new ObjectInputStream(is);
+            bucket = (Bucket) ois.readObject();
+            s3Object.closeDataInputStream();
+            bucket.setBucketName(s3Object.getKey());
+         }
+      } catch (Exception e) {
+         throw convertToCacheLoaderException("Error while reading from object: " + key, e);
+      } finally {
+         safeClose(ois);
+         safeClose(is);
+      }
+      return bucket;
+   }
+
+   /**
+    * Inserting a new bucket is the same operation as saving an existing one.
+    */
+   protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+      saveBucket(bucket);
+   }
+
+   /**
+    * Persists the bucket as one S3 object keyed by the bucket's name. An empty bucket is deleted from S3 rather
+    * than stored.
+    *
+    * @throws CacheLoaderException if marshalling or the S3 call fails
+    */
+   public final void saveBucket(Bucket b) throws CacheLoaderException {
+      try {
+         if (b.getEntries().isEmpty()) {
+            s3Connection.removeObjectFromBucket(b.getBucketName(), rootS3Bucket);
+         } else {
+            byte[] payload = marshaller.objectToByteBuffer(b);
+            ByteArrayInputStream dataIS = new ByteArrayInputStream(payload);
+            byte[] md5Hash = ServiceUtils.computeMD5Hash(dataIS);
+            // The hash computation consumed the stream; rewind it so the upload starts at the beginning.
+            dataIS.reset();
+            S3Object s3Object = s3Connection.createObject(b.getBucketName());
+            s3Object.setDataInputStream(dataIS);
+            // Use the array length: InputStream.available() is only an estimate in general and happened to
+            // be exact here because of the stream type and the preceding reset.
+            s3Object.setContentLength(payload.length);
+            s3Object.setMd5Hash(md5Hash);
+            s3Connection.putObjectIntoBucket(s3Object, rootS3Bucket);
+         }
+      } catch (Exception ex) {
+         throw convertToCacheLoaderException("Exception while saving bucket " + b, ex);
+      }
+   }
+
+}
Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java 2009-03-08
03:01:49 UTC (rev 7881)
@@ -0,0 +1,51 @@
+package org.horizon.loader.s3;
+
+import org.horizon.loader.LockSupportCacheStoreConfig;
+
+/**
+ * Configuration bean for {@link org.horizon.loader.s3.S3CacheStore}. On construction it registers
+ * {@link S3CacheStore} as the cache loader class.
+ * <p/>
+ * <ul>
+ * <li><tt>awsAccessKey</tt> - identifies you as the party responsible for s3 requests. This is required and
+ * there is no default.</li>
+ * <li><tt>awsSecretKey</tt> - used to authenticate you as the owner of <tt>awsAccessKey</tt>. This is required
+ * and there is no default.</li>
+ * <li><tt>bucket</tt> - the name of the s3 bucket used to store cache data. This is required and there is no
+ * default.</li>
+ * </ul>
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class S3CacheStoreConfig extends LockSupportCacheStoreConfig {
+   private String awsAccessKey;
+   private String awsSecretKey;
+   private String bucket;
+
+   /**
+    * Creates a configuration whose cache loader class name is that of {@link S3CacheStore}.
+    */
+   public S3CacheStoreConfig() {
+      setCacheLoaderClassName(S3CacheStore.class.getName());
+   }
+
+   /**
+    * @return the AWS access key, or <tt>null</tt> if none has been set
+    */
+   public String getAwsAccessKey() {
+      return awsAccessKey;
+   }
+
+   /**
+    * @param awsAccessKey the AWS access key to use for s3 requests
+    */
+   public void setAwsAccessKey(String awsAccessKey) {
+      this.awsAccessKey = awsAccessKey;
+   }
+
+   /**
+    * @return the AWS secret key, or <tt>null</tt> if none has been set
+    */
+   public String getAwsSecretKey() {
+      return awsSecretKey;
+   }
+
+   /**
+    * @param awsSecretKey the secret key belonging to the configured access key
+    */
+   public void setAwsSecretKey(String awsSecretKey) {
+      this.awsSecretKey = awsSecretKey;
+   }
+
+   /**
+    * @return the name of the s3 bucket holding the cache data, or <tt>null</tt> if none has been set
+    */
+   public String getBucket() {
+      return bucket;
+   }
+
+   /**
+    * @param bucket the name of the s3 bucket used to store cache data
+    */
+   public void setBucket(String bucket) {
+      this.bucket = bucket;
+   }
+
+}
Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java
(rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java 2009-03-08
03:01:49 UTC (rev 7881)
@@ -0,0 +1,35 @@
+package org.horizon.loader.s3;
+
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+
+/**
+ * This interface defines the interactions between the {@link S3CacheStore} and Amazon S3.
+ * <p/>
+ * {@link #connect(String, String)} is expected to be called before any method that talks to S3.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public interface S3Connection {
+
+   /**
+    * Establishes the connection using the given credentials.
+    *
+    * @param awsAccessKey identifies the party responsible for the requests
+    * @param awsSecretKey authenticates the owner of <tt>awsAccessKey</tt>
+    * @throws S3ServiceException if the connection cannot be established
+    */
+   void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException;
+
+   /**
+    * Returns the bucket with the given name, creating it first if it does not exist yet.
+    *
+    * @param bucketName name of the bucket to look up or create
+    * @return the existing or newly created bucket
+    * @throws S3ServiceException if the bucket can neither be obtained nor created
+    */
+   S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException;
+
+   /**
+    * Removes the given bucket. As the name says, callers are expected to empty the bucket first.
+    *
+    * @param bucket the bucket to remove
+    * @throws S3ServiceException if the bucket cannot be removed
+    */
+   void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException;
+
+   /**
+    * Creates a local object handle for the given key. No data is attached to the returned object.
+    *
+    * @param key the object key
+    * @return a new object with the given key
+    */
+   S3Object createObject(String key);
+
+   /**
+    * Stores the object, including its data, in the given bucket.
+    *
+    * @param object the object to store
+    * @param bucket the bucket to store it in
+    * @return the stored object
+    * @throws S3ServiceException if the object cannot be stored
+    */
+   S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException;
+
+   /**
+    * Fetches the object stored under the given key.
+    *
+    * @param objectKey key of the object to fetch
+    * @param bucket    the bucket to look in
+    * @return the object, or <tt>null</tt> if the bucket holds no object under that key
+    * @throws S3ServiceException if the lookup fails for any reason other than a missing key
+    */
+   S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException;
+
+   /**
+    * Lists the objects of the given bucket. As the name says, the returned objects are not guaranteed to carry
+    * their data; use {@link #getObjectInBucket(String, S3Bucket)} to fetch it.
+    *
+    * @param bucket the bucket to list
+    * @return the objects in the bucket
+    * @throws S3ServiceException if the bucket cannot be listed
+    */
+   S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException;
+
+   /**
+    * Removes the object stored under the given key.
+    *
+    * @param objectKey key of the object to remove
+    * @param bucket    the bucket to remove it from
+    * @throws S3ServiceException if the object cannot be removed
+    */
+   void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException;
+
+   /**
+    * Removes every object from the given bucket; the bucket itself is kept.
+    *
+    * @param rootS3Bucket the bucket to empty
+    * @throws S3ServiceException if the objects cannot be removed
+    */
+   void removeAllObjectsFromBucket(S3Bucket rootS3Bucket) throws S3ServiceException;
+
+   /**
+    * Copies the objects with the given keys from one bucket to another, keeping their keys.
+    *
+    * @param keys                  keys of the objects to copy
+    * @param sourceBucketName      name of the bucket to copy from
+    * @param destinationBucketName name of the bucket to copy to
+    * @throws S3ServiceException if the copy fails
+    */
+   void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName,
+                                          String destinationBucketName) throws S3ServiceException;
+
+}
\ No newline at end of file
Added: core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java 2009-03-08
03:01:49 UTC (rev 7881)
@@ -0,0 +1,104 @@
+package org.horizon.loader.s3;
+
+import org.apache.commons.io.IOUtils;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory {@link S3Connection} for tests. Buckets and their contents live in maps; credentials are ignored.
+ * <p/>
+ * Operations on a bucket that was never created through {@link #getOrCreateBucket(String)} fail with an
+ * {@link S3ServiceException} instead of a bare {@link NullPointerException}.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class MockS3Connection implements S3Connection {
+   private final Map<String, S3Bucket> nameToS3Bucket = new ConcurrentHashMap<String, S3Bucket>();
+   private final Map<String, Map<String, S3Object>> bucketToContents =
+         new ConcurrentHashMap<String, Map<String, S3Object>>();
+
+   /**
+    * Returns the bucket registered under the name, registering a new empty one on first use. Synchronized so
+    * that the check and the two map updates happen as one step.
+    */
+   public synchronized S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException {
+      S3Bucket bucket = nameToS3Bucket.get(bucketName);
+      if (bucket == null) {
+         bucket = new S3Bucket(bucketName);
+         nameToS3Bucket.put(bucketName, bucket);
+         bucketToContents.put(bucketName, new ConcurrentHashMap<String, S3Object>());
+      }
+      return bucket;
+   }
+
+   /**
+    * Returns all objects of the bucket. Unlike what the method name suggests, the mock hands out the stored
+    * objects themselves, data included.
+    */
+   public S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException {
+      return contentsOf(bucket.getName()).values().toArray(new S3Object[0]);
+   }
+
+   /**
+    * Copies the objects with the given keys between two buckets; the same object instances end up in both.
+    *
+    * @throws S3ServiceException if either bucket is unknown or a key is missing from the source bucket
+    */
+   public void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName,
+                                                 String destinationBucketName) throws S3ServiceException {
+      Map<String, S3Object> source = contentsOf(sourceBucketName);
+      Map<String, S3Object> destination = contentsOf(destinationBucketName);
+      for (String key : keys) {
+         S3Object object = source.get(key);
+         if (object == null) {
+            // ConcurrentHashMap rejects null values; report the missing key through the declared exception.
+            throw new S3ServiceException("No object with key " + key + " in bucket " + sourceBucketName);
+         }
+         destination.put(key, object);
+      }
+   }
+
+   /**
+    * Forgets the bucket and its contents. The mock does not check that the bucket is empty, and removing an
+    * unknown bucket is a no-op.
+    */
+   public void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException {
+      nameToS3Bucket.remove(bucket.getName());
+      bucketToContents.remove(bucket.getName());
+   }
+
+   /**
+    * @return the object stored under the key, or <tt>null</tt> if there is none
+    */
+   public S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+      return contentsOf(bucket.getName()).get(objectKey);
+   }
+
+   /**
+    * Stores the object under its own key, replacing any previous object with that key.
+    *
+    * @return the object that was passed in
+    */
+   public S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException {
+      contentsOf(bucket.getName()).put(object.getKey(), object);
+      return object;
+   }
+
+   /**
+    * Does nothing; the mock needs no credentials.
+    */
+   public void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException {
+      // ignore
+   }
+
+   /**
+    * @return a new {@link MockS3Object} with the given key and no data
+    */
+   public S3Object createObject(String key) {
+      return new MockS3Object(key);
+   }
+
+   /**
+    * Removes the object stored under the key; removing a missing key is a no-op.
+    */
+   public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+      contentsOf(bucket.getName()).remove(objectKey);
+   }
+
+   /**
+    * Removes every object from the bucket; the bucket stays registered.
+    */
+   public void removeAllObjectsFromBucket(S3Bucket bucket) throws S3ServiceException {
+      contentsOf(bucket.getName()).clear();
+   }
+
+   /**
+    * Looks up the contents map of a bucket.
+    *
+    * @throws S3ServiceException if no bucket with that name has been created
+    */
+   private Map<String, S3Object> contentsOf(String bucketName) throws S3ServiceException {
+      Map<String, S3Object> contents = bucketToContents.get(bucketName);
+      if (contents == null) {
+         throw new S3ServiceException("No such bucket: " + bucketName);
+      }
+      return contents;
+   }
+
+   /**
+    * {@link S3Object} that keeps its data as a byte array, so the data can be read any number of times. Static
+    * because it uses no state of the enclosing connection.
+    */
+   static class MockS3Object extends S3Object {
+
+      byte[] buff;
+
+      public MockS3Object(String key) {
+         super(key);
+      }
+
+      /**
+       * Drains the stream into memory right away; passing <tt>null</tt> clears the data.
+       */
+      @Override
+      public void setDataInputStream(InputStream inputStream) {
+         if (inputStream == null) {
+            buff = null;
+            return;
+         }
+         try {
+            buff = IOUtils.toByteArray(inputStream);
+         } catch (IOException e) {
+            throw new RuntimeException(e);
+         }
+      }
+
+      /**
+       * @return a fresh stream over the buffered data on every call, or <tt>null</tt> if no data was set
+       */
+      @Override
+      public InputStream getDataInputStream() throws S3ServiceException {
+         return (buff != null) ? new ByteArrayInputStream(buff) : null;
+      }
+   }
+
+}
+
Copied:
core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java
(from rev 7868,
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java)
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java 2009-03-08
03:01:49 UTC (rev 7881)
@@ -0,0 +1,172 @@
+package org.horizon.loader.s3;
+
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.jets3t.service.model.S3Bucket;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Runs the {@link BaseCacheStoreTest} suite against an {@link S3CacheStore}. By default the store talks to a
+ * {@link MockS3Connection}; swap the <tt>enabled</tt> flags of the two <tt>&#64;BeforeTest</tt> methods and fill
+ * in real credentials to run against Amazon S3.
+ * <p/>
+ * The streaming tests are overridden because they need a second store backed by a different S3 bucket.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", testName = "loader.s3.S3CacheStoreIntegrationTest")
+public class S3CacheStoreIntegrationTest extends BaseCacheStoreTest {
+   private String csBucket;
+   private String cs2Bucket;
+   private String accessKey;
+   private String secretKey;
+   private S3Connection s3Connection;
+
+   /**
+    * Disabled by default. Configures the test to run against the real service; the buckets named here are
+    * emptied by the test.
+    */
+   @BeforeTest(enabled = false)
+   public void initRealConnection() {
+      csBucket = "your-favorite-bucket-doesnt-need-to-exist-but-if-it-does-this-test-will-nuke-it";
+      cs2Bucket = csBucket + "2";
+      accessKey = "youraccesskey";
+      secretKey = "yoursecretkey";
+      s3Connection = new Jets3tS3Connection();
+   }
+
+   /**
+    * Configures the test to run against the in-memory mock with dummy credentials.
+    */
+   @BeforeTest(enabled = true)
+   public void initMockConnection() {
+      csBucket = "horizontesting";
+      cs2Bucket = csBucket + "2";
+      accessKey = "dummyaccess";
+      secretKey = "dummysecret";
+      s3Connection = new MockS3Connection();
+   }
+
+   /**
+    * Removes both test buckets once all tests have run.
+    */
+   @AfterTest
+   public void removeS3Buckets() throws Exception {
+      s3Connection.removeBucketIfEmpty(new S3Bucket(csBucket));
+      s3Connection.removeBucketIfEmpty(new S3Bucket(cs2Bucket));
+      s3Connection = null;
+   }
+
+   protected CacheStore createCacheStore() throws CacheLoaderException {
+      return createAndStartCacheStore(csBucket);
+   }
+
+   protected CacheStore createAnotherCacheStore() throws CacheLoaderException {
+      return createAndStartCacheStore(cs2Bucket);
+   }
+
+   /**
+    * Creates, initializes and starts a store on the given S3 bucket using the shared connection.
+    */
+   private CacheStore createAndStartCacheStore(String bucket) throws CacheLoaderException {
+      S3CacheStore cs = new S3CacheStore();
+      S3CacheStoreConfig cfg = new S3CacheStoreConfig();
+      cfg.setBucket(bucket);
+      cfg.setAwsAccessKey(accessKey);
+      cfg.setAwsSecretKey(secretKey);
+      cfg.setPurgeSynchronously(true); // for more accurate unit testing
+      cs.init(cfg, getCache(), getMarshaller(), s3Connection);
+      cs.start();
+      return cs;
+   }
+
+   /* Changes below are needed to support testing of multiple cache stores */
+
+   protected CacheStore cs2;
+
+   @BeforeMethod
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      cs2 = createAnotherCacheStore();
+   }
+
+
+   @AfterMethod
+   @Override
+   public void tearDown() throws CacheLoaderException {
+      try {
+         super.tearDown();
+      } finally {
+         // Clean up the second store even if tearing down the first one failed.
+         CacheStore second = cs2;
+         cs2 = null;
+         if (second != null) {
+            second.clear();
+            second.stop();
+         }
+      }
+   }
+
+
+   /**
+    * Streams the state of the first store into the second one and checks that all entries arrive.
+    */
+   @Override
+   public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
+      storeSampleEntries();
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      ObjectOutputStream oos = new ObjectOutputStream(out);
+      cs.toStream(new UnclosableObjectOutputStream(oos));
+      oos.flush();
+      oos.close();
+      out.close();
+      cs2.clear();
+      ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()));
+      cs2.fromStream(new UnclosableObjectInputStream(ois));
+
+      assertSampleEntries(cs2.loadAll());
+   }
+
+   /**
+    * Same as {@link #testStreamingAPI()}, but the store state is embedded between marker bytes to verify that
+    * the store neither consumes bytes before its own data nor reads past it.
+    */
+   @Override
+   public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
+      storeSampleEntries();
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+      byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
+      out.write(dummyStartBytes);
+      ObjectOutputStream oos = new ObjectOutputStream(out);
+      cs.toStream(new UnclosableObjectOutputStream(oos));
+      oos.flush();
+      oos.close();
+      out.write(dummyEndBytes);
+      out.close();
+      cs2.clear();
+
+      // first pop the start bytes
+      byte[] dummy = new byte[8];
+      ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+      int bytesRead = in.read(dummy, 0, 8);
+      assert bytesRead == 8;
+      for (int i = 1; i < 9; i++) {
+         assert dummy[i - 1] == i : "Start byte stream corrupted!";
+      }
+      cs2.fromStream(new UnclosableObjectInputStream(new ObjectInputStream(in)));
+      // the end bytes must still be there, untouched, after the store has read its state
+      bytesRead = in.read(dummy, 0, 8);
+      assert bytesRead == 8;
+      for (int i = 8; i > 0; i--) {
+         assert dummy[8 - i] == i : "End byte stream corrupted!";
+      }
+
+      assertSampleEntries(cs2.loadAll());
+   }
+
+   /**
+    * Stores the three non-expiring entries k1..k3 in the first store.
+    */
+   private void storeSampleEntries() throws CacheLoaderException {
+      cs.store(new StoredEntry("k1", "v1", -1, -1));
+      cs.store(new StoredEntry("k2", "v2", -1, -1));
+      cs.store(new StoredEntry("k3", "v3", -1, -1));
+   }
+
+   /**
+    * Asserts that the given entries are exactly the ones written by {@link #storeSampleEntries()}.
+    */
+   private void assertSampleEntries(Set<StoredEntry> loaded) {
+      assert loaded.size() == 3;
+      Set<Object> expected = new HashSet<Object>();
+      expected.add("k1");
+      expected.add("k2");
+      expected.add("k3");
+      for (StoredEntry se : loaded) {
+         assert expected.remove(se.getKey());
+      }
+      assert expected.isEmpty();
+   }
+}
\ No newline at end of file