JBoss Cache SVN: r7884 - in core/branches/flat: src/main/java/org/horizon/loader and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-08 17:03:03 -0400 (Sun, 08 Mar 2009)
New Revision: 7884
Added:
core/branches/flat/src/main/java/org/horizon/loader/jdbm/
core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmConnection.java
core/branches/flat/src/test/java/org/horizon/loader/jdbm/
core/branches/flat/src/test/java/org/horizon/loader/jdbm/JdbmCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/jdbm/JdbmLearningTest.java
Modified:
core/branches/flat/pom.xml
Log:
added jdbm cachestore
Modified: core/branches/flat/pom.xml
===================================================================
--- core/branches/flat/pom.xml 2009-03-08 21:02:07 UTC (rev 7883)
+++ core/branches/flat/pom.xml 2009-03-08 21:03:03 UTC (rev 7884)
@@ -52,6 +52,13 @@
<version>1.0</version>
<optional>true</optional>
</dependency>
+ <!-- needed for jdbm -->
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.1</version>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>c3p0</groupId>
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmCacheStore.java (from rev 7868, core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmCacheStore.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmCacheStore.java 2009-03-08 21:03:03 UTC (rev 7884)
@@ -0,0 +1,166 @@
+package org.horizon.loader.jdbm;
+
+import jdbm.btree.BTree;
+import jdbm.helper.Tuple;
+import jdbm.helper.TupleBrowser;
+import org.horizon.Cache;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.bucket.Bucket;
+import org.horizon.loader.file.FileCacheStore;
+import org.horizon.marshall.Marshaller;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A JDBM (http://jdbm.sourceforge.net) extension of {@link org.horizon.loader.bucket.BucketBasedCacheStore}. This file
+ * store stores stuff in a {@link BTree} at the following location: <tt>/{location}/cache name/db.db</tt>. The {@link
+ * BTree} is named the same as {@link org.horizon.Cache#getName()}. {@link Bucket}s are stored inside the {@link BTree}
+ * keyed on {@link org.horizon.loader.bucket.Bucket#getBucketName()}.
+ * <p/>
+ * JdbmCacheStore uses {@link Bucket}s as entries in {@link BTree} are limited by having <tt>int</tt> primary keys.
+ * <p/>
+ * JdbmCacheStore does not use JDBM transactions and instead relies on the locking mechanisms provided by {@link
+ * org.horizon.loader.LockSupportCacheStore}.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class JdbmCacheStore extends FileCacheStore {
+
+   /** Tree currently backing the store; <tt>null</tt> while the store is closed. */
+   private BTree btree;
+   /** Name the tree is registered under in the record manager; taken from the cache name in init. */
+   private String treeName;
+   private JdbmConnection connection;
+
+   /**
+    * Initialises the store with a freshly created {@link JdbmConnection}.
+    *
+    * @param config must be a {@link JdbmCacheStoreConfig}; it is cast without a check
+    */
+   @Override
+   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+      init((JdbmCacheStoreConfig) config, cache, m, new JdbmConnection());
+   }
+
+   /**
+    * Initialises the store with an explicitly supplied connection.
+    *
+    * @param connection the connection used for all JDBM access by this store
+    */
+   public void init(JdbmCacheStoreConfig config, Cache cache, Marshaller m, JdbmConnection connection) {
+      super.init(config, cache, m);
+      this.treeName = cache.getName();
+      this.connection = connection;
+   }
+
+   public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+      return JdbmCacheStoreConfig.class;
+   }
+
+   @Override
+   public void start() throws CacheLoaderException {
+      super.start();
+      open();
+   }
+
+   /** Opens the JDBM environment under {@link #getRoot()} and loads (or creates) the tree named {@link #treeName}. */
+   private void open() throws CacheLoaderException {
+      connection.openEnvironment(getRoot());
+      try {
+         btree = connection.loadOrCreateBTree(treeName);
+      } catch (IOException caught) {
+         throw new CacheLoaderException("Problem loading btree", caught);
+      }
+   }
+
+   @Override
+   public void stop() throws CacheLoaderException {
+      try {
+         super.stop();
+      } finally {
+         // release the JDBM environment even when the superclass fails to stop
+         close();
+      }
+   }
+
+   /** Forgets the tree reference and closes the JDBM environment. */
+   private void close() throws CacheLoaderException {
+      btree = null;
+      connection.closeEnvironment();
+   }
+
+   /**
+    * Walks every bucket in the tree, purging expired entries on the way, and returns all remaining entries.
+    *
+    * @throws CacheLoaderException if the tree cannot be browsed or a purged bucket cannot be written back
+    */
+   @Override
+   protected Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException {
+      Set<StoredEntry> result = new HashSet<StoredEntry>();
+      try {
+         TupleBrowser browser = btree.browse();
+         Tuple tuple = new Tuple();
+         while (browser.getNext(tuple)) {
+            Bucket bucket = (Bucket) tuple.getValue();
+            if (bucket == null) {
+               continue;
+            }
+            if (bucket.removeExpiredEntries()) {
+               // NOTE(review): this rewrites the value of an existing key while the browser is open - confirm
+               // that JDBM tolerates value replacement during browsing.
+               saveBucket(bucket);
+            }
+            result.addAll(bucket.getStoredEntries());
+         }
+      } catch (IOException e) {
+         throw new CacheLoaderException("Problem loading entries", e);
+      }
+      return result;
+   }
+
+   /**
+    * Replaces the local state with the state read from the stream. The stream starts with the tree name used by
+    * the sender (see {@link #toStreamLockSafe(ObjectOutput)}); after the superclass has consumed the remainder, the
+    * received tree is opened under that name and then associated with the local tree name.
+    *
+    * @throws CacheLoaderException on any failure; the local tree name is restored in every case
+    */
+   @Override
+   protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+      final String localName = treeName;
+      try {
+         close();
+         String streamedName = (String) objectInput.readObject();
+         super.fromStreamLockSafe(objectInput);
+         treeName = streamedName;
+         open();
+         connection.associateBTreeWithName(btree, localName);
+      } catch (Exception e) {
+         throw new CacheLoaderException("Problem loading from stream", e);
+      } finally {
+         // never leave the store pointing at the sender's tree name, even when opening fails
+         treeName = localName;
+      }
+   }
+
+   /**
+    * Writes the tree name followed by whatever the superclass streams. The environment is closed while streaming
+    * and reopened afterwards, whether or not streaming succeeded.
+    */
+   @Override
+   protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+      try {
+         close();
+         objectOutput.writeObject(treeName);
+         super.toStreamLockSafe(objectOutput);
+      } catch (IOException e) {
+         throw new CacheLoaderException("Problem generating stream", e);
+      } finally {
+         open();
+      }
+   }
+
+   /** Clears the store by deleting the current tree and creating an empty one under the same name. */
+   @Override
+   protected void clearLockSafe() throws CacheLoaderException {
+      try {
+         connection.deleteBTree(btree);
+         btree = connection.createBTree(treeName);
+      } catch (IOException caught) {
+         throw convertToCacheLoaderException("error recreating tree " + treeName, caught);
+      }
+   }
+
+   /** @return the bucket stored under <tt>bucketName</tt>, or whatever {@link BTree#find(Object)} yields for a miss */
+   @Override
+   protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
+      try {
+         return (Bucket) btree.find(bucketName);
+      } catch (IOException caught) {
+         throw convertToCacheLoaderException("error finding bucket " + bucketName, caught);
+      }
+   }
+
+   /** Inserts the bucket under its bucket name, replacing any bucket already stored under that name. */
+   @Override
+   public void saveBucket(Bucket b) throws CacheLoaderException {
+      try {
+         btree.insert(b.getBucketName(), b, true);
+      } catch (IOException caught) {
+         throw convertToCacheLoaderException("error saving bucket " + b.getBucketName(), caught);
+      }
+   }
+
+   /** Returns <tt>caught</tt> itself when it already is a CacheLoaderException, otherwise wraps it with the message. */
+   CacheLoaderException convertToCacheLoaderException(String message, Exception caught) {
+      return (caught instanceof CacheLoaderException) ? (CacheLoaderException) caught :
+            new CacheLoaderException(message, caught);
+   }
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmCacheStoreConfig.java (from rev 7868, core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmCacheStoreConfig.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmCacheStoreConfig.java 2009-03-08 21:03:03 UTC (rev 7884)
@@ -0,0 +1,19 @@
+package org.horizon.loader.jdbm;
+
+import org.horizon.loader.file.FileCacheStoreConfig;
+
+/**
+ * Configuration for {@link org.horizon.loader.jdbm.JdbmCacheStore}. It differs from its parent only in its defaults:
+ * the location is <tt>Horizon-JdbmCacheStore</tt> and the cache loader class is {@link JdbmCacheStore}.
+ *
+ * @author Adrian Cole
+ * @see FileCacheStoreConfig
+ * @since 1.0
+ */
+public class JdbmCacheStoreConfig extends FileCacheStoreConfig {
+
+   public JdbmCacheStoreConfig() {
+      // the no-arg superclass constructor runs implicitly before these defaults are applied
+      setLocation("Horizon-JdbmCacheStore");
+      setCacheLoaderClassName(JdbmCacheStore.class.getName());
+   }
+}
\ No newline at end of file
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmConnection.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmConnection.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbm/JdbmConnection.java 2009-03-08 21:03:03 UTC (rev 7884)
@@ -0,0 +1,81 @@
+package org.horizon.loader.jdbm;
+
+import jdbm.RecordManager;
+import jdbm.RecordManagerFactory;
+import jdbm.btree.BTree;
+import org.apache.commons.collections.ComparatorUtils;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Properties;
+
+/**
+ * Encapsulates the interactions between the {@link org.horizon.loader.jdbm.JdbmCacheStore} and the JDBM {@link
+ * RecordManager}. An instance holds at most one open record manager at a time.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class JdbmConnection {
+
+   private static final Log log = LogFactory.getLog(JdbmConnection.class);
+
+   /** The open record manager, or <tt>null</tt> when no environment is open. */
+   private RecordManager recman;
+
+   /**
+    * Opens a record manager rooted in the given directory.
+    *
+    * @throws CacheLoaderException if the record manager cannot be created
+    */
+   public void openEnvironment(File root) throws CacheLoaderException {
+      log.debug("Opening environment in: " + root);
+      try {
+         recman = createRecordManager(root);
+      } catch (IOException e) {
+         throw new CacheLoaderException(e);
+      }
+   }
+
+   /**
+    * Closes the record manager. Does nothing when no environment is open, so it is safe to call repeatedly.
+    *
+    * @throws CacheLoaderException if closing fails; the reference is dropped regardless
+    */
+   public void closeEnvironment() throws CacheLoaderException {
+      if (recman == null) {
+         return;
+      }
+      try {
+         recman.close();
+      } catch (IOException e) {
+         throw new CacheLoaderException(e);
+      } finally {
+         // a record manager that failed to close must not be reused
+         recman = null;
+      }
+   }
+
+   /** Creates a new tree ordered by the natural ordering of its keys and registers it under <tt>name</tt>. */
+   public BTree createBTree(String name) throws IOException {
+      Comparator comp = ComparatorUtils.naturalComparator();
+      /* note the code of btree does not expose its serializer and key/value serializers
+         that are exposed are never used. Accordingly, we don't set the serializer */
+      BTree tree = BTree.createInstance(recman, comp);
+      associateBTreeWithName(tree, name);
+      return tree;
+   }
+
+   /** Registers the tree's record id under <tt>name</tt> in the record manager. */
+   public void associateBTreeWithName(BTree tree, String name) throws IOException {
+      recman.setNamedObject(name, tree.getRecid());
+   }
+
+   /** Creates a record manager whose files are named <tt>db</tt> inside <tt>home</tt>. */
+   public RecordManager createRecordManager(File home) throws IOException {
+      return RecordManagerFactory.createRecordManager(new File(home, "/db").getAbsolutePath(), new Properties());
+   }
+
+   /** Loads the tree registered under <tt>name</tt>, creating and registering a new one when none is found. */
+   public BTree loadOrCreateBTree(String name) throws IOException {
+      BTree btree;
+      try {
+         btree = loadBTree(name);
+      } catch (IllegalArgumentException e) {
+         // loadBTree reports an unknown name this way
+         btree = createBTree(name);
+      }
+      return btree;
+   }
+
+   /** Deletes the record the tree is stored under. */
+   public void deleteBTree(BTree btree) throws IOException {
+      recman.delete(btree.getRecid());
+   }
+
+   /**
+    * Loads the tree registered under <tt>name</tt>.
+    *
+    * @throws IllegalArgumentException if the record manager has no object registered under that name
+    */
+   public BTree loadBTree(String name) throws IOException {
+      long recid = recman.getNamedObject(name);
+      if (recid == 0) {
+         throw new IllegalArgumentException("name not found in record manager: " + name);
+      }
+      return BTree.load(recman, recid);
+   }
+}
\ No newline at end of file
Copied: core/branches/flat/src/test/java/org/horizon/loader/jdbm/JdbmCacheStoreTest.java (from rev 7868, core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbm/JdbmCacheStoreTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbm/JdbmCacheStoreTest.java 2009-03-08 21:03:03 UTC (rev 7884)
@@ -0,0 +1,36 @@
+package org.horizon.loader.jdbm;
+
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.File;
+
+/**
+ * Runs the {@link BaseCacheStoreTest} suite against a {@link JdbmCacheStore} that keeps its files in a directory
+ * named after this test class.
+ */
+@Test(groups = "unit", testName = "loader.jdbm.JdbmCacheStoreTest")
+public class JdbmCacheStoreTest extends BaseCacheStoreTest {
+
+   private final String tmpDirectory = TestingUtil.TEST_FILES + File.separator + getClass().getSimpleName();
+   private JdbmCacheStore jcs;
+
+   /** Creates, initialises and starts a store configured to use {@link #tmpDirectory}. */
+   protected CacheStore createCacheStore() throws CacheLoaderException {
+      JdbmCacheStoreConfig config = new JdbmCacheStoreConfig();
+      config.setLocation(tmpDirectory);
+      // for more accurate unit testing
+      config.setPurgeSynchronously(true);
+
+      jcs = new JdbmCacheStore();
+      jcs.init(config, getCache(), getMarshaller());
+      jcs.start();
+      return jcs;
+   }
+
+   /** Deletes the temporary directory; registered to run both before and after the test. */
+   @AfterTest
+   @BeforeTest
+   public void removeTempDirectory() {
+      TestingUtil.recursiveFileRemove(tmpDirectory);
+   }
+}
\ No newline at end of file
Added: core/branches/flat/src/test/java/org/horizon/loader/jdbm/JdbmLearningTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbm/JdbmLearningTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbm/JdbmLearningTest.java 2009-03-08 21:03:03 UTC (rev 7884)
@@ -0,0 +1,764 @@
+package org.horizon.loader.jdbm;
+
+import jdbm.RecordManager;
+import jdbm.RecordManagerFactory;
+import jdbm.btree.BTree;
+import jdbm.helper.Tuple;
+import jdbm.helper.TupleBrowser;
+import org.apache.commons.collections.ComparatorUtils;
+import org.easymock.EasyMock;
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Learning tests for JDBM (http://jdbm.sourceforge.net). Behaviour here is used in JdbmCacheStore. When there are
+ * upgrades to JDBM, this test may warrant updating.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.JdbmLearningTest")
+public class JdbmLearningTest {
+
+ private static final Log log = LogFactory.getLog(JdbmLearningTest.class);
+ // captured once at class load; not read anywhere in the visible part of this class
+ private static final boolean trace = log.isTraceEnabled();
+
+ // directory holding the JDBM files used by this test
+ String dbHome = TestingUtil.TEST_FILES + "/Horizon-JdbmLearningTest";
+ // tree under test; null while stopped
+ private BTree btree;
+ // record manager backing the tree; null while stopped
+ private RecordManager recman;
+ // name the tree is registered under in the record manager
+ private String treeName = this.getClass().getName();
+
+ /**
+ * Recursively deletes {@link #dbHome}. Registered to run before the test and after every test method.
+ */
+ @BeforeTest
+ @AfterMethod
+ public void tearDown() throws Exception {
+ TestingUtil.recursiveFileRemove(dbHome);
+ }
+
+
+ /**
+ * Creates {@link #dbHome} (the result of <tt>mkdirs()</tt> is not checked) and opens the record manager and tree
+ * before every test method.
+ */
+ @BeforeMethod
+ public void setUp() throws CacheLoaderException {
+ new File(dbHome).mkdirs();
+ start();
+
+ }
+
+ /**
+  * Creates a tree ordered by the natural ordering of its keys and registers it under <tt>name</tt>.
+  */
+ private BTree createBTree(RecordManager recman, String name) throws IOException {
+    /* note the code of btree does not expose its serializer and key/value serializers
+       that are exposed are never used. Accordingly, we don't set the serializer */
+    Comparator naturalOrder = ComparatorUtils.naturalComparator();
+    BTree created = BTree.createInstance(recman, naturalOrder);
+    associateBTreeWithName(recman, created, name);
+    return created;
+ }
+
+ /** Registers the tree's record id under <tt>name</tt> in the record manager. */
+ private void associateBTreeWithName(RecordManager recman, BTree tree, String name) throws IOException {
+ recman.setNamedObject(name, tree.getRecid());
+ }
+
+ /** Creates a record manager for the name <tt>home + "/db"</tt>, using empty properties. */
+ private RecordManager createRecordManager(String home) throws IOException {
+ return RecordManagerFactory.createRecordManager(home + "/db", new Properties());
+ }
+
+ /**
+  * Opens the record manager in {@link #dbHome} and loads (or creates) the tree named {@link #treeName}.
+  *
+  * @throws CacheLoaderException wrapping any IOException raised while opening
+  */
+ private void start() throws CacheLoaderException {
+    // use the class logger, as loadBTree does, instead of writing to stdout
+    log.debug("Opening environment in: " + dbHome);
+    try {
+       recman = createRecordManager(dbHome);
+       btree = loadOrCreateBTree(recman, treeName);
+    } catch (IOException e) {
+       throw new CacheLoaderException(e);
+    }
+ }
+
+ /**
+  * Returns the tree registered under <tt>name</tt>; when loading reports the name as unknown, a new tree is created
+  * and registered instead.
+  */
+ private BTree loadOrCreateBTree(RecordManager recman, String name) throws IOException {
+    try {
+       return loadBTree(recman, name);
+    } catch (IllegalArgumentException unknownName) {
+       return createBTree(recman, name);
+    }
+ }
+
+ /** Deletes the record the tree is stored under. */
+ private void deleteBTree(RecordManager recman, BTree btree) throws IOException {
+ recman.delete(btree.getRecid());
+ }
+
+ /**
+  * Loads the tree registered under <tt>name</tt>.
+  *
+  * @throws IllegalArgumentException if the record manager has no object registered under that name
+  */
+ private BTree loadBTree(RecordManager recman, String name) throws IOException {
+    long recid = recman.getNamedObject(name);
+    log.debug(name + " located as " + recid);
+    if (recid == 0) {
+       throw new IllegalArgumentException("name not found in record manager: " + name);
+    }
+    return BTree.load(recman, recid);
+ }
+
+ /**
+  * Drops the tree reference and closes the record manager.
+  *
+  * @throws CacheLoaderException if closing fails; the record manager reference is cleared regardless
+  */
+ private void stop() throws CacheLoaderException {
+    btree = null;
+    try {
+       recman.close();
+    } catch (IOException e) {
+       throw new CacheLoaderException(e);
+    } finally {
+       // do not keep a handle to a record manager that failed to close
+       recman = null;
+    }
+ }
+
+
+ /** @return true when {@link #load(Object)} yields an entry for the key, i.e. it is present and not expired */
+ public boolean containsKey(Object key) throws CacheLoaderException {
+ return load(key) != null;
+ }
+
+ /**
+  * Looks the key up in the tree. An expired entry is removed from the tree and reported as absent.
+  *
+  * @return the live entry, or null when the key is missing or its entry had expired
+  */
+ public StoredEntry load(Object key) throws CacheLoaderException {
+    StoredEntry found;
+    try {
+       found = removeAndReturnNullIfExpired((StoredEntry) btree.find(key));
+    } catch (IOException e) {
+       throw new CacheLoaderException(e);
+    }
+    return found;
+ }
+
+ /**
+  * Passes a null or live entry straight through; an expired entry is removed from the tree and null is returned.
+  */
+ private StoredEntry removeAndReturnNullIfExpired(StoredEntry entry) throws IOException {
+    boolean live = entry == null || !entry.isExpired();
+    if (live) {
+       return entry;
+    }
+    btree.remove(entry.getKey());
+    return null;
+ }
+
+ /**
+  * Returns every live entry in the tree and purges the expired ones.
+  * <p/>
+  * Expired keys are collected during the scan and removed only after the browser has been exhausted, because
+  * removing a key is a structural change to the tree and must not happen underneath an active browser.
+  *
+  * @throws CacheLoaderException wrapping any IOException raised while browsing or removing
+  */
+ public Set<StoredEntry> loadAll() throws CacheLoaderException {
+    Set<StoredEntry> entries = new HashSet<StoredEntry>();
+    List<Object> expiredKeys = new ArrayList<Object>();
+    try {
+       TupleBrowser browser = btree.browse();
+       Tuple tuple = new Tuple();
+       while (browser.getNext(tuple)) {
+          StoredEntry entry = (StoredEntry) tuple.getValue();
+          if (entry == null) {
+             continue;
+          }
+          if (entry.isExpired()) {
+             expiredKeys.add(entry.getKey());
+          } else {
+             entries.add(entry);
+          }
+       }
+    } catch (IOException e) {
+       throw new CacheLoaderException(e);
+    }
+    // remove(key) tolerates a key that has disappeared in the meantime
+    for (Object key : expiredKeys) {
+       remove(key);
+    }
+    return entries;
+ }
+
+ /**
+ * Inserts the entry under its own key; the <tt>true</tt> flag asks the tree to replace an existing value.
+ *
+ * @throws CacheLoaderException wrapping any IOException raised by the insert
+ */
+ public void store(StoredEntry ed) throws CacheLoaderException {
+ try {
+ btree.insert(ed.getKey(), ed, true);
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ /**
+  * Removes every expired entry from the tree.
+  * <p/>
+  * The scan only records the expired keys; they are removed once browsing has finished so that the tree is not
+  * structurally modified underneath an active browser.
+  *
+  * @throws CacheLoaderException wrapping any IOException raised while browsing or removing
+  */
+ private void purgeExpired() throws CacheLoaderException {
+    List<Object> expiredKeys = new ArrayList<Object>();
+    try {
+       TupleBrowser browser = btree.browse();
+       Tuple tuple = new Tuple();
+       while (browser.getNext(tuple)) {
+          StoredEntry entry = (StoredEntry) tuple.getValue();
+          if (entry != null && entry.isExpired()) {
+             expiredKeys.add(entry.getKey());
+          }
+       }
+    } catch (IOException e) {
+       throw new CacheLoaderException(e);
+    }
+    for (Object key : expiredKeys) {
+       remove(key);
+    }
+ }
+
+
+ /**
+  * Replaces the local database files with the ones carried by the stream, in the layout written by
+  * {@link #toStream(ObjectOutput)}: the sender's tree name, then <tt>db.db</tt> and <tt>db.lg</tt>, each as a long
+  * length followed by that many bytes. The received tree is then opened under the sender's name and associated
+  * with the local tree name.
+  *
+  * @throws CacheLoaderException on any I/O or deserialization failure; the local tree name is always restored
+  */
+ public void fromStream(ObjectInput inputStream) throws CacheLoaderException {
+    final String localName = treeName;
+    try {
+       stop();
+       String streamedName = (String) inputStream.readObject();
+       readFileFromStream(inputStream, dbHome + "/db.db");
+       readFileFromStream(inputStream, dbHome + "/db.lg");
+
+       treeName = streamedName;
+       start();
+       associateBTreeWithName(recman, btree, localName);
+    } catch (IOException e) {
+       throw new CacheLoaderException(e);
+    } catch (ClassNotFoundException e) {
+       throw new CacheLoaderException(e);
+    } finally {
+       treeName = localName;
+    }
+ }
+
+ /** Reads a long length and that many bytes from the stream into the file, which ends up exactly that long. */
+ private void readFileFromStream(ObjectInput in, String path) throws IOException {
+    RandomAccessFile file = new RandomAccessFile(path, "rw");
+    try {
+       long remaining = in.readLong();
+       // discard anything left over from a longer, older file
+       file.setLength(remaining);
+       byte[] chunk = new byte[8192];
+       while (remaining > 0) {
+          int wanted = (int) Math.min(chunk.length, remaining);
+          in.readFully(chunk, 0, wanted);
+          file.write(chunk, 0, wanted);
+          remaining -= wanted;
+       }
+    } finally {
+       file.close();
+    }
+ }
+
+ /**
+  * Writes the tree name followed by the raw contents of <tt>db.db</tt> and <tt>db.lg</tt>, each as a long length
+  * followed by that many bytes. The record manager is stopped while the files are read and restarted afterwards.
+  *
+  * @throws CacheLoaderException wrapping any IOException raised while stopping, reading or writing
+  */
+ public void toStream(ObjectOutput outputStream) throws CacheLoaderException {
+    try {
+       stop();
+       outputStream.writeObject(treeName);
+       writeFileToStream(dbHome + "/db.db", outputStream);
+       writeFileToStream(dbHome + "/db.lg", outputStream);
+    } catch (IOException e) {
+       throw new CacheLoaderException(e);
+    } finally {
+       // files are already closed by the helper, so a failing restart cannot leak them
+       if (recman == null) {
+          start();
+       }
+    }
+ }
+
+ /** Writes the file's length as a long, then its bytes, to the stream. */
+ private void writeFileToStream(String path, ObjectOutput out) throws IOException {
+    RandomAccessFile file = new RandomAccessFile(path, "r");
+    try {
+       long remaining = file.length();
+       out.writeLong(remaining);
+       byte[] chunk = new byte[8192];
+       while (remaining > 0) {
+          int wanted = (int) Math.min(chunk.length, remaining);
+          file.readFully(chunk, 0, wanted);
+          out.write(chunk, 0, wanted);
+          remaining -= wanted;
+       }
+    } finally {
+       file.close();
+    }
+ }
+
+ /**
+ * Empties the store by deleting the current tree's record and creating a new tree under the same name.
+ *
+ * @throws CacheLoaderException wrapping any IOException raised while deleting or recreating
+ */
+ public void clear() throws CacheLoaderException {
+ try {
+ deleteBTree(recman, btree);
+ btree = createBTree(recman, treeName);
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ /**
+ * Removes the key from the tree.
+ *
+ * @return true when the removal returned a previous value; false when it returned null or the tree rejected the
+ * key with an IllegalArgumentException
+ * @throws CacheLoaderException wrapping any IOException raised by the removal
+ */
+ public boolean remove(Object key) throws CacheLoaderException {
+ try {
+ return btree.remove(key) != null;
+ } catch (IllegalArgumentException e) {
+ // presumably how the tree reports a key it does not contain - TODO confirm against the JDBM BTree docs
+ return false;
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+
+ // modifications parked by prepare() for two-phase transactions, applied by commit() or dropped by rollback()
+ private final Map<Transaction, List<? extends Modification>> transactions = new ConcurrentHashMap<Transaction, List<? extends Modification>>();
+
+ /**
+ * Applies the modifications in list order, dispatching on each modification's type to store, clear or remove.
+ *
+ * @throws IllegalArgumentException for a modification type other than STORE, CLEAR or REMOVE
+ */
+ protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
+ for (Modification m : mods) {
+ switch (m.getType()) {
+ case STORE:
+ Store s = (Store) m;
+ store(s.getStoredEntry());
+ break;
+ case CLEAR:
+ clear();
+ break;
+ case REMOVE:
+ Remove r = (Remove) m;
+ remove(r.getKey());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown modification type " + m.getType());
+ }
+ }
+ }
+
+ /**
+  * One-phase: applies the modifications immediately. Two-phase: parks them under the transaction until
+  * {@link #commit(Transaction)} or {@link #rollback(Transaction)} is called.
+  */
+ public void prepare(List<? extends Modification> mods, Transaction tx, boolean isOnePhase) throws CacheLoaderException {
+    if (!isOnePhase) {
+       transactions.put(tx, mods);
+       return;
+    }
+    applyModifications(mods);
+ }
+
+ /** Discards the modifications parked for the transaction, if any; nothing has been applied to the tree yet. */
+ public void rollback(Transaction tx) {
+ transactions.remove(tx);
+ }
+
+ /**
+  * Applies and forgets the modifications parked for the transaction. Does nothing when none were prepared or the
+  * parked list is empty.
+  */
+ public void commit(Transaction tx) throws CacheLoaderException {
+    List<? extends Modification> pending = transactions.remove(tx);
+    if (pending == null || pending.isEmpty()) {
+       return;
+    }
+    applyModifications(pending);
+ }
+
+ /** Removes each of the given keys in turn; a null or empty set is ignored. */
+ public void removeAll(Set<Object> keys) throws CacheLoaderException {
+    if (keys == null || keys.isEmpty()) {
+       return;
+    }
+    for (Object each : keys) {
+       remove(each);
+    }
+ }
+
+
+ /**
+ * Stores the same key three times - without expiry, with a two minute lifespan and with a 1 ms lifespan - and
+ * checks after each store what load/containsKey report. The last entry must be gone after a 100 ms sleep.
+ */
+ public void testLoadAndStore() throws InterruptedException, CacheLoaderException {
+ assert !containsKey("k");
+ StoredEntry se = new StoredEntry("k", "v", -1, -1);
+ store(se);
+
+ assert load("k").getValue().equals("v");
+ assert load("k").getLifespan() == -1;
+ assert !load("k").isExpired();
+ assert containsKey("k");
+
+ long now = System.currentTimeMillis();
+ long lifespan = 120000;
+ se = new StoredEntry("k", "v", now, now + lifespan);
+ store(se);
+
+ assert load("k").getValue().equals("v");
+ assert load("k").getLifespan() == lifespan;
+ assert !load("k").isExpired();
+ assert containsKey("k");
+
+ now = System.currentTimeMillis();
+ lifespan = 1;
+ se = new StoredEntry("k", "v", now, now + lifespan);
+ store(se);
+ Thread.sleep(100);
+ assert se.isExpired();
+ assert load("k") == null;
+ assert !containsKey("k");
+ }
+
+
+ /**
+ * Stores one entry with a 1 ms lifespan and one built with the two-argument constructor, restarts the record
+ * manager, and checks that only the second entry is still loadable.
+ */
+ public void testStopStartDoesntNukeValues() throws InterruptedException, CacheLoaderException {
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+
+ long now = System.currentTimeMillis();
+ long lifespan = 1;
+ StoredEntry se1 = new StoredEntry("k1", "v1", now, now + lifespan);
+ StoredEntry se2 = new StoredEntry("k2", "v2");
+
+ store(se1);
+ store(se2);
+ Thread.sleep(100);
+ stop();
+ start();
+ assert se1.isExpired();
+ assert load("k1") == null;
+ assert !containsKey("k1");
+ assert load("k2") != null;
+ assert containsKey("k2");
+ assert load("k2").getValue().equals("v2");
+
+ }
+
+
+ /**
+ * Checks that a one-phase prepare applies its modifications immediately and in list order: a Remove after a Store
+ * wins, and a Clear wipes everything stored before it in the same list.
+ */
+ public void testOnePhaseCommit() throws CacheLoaderException {
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ prepare(mods, tx, true);
+
+ assert load("k2").getValue().equals("v2");
+ assert !containsKey("k1");
+
+ clear();
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ prepare(mods, tx, true);
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert containsKey("k3");
+ }
+
+ /**
+ * Checks that a two-phase prepare changes nothing visible until commit, and that commit then applies the parked
+ * modifications in list order.
+ */
+ public void testTwoPhaseCommit() throws CacheLoaderException {
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ prepare(mods, tx, false);
+
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+
+ commit(tx);
+
+ assert load("k2").getValue().equals("v2");
+ assert !containsKey("k1");
+
+ clear();
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ prepare(mods, tx, false);
+
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert !containsKey("k3");
+
+ commit(tx);
+
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert containsKey("k3");
+ }
+
+
+ /**
+ * Checks that rolling back a two-phase prepare leaves the store untouched: the pre-existing "old" entry survives
+ * both a parked Remove and a parked Clear, and none of the parked Stores become visible.
+ */
+ public void testRollback() throws CacheLoaderException {
+
+ store(new StoredEntry("old", "old", -1, -1));
+
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ mods.add(new Remove("old"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ prepare(mods, tx, false);
+
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert containsKey("old");
+
+ rollback(tx);
+
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert containsKey("old");
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ prepare(mods, tx, false);
+
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert !containsKey("k3");
+
+ rollback(tx);
+
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert !containsKey("k3");
+ assert containsKey("old");
+ }
+
+ /**
+ * Same scenario as the rollback test, except that each rollback is issued from a separate thread while the same
+ * Transaction object is reused as the key for both prepares.
+ */
+ public void testRollbackFromADifferentThreadReusingTransactionKey() throws CacheLoaderException, InterruptedException {
+
+ store(new StoredEntry("old", "old", -1, -1));
+
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ mods.add(new Remove("old"));
+ final Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ prepare(mods, tx, false);
+
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ rollback(tx);
+ }
+ });
+
+ t.start();
+ t.join();
+
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert containsKey("old");
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ prepare(mods, tx, false);
+
+ Thread t2 = new Thread(new Runnable() {
+ public void run() {
+ rollback(tx);
+ }
+ });
+
+ t2.start();
+ t2.join();
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert !containsKey("k3");
+ assert containsKey("old");
+ }
+
+ /** Checks that commit and rollback of a transaction that was never prepared leave stored data alone. */
+ public void testCommitAndRollbackWithoutPrepare() throws CacheLoaderException {
+ store(new StoredEntry("old", "old", -1, -1));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ commit(tx);
+ store(new StoredEntry("old", "old", -1, -1));
+ rollback(tx);
+
+ assert containsKey("old");
+ }
+
+ /** Stores three entries and checks that loadAll returns exactly those three keys. */
+ public void testPreload() throws CacheLoaderException {
+ store(new StoredEntry("k1", "v1", -1, -1));
+ store(new StoredEntry("k2", "v2", -1, -1));
+ store(new StoredEntry("k3", "v3", -1, -1));
+
+ Set<StoredEntry> set = loadAll();
+
+ assert set.size() == 3;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+
+ /**
+  * Stores four entries, removes three of them through removeAll and checks that exactly the fourth one is left.
+  */
+ @Test
+ public void testStoreAndRemoveAll() throws CacheLoaderException {
+    store(new StoredEntry("k1", "v1", -1, -1));
+    store(new StoredEntry("k2", "v2", -1, -1));
+    store(new StoredEntry("k3", "v3", -1, -1));
+    store(new StoredEntry("k4", "v4", -1, -1));
+
+    Set<StoredEntry> set = loadAll();
+
+    assert set.size() == 4;
+    Set expected = new HashSet();
+    expected.add("k1");
+    expected.add("k2");
+    expected.add("k3");
+    expected.add("k4");
+    for (StoredEntry se : set) assert expected.remove(se.getKey());
+    assert expected.isEmpty();
+
+    Set toRemove = new HashSet();
+    toRemove.add("k1");
+    toRemove.add("k2");
+    toRemove.add("k3");
+    removeAll(toRemove);
+
+    set = loadAll();
+    assert set.size() == 1;
+    // the set holds StoredEntry objects, so the survivor has to be identified through its key
+    assert "k4".equals(set.iterator().next().getKey());
+ }
+
+ /**
+ * Stores three entries with a one second lifespan, waits until they have expired and checks that none of them
+ * is reported after purgeExpired.
+ */
+ public void testPurgeExpired() throws Exception {
+ long now = System.currentTimeMillis();
+ long lifespan = 1000;
+ store(new StoredEntry("k1", "v1", now, now + lifespan));
+ store(new StoredEntry("k2", "v2", now, now + lifespan));
+ store(new StoredEntry("k3", "v3", now, now + lifespan));
+ assert containsKey("k1");
+ assert containsKey("k2");
+ assert containsKey("k3");
+ Thread.sleep(lifespan + 100);
+ purgeExpired();
+ assert !containsKey("k1");
+ assert !containsKey("k2");
+ assert !containsKey("k3");
+ }
+
+
+ /**
+ * Round-trips the store through toStream/fromStream: three entries are streamed out, the store is cleared, and
+ * after streaming the bytes back in the same three keys must be present.
+ */
+ public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
+ store(new StoredEntry("k1", "v1", -1, -1));
+ store(new StoredEntry("k2", "v2", -1, -1));
+ store(new StoredEntry("k3", "v3", -1, -1));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ toStream(new UnclosableObjectOutputStream(oos));
+ oos.flush();
+ oos.close();
+ out.close();
+ clear();
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()));
+ fromStream(new UnclosableObjectInputStream(ois));
+
+ Set<StoredEntry> set = loadAll();
+
+ assert set.size() == 3;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+
+ /**
+  * Round-trips the store through toStream/fromStream on a stream that carries unrelated bytes before and after the
+  * store's data, and checks that fromStream consumes exactly its own portion: the leading and trailing marker
+  * bytes must still be readable and intact.
+  */
+ public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
+    store(new StoredEntry("k1", "v1", -1, -1));
+    store(new StoredEntry("k2", "v2", -1, -1));
+    store(new StoredEntry("k3", "v3", -1, -1));
+
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+    byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
+    out.write(dummyStartBytes);
+    ObjectOutputStream oos = new ObjectOutputStream(out);
+    toStream(new UnclosableObjectOutputStream(oos));
+    oos.flush();
+    oos.close();
+    out.write(dummyEndBytes);
+    out.close();
+    clear();
+
+    // first pop the start bytes
+    byte[] dummy = new byte[8];
+    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+    int bytesRead = in.read(dummy, 0, 8);
+    assert bytesRead == 8;
+    for (int i = 1; i < 9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
+    fromStream(new UnclosableObjectInputStream(new ObjectInputStream(in)));
+    // whatever is left must be exactly the end marker
+    bytesRead = in.read(dummy, 0, 8);
+    assert bytesRead == 8;
+    for (int i = 8; i > 0; i--) assert dummy[8 - i] == i : "End byte stream corrupted!";
+
+    Set<StoredEntry> set = loadAll();
+
+    assert set.size() == 3;
+    Set expected = new HashSet();
+    expected.add("k1");
+    expected.add("k2");
+    expected.add("k3");
+    for (StoredEntry se : set) assert expected.remove(se.getKey());
+    assert expected.isEmpty();
+ }
+
+ /**
+  * Hammers the store from three threads, each performing 500 rounds of store, remove and load/loadAll on ten
+  * shared keys, and fails if any worker hit an exception or a failed assertion.
+  * <p/>
+  * Failures are collected as Throwable in a synchronized list: an AssertionError is not an Exception, so catching
+  * only Exception would let a failed assertion kill the worker thread without failing the test.
+  */
+ public void testConcurrency() throws Exception {
+    int numThreads = 3;
+    final int loops = 500;
+    final String[] keys = new String[10];
+    final String[] values = new String[10];
+    for (int i = 0; i < 10; i++) keys[i] = "k" + i;
+    for (int i = 0; i < 10; i++) values[i] = "v" + i;
+
+    final Random r = new Random();
+    final List<Throwable> failures = java.util.Collections.synchronizedList(new LinkedList<Throwable>());
+
+    final Runnable store = new Runnable() {
+       public void run() {
+          try {
+             int randomInt = r.nextInt(10);
+             store(new StoredEntry(keys[randomInt], values[randomInt]));
+          } catch (Throwable t) {
+             failures.add(t);
+          }
+       }
+    };
+
+    final Runnable remove = new Runnable() {
+       public void run() {
+          try {
+             remove(keys[r.nextInt(10)]);
+          } catch (Throwable t) {
+             failures.add(t);
+          }
+       }
+    };
+
+    final Runnable get = new Runnable() {
+       public void run() {
+          try {
+             int randomInt = r.nextInt(10);
+             StoredEntry se = load(keys[randomInt]);
+             assert se == null || se.getValue().equals(values[randomInt]);
+             loadAll();
+          } catch (Throwable t) {
+             failures.add(t);
+          }
+       }
+    };
+
+    Thread[] threads = new Thread[numThreads];
+
+    for (int i = 0; i < numThreads; i++) {
+       threads[i] = new Thread(getClass().getSimpleName() + "-" + i) {
+          public void run() {
+             for (int i = 0; i < loops; i++) {
+                store.run();
+                remove.run();
+                get.run();
+             }
+          }
+       };
+    }
+
+    for (Thread t : threads) t.start();
+    for (Thread t : threads) t.join();
+
+    // report the first failure; a Throwable argument becomes the cause of the AssertionError
+    if (!failures.isEmpty()) throw new AssertionError(failures.get(0));
+ }
+}
15 years, 9 months
JBoss Cache SVN: r7883 - core/branches/flat/src/main/java/org/horizon/loader/file.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-08 17:02:07 -0400 (Sun, 08 Mar 2009)
New Revision: 7883
Modified:
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
Log:
modified to allow for extension
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-03-08 21:01:16 UTC (rev 7882)
+++ core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-03-08 21:02:07 UTC (rev 7883)
@@ -33,6 +33,13 @@
Marshaller marshaller;
File root;
+ /**
+ * @return root directory where all files for this {@link org.horizon.loader.CacheStore CacheStore} are written.
+ */
+ public File getRoot() {
+ return root;
+ }
+
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
super.init(config, cache, m);
this.config = (FileCacheStoreConfig) config;
@@ -43,7 +50,6 @@
protected Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException {
Set<StoredEntry> result = new HashSet<StoredEntry>();
for (File bucketFile : root.listFiles()) {
- String bucketName = bucketFile.getName();
Bucket bucket = loadBucket(bucketFile);
if (bucket != null) {
if (bucket.removeExpiredEntries()) {
@@ -155,7 +161,7 @@
saveBucket(bucket);
}
- public final void saveBucket(Bucket b) throws CacheLoaderException {
+ public void saveBucket(Bucket b) throws CacheLoaderException {
File f = new File(root, b.getBucketName());
if (f.exists()) {
if (!f.delete()) log.warn("Had problems removing file {0}", f);
15 years, 9 months
JBoss Cache SVN: r7882 - core/branches/flat/src/main/java/org/horizon/loader/s3.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-08 17:01:16 -0400 (Sun, 08 Mar 2009)
New Revision: 7882
Modified:
core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
Log:
added location of tuning properties
Modified: core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java 2009-03-08 03:01:49 UTC (rev 7881)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java 2009-03-08 21:01:16 UTC (rev 7882)
@@ -10,8 +10,12 @@
/**
* A {@link org.jets3t.service.S3Service jets3t} implementation of {@link S3Connection}.
+ * <p/>
+ * Tuning and configuration parameters can be overridden by creating <tt>jets3t.properties</tt> and adding it to your
+ * classpath.
*
* @author Adrian Cole
+ * @link http://jets3t.s3.amazonaws.com/toolkit/configuration.html
* @since 1.0
*/
public class Jets3tS3Connection implements S3Connection {
@@ -45,7 +49,7 @@
*
* @see org.jets3t.service.S3Service#deleteObject(org.jets3t.service.model.S3Bucket, String)
*/
- public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException{
+ public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
s3Service.deleteObject(bucket, objectKey);
}
@@ -94,7 +98,7 @@
* @see Jets3tS3Connection#getAllObjectsInBucketWithoutTheirData(org.jets3t.service.model.S3Bucket)
* @see S3ServiceSimpleMulti#deleteObjects(org.jets3t.service.model.S3Bucket, org.jets3t.service.model.S3Object[])
*/
- public void removeAllObjectsFromBucket(S3Bucket bucket) throws S3ServiceException{
+ public void removeAllObjectsFromBucket(S3Bucket bucket) throws S3ServiceException {
S3Object[] objects = getAllObjectsInBucketWithoutTheirData(bucket);
s3MultiService.deleteObjects(bucket, objects);
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java 2009-03-08 03:01:49 UTC (rev 7881)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java 2009-03-08 21:01:16 UTC (rev 7882)
@@ -27,8 +27,12 @@
/**
* A {@link org.jets3t.service.S3Service jets3t} implementation of a {@link org.horizon.loader.bucket.BucketBasedCacheStore}.
* This file store stores stuff in the following format: <tt>http://s3.amazon.com/{bucket}/bucket_number.bucket</tt>
+ * <p/>
+ * Tuning and configuration parameters can be overridden by creating <tt>jets3t.properties</tt> and adding it to your
+ * classpath.
*
* @author Adrian Cole
+ * @link http://jets3t.s3.amazonaws.com/toolkit/configuration.html
* @since 1.0
*/
public class S3CacheStore extends BucketBasedCacheStore {
15 years, 9 months
JBoss Cache SVN: r7881 - in core/branches/flat: src/main/java/org/horizon/loader and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-07 22:01:49 -0500 (Sat, 07 Mar 2009)
New Revision: 7881
Added:
core/branches/flat/src/main/java/org/horizon/loader/s3/
core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java
core/branches/flat/src/test/java/org/horizon/loader/s3/
core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java
core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java
Modified:
core/branches/flat/pom.xml
Log:
added s3 cache store
Modified: core/branches/flat/pom.xml
===================================================================
--- core/branches/flat/pom.xml 2009-03-07 18:58:23 UTC (rev 7880)
+++ core/branches/flat/pom.xml 2009-03-08 03:01:49 UTC (rev 7881)
@@ -75,11 +75,11 @@
</dependency>
<dependency>
- <groupId>net.noderunner</groupId>
- <artifactId>amazon-s3</artifactId>
- <version>1.0.0.0</version>
+ <groupId>net.java.dev.jets3t</groupId>
+ <artifactId>jets3t</artifactId>
+ <version>0.6.1</version>
<optional>true</optional>
- </dependency>
+ </dependency>
<dependency>
<groupId>log4j</groupId>
@@ -130,6 +130,14 @@
<version>2.5</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>1.4</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- 5.8 is needed for proper parallel test execution -->
<dependency>
<groupId>org.testng</groupId>
@@ -242,8 +250,9 @@
</repository>
<!-- For Amazon S3 artifacts -->
<repository>
- <id>e-xml.sourceforge.net</id>
- <url>http://e-xml.sourceforge.net/maven2/repository</url>
+ <name>jets3t</name>
+ <id>jets3t</id>
+ <url>http://jets3t.s3.amazonaws.com/maven2</url>
</repository>
<!-- For Sleepycat -->
<repository>
Added: core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/Jets3tS3Connection.java 2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,135 @@
+package org.horizon.loader.s3;
+
+import org.jets3t.service.S3Service;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.multithread.S3ServiceSimpleMulti;
+import org.jets3t.service.security.AWSCredentials;
+
+/**
+ * A {@link org.jets3t.service.S3Service jets3t} implementation of {@link S3Connection}.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class Jets3tS3Connection implements S3Connection {
+ private S3Service s3Service;
+ private S3ServiceSimpleMulti s3MultiService;
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see RestS3Service#RestS3Service(org.jets3t.service.security.AWSCredentials)
+ * @see S3ServiceSimpleMulti#S3ServiceSimpleMulti(org.jets3t.service.S3Service)
+ */
+ public void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException {
+ AWSCredentials awsCredentials =
+ new AWSCredentials(awsAccessKey, awsSecretKey);
+ s3Service = new RestS3Service(awsCredentials);
+ s3MultiService = new S3ServiceSimpleMulti(s3Service);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see S3Object#S3Object(String)
+ */
+ public S3Object createObject(String key) {
+ return new S3Object(key);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jets3t.service.S3Service#deleteObject(org.jets3t.service.model.S3Bucket, String)
+ */
+ public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException{
+ s3Service.deleteObject(bucket, objectKey);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * An accessible bucket is fetched; otherwise a bucket of that name is created.
+ *
+ * @see org.jets3t.service.S3Service#isBucketAccessible(String)
+ * @see org.jets3t.service.S3Service#getBucket(String)
+ * @see org.jets3t.service.S3Service#createBucket(String)
+ */
+ public S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException {
+ /* version 0.7.0 supports the following
+ return s3Service.getOrCreateBucket(config.getBucket()); */
+ return s3Service.isBucketAccessible(bucketName)
+ ? s3Service.getBucket(bucketName)
+ : s3Service.createBucket(bucketName);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see org.jets3t.service.S3Service#listObjects(org.jets3t.service.model.S3Bucket)
+ */
+ public S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException {
+ return s3Service.listObjects(bucket);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * One destination object is created per source key, under that same key, and the whole batch is passed to
+ * <tt>s3MultiService</tt> in a single call.
+ *
+ * @see S3ServiceSimpleMulti#copyObjects(String, String, String[], org.jets3t.service.model.S3Object[], boolean)
+ */
+ public void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName, String destinationBucketName) throws S3ServiceException {
+ S3Object[] targets = new S3Object[keys.length];
+ for (int index = 0; index < keys.length; index++) {
+ targets[index] = createObject(keys[index]);
+ }
+ s3MultiService.copyObjects(sourceBucketName, destinationBucketName, keys, targets, false);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see Jets3tS3Connection#getAllObjectsInBucketWithoutTheirData(org.jets3t.service.model.S3Bucket)
+ * @see S3ServiceSimpleMulti#deleteObjects(org.jets3t.service.model.S3Bucket, org.jets3t.service.model.S3Object[])
+ */
+ public void removeAllObjectsFromBucket(S3Bucket bucket) throws S3ServiceException{
+ S3Object[] objects = getAllObjectsInBucketWithoutTheirData(bucket);
+ s3MultiService.deleteObjects(bucket, objects);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see S3Service#deleteBucket(S3Bucket)
+ */
+ public void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException {
+ s3Service.deleteBucket(bucket);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see S3Service#getObject(org.jets3t.service.model.S3Bucket, String)
+ */
+ public S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+ try {
+ return s3Service.getObject(bucket, objectKey);
+ } catch (S3ServiceException e) {
+ if (e.getS3ErrorCode() != null && e.getS3ErrorCode().equals("NoSuchKey")) {
+ return null;
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @see S3Service#putObject(org.jets3t.service.model.S3Bucket, org.jets3t.service.model.S3Object)
+ */
+ public S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException {
+ return s3Service.putObject(bucket, object);
+ }
+}
\ No newline at end of file
Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStore.java 2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,208 @@
+package org.horizon.loader.s3;
+
+import org.horizon.Cache;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.bucket.Bucket;
+import org.horizon.loader.bucket.BucketBasedCacheStore;
+import org.horizon.loader.file.FileCacheStore;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+import org.jets3t.service.utils.ServiceUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link org.jets3t.service.S3Service jets3t} implementation of a {@link org.horizon.loader.bucket.BucketBasedCacheStore}.
+ * This file store stores stuff in the following format: <tt>http://s3.amazon.com/{bucket}/bucket_number.bucket</tt>
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class S3CacheStore extends BucketBasedCacheStore {
+
+ // Logger for this store; must be keyed on S3CacheStore so messages are not attributed to FileCacheStore.
+ private static final Log log = LogFactory.getLog(S3CacheStore.class);
+
+ private S3CacheStoreConfig config;
+ private S3Bucket rootS3Bucket;
+
+ Cache cache;
+ Marshaller marshaller;
+
+ private S3Connection s3Connection;
+
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return S3CacheStoreConfig.class;
+ }
+
+ /**
+ * {@inheritDoc} This initializes the internal <tt>s3Connection</tt> as an implementation of {@link
+ * Jets3tS3Connection}
+ */
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ init(config, cache, m, new Jets3tS3Connection());
+ }
+
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m, S3Connection s3Connection) {
+ super.init(config, cache, m);
+ this.config = (S3CacheStoreConfig) config;
+ this.cache = cache;
+ this.marshaller = m;
+ this.s3Connection = s3Connection;
+ }
+
+
+ public void start() throws CacheLoaderException {
+ super.start();
+
+ String awsAccessKey = config.getAwsAccessKey();
+ if (awsAccessKey == null)
+ throw new IllegalArgumentException("awsAccessKey must be set");
+ String awsSecretKey = config.getAwsSecretKey();
+ if (awsSecretKey == null)
+ throw new IllegalArgumentException("awsSecretKey must be set");
+ String s3Bucket = config.getBucket();
+ if (s3Bucket == null)
+ throw new IllegalArgumentException("s3Bucket must be set");
+
+ try {
+ s3Connection.connect(awsAccessKey, awsSecretKey);
+ rootS3Bucket = s3Connection.getOrCreateBucket(s3Bucket);
+ } catch (S3ServiceException e) {
+ throw convertToCacheLoaderException("error opening s3 service", e);
+ }
+ }
+
+ protected Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException {
+ Set<StoredEntry> result = new HashSet<StoredEntry>();
+ try {
+ for (S3Object s3Object : s3Connection.getAllObjectsInBucketWithoutTheirData(rootS3Bucket)) {
+ Bucket bucket = loadBucket(s3Object);
+ if (bucket != null) {
+ if (bucket.removeExpiredEntries()) {
+ saveBucket(bucket);
+ }
+ result.addAll(bucket.getStoredEntries());
+ }
+ }
+ } catch (S3ServiceException e) {
+ throw convertToCacheLoaderException("Error while loading entries", e);
+ }
+ return result;
+ }
+
+ protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+ try {
+ S3Bucket source = (S3Bucket) objectInput.readObject();
+ if (rootS3Bucket.getName().equals(source.getName())) {
+ log.info("Attempt to load the same s3 bucket ignored");
+ } else {
+ S3Object[] sourceObjects = s3Connection.getAllObjectsInBucketWithoutTheirData(source);
+ String[] sourceKeys = new String[sourceObjects.length];
+
+ int i = 0;
+ for (S3Object sourceObject : sourceObjects) {
+ sourceKeys[i++] = sourceObject.getKey();
+ }
+ s3Connection.copyObjectsFromOneBucketToAnother(sourceKeys, source.getName(), rootS3Bucket.getName());
+ }
+ loadAll();
+ } catch (Exception e) {
+ throw convertToCacheLoaderException("Error while reading from stream", e);
+ }
+ }
+
+ protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+ try {
+ objectOutput.writeObject(rootS3Bucket);
+ } catch (IOException e) {
+ throw convertToCacheLoaderException("Error while writing to stream", e);
+ }
+ }
+
+ /**
+ * Removes every object from the root S3 bucket. Only the objects are removed; no call is made to delete the bucket.
+ *
+ * @throws CacheLoaderException if the S3 connection reports a failure while removing the objects
+ */
+ protected void clearLockSafe() throws CacheLoaderException {
+ try {
+ s3Connection.removeAllObjectsFromBucket(rootS3Bucket);
+ } catch (S3ServiceException caught) {
+ // the bucket is emptied here, not recreated, so the message must say so
+ throw convertToCacheLoaderException("error clearing bucket " + config.getBucket(), caught);
+ }
+ }
+
+ /**
+ * Returns <tt>caught</tt> itself when it already is a {@link CacheLoaderException}; any other exception is wrapped
+ * in a new {@link CacheLoaderException} carrying <tt>message</tt>.
+ */
+ CacheLoaderException convertToCacheLoaderException(String message, Exception caught) {
+ if (caught instanceof CacheLoaderException) {
+ return (CacheLoaderException) caught;
+ }
+ return new CacheLoaderException(message, caught);
+ }
+
+ protected void purgeInternal() throws CacheLoaderException {
+ loadAll();
+ }
+
+ protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
+ return loadBucket(s3Connection.createObject(bucketName));
+ }
+
+ protected Bucket loadBucket(S3Object s3Object) throws CacheLoaderException {
+ Bucket bucket = null;
+ InputStream is = null;
+ ObjectInputStream ois = null;
+ String key = s3Object.getKey();
+ try {
+ // it is possible that the S3Object above only holds details. Try to fetch, if this is the case
+ if (s3Object.getDataInputStream() == null) {
+ s3Object = s3Connection.getObjectInBucket(key, rootS3Bucket);
+ }
+
+ // it is possible that the object never existed. in this case, fall out.
+ if (s3Object != null && s3Object.getDataInputStream() != null) {
+ is = s3Object.getDataInputStream();
+ ois = new ObjectInputStream(is);
+ bucket = (Bucket) ois.readObject();
+ s3Object.closeDataInputStream();
+ bucket.setBucketName(s3Object.getKey());
+ }
+ } catch (Exception e) {
+ throw convertToCacheLoaderException("Error while reading from object: " + key, e);
+ } finally {
+ safeClose(ois);
+ safeClose(is);
+ }
+ return bucket;
+ }
+
+ protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+ saveBucket(bucket);
+ }
+
+ /**
+ * Persists the given bucket in the root S3 bucket under the bucket's own name. A bucket without entries is not
+ * written; the S3 object of that name is removed instead.
+ *
+ * @param b bucket to persist
+ * @throws CacheLoaderException if marshalling the bucket or talking to S3 fails
+ */
+ public final void saveBucket(Bucket b) throws CacheLoaderException {
+ try {
+ if (b.getEntries().isEmpty()) {
+ s3Connection.removeObjectFromBucket(b.getBucketName(), rootS3Bucket);
+ return;
+ }
+ byte[] payload = marshaller.objectToByteBuffer(b);
+ ByteArrayInputStream payloadStream = new ByteArrayInputStream(payload);
+ byte[] digest = ServiceUtils.computeMD5Hash(payloadStream);
+ // rewind after hashing so the upload starts at the first byte again
+ payloadStream.reset();
+ S3Object upload = s3Connection.createObject(b.getBucketName());
+ upload.setDataInputStream(payloadStream);
+ upload.setContentLength(payloadStream.available());
+ upload.setMd5Hash(digest);
+ s3Connection.putObjectIntoBucket(upload, rootS3Bucket);
+ } catch (Exception ex) {
+ throw convertToCacheLoaderException("Exception while saving bucket " + b, ex);
+ }
+ }
+
+}
Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3CacheStoreConfig.java 2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,51 @@
+package org.horizon.loader.s3;
+
+import org.horizon.loader.LockSupportCacheStoreConfig;
+
+/**
+ * Configures {@link org.horizon.loader.s3.S3CacheStore}. This allows you to tune a number of characteristics of the
+ * {@link S3CacheStore}.
+ * <p/>
+ * <ul> <li><tt>awsAccessKey</tt> - identifies you as the party responsible for s3 requests. This is required and there
+ * is no default.</li> <li><tt>awsSecretKey</tt> - used to authenticate you as the owner of <tt>awsAccessKey</tt>. This
+ * is required and there is no default.</li> <li><tt>bucket</tt> - the name of the s3 bucket used to store cache data.
+ * This is required and there is no default.</li> </ul>
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public class S3CacheStoreConfig extends LockSupportCacheStoreConfig {
+ private String awsAccessKey;
+ private String awsSecretKey;
+ private String bucket;
+
+ public S3CacheStoreConfig() {
+ setCacheLoaderClassName(S3CacheStore.class.getName());
+ }
+
+ public String getAwsAccessKey() {
+ return awsAccessKey;
+ }
+
+ public void setAwsAccessKey(String awsAccessKey) {
+ this.awsAccessKey = awsAccessKey;
+ }
+
+ public String getAwsSecretKey() {
+ return awsSecretKey;
+ }
+
+ public void setAwsSecretKey(String awsSecretKey) {
+ this.awsSecretKey = awsSecretKey;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
+
+}
Added: core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/s3/S3Connection.java 2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,35 @@
+package org.horizon.loader.s3;
+
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+
+/**
+ * This interface defines the interactions between the {@link S3CacheStore} and Amazon S3.
+ *
+ * @author Adrian Cole
+ * @since 1.0
+ */
+public interface S3Connection {
+
+ /**
+ * Connects to S3 with the given credentials. {@link S3CacheStore} calls this before it requests its bucket.
+ */
+ void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException;
+
+ /**
+ * @return the bucket with the given name, created first if it is not already available
+ */
+ S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException;
+
+ /**
+ * Removes the given bucket. NOTE(review): as the name suggests, this is presumably meant for buckets that no longer
+ * contain objects - confirm how implementations should react to a non-empty bucket.
+ */
+ void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException;
+
+ /**
+ * @return a new object for the given key
+ */
+ S3Object createObject(String key);
+
+ /**
+ * Stores the given object in the given bucket.
+ */
+ S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException;
+
+ /**
+ * @return the object stored under <tt>objectKey</tt> in the bucket, or <tt>null</tt> if there is no such object
+ */
+ S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException;
+
+ /**
+ * @return all objects in the bucket; callers should not expect the objects' data to be attached
+ */
+ S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException;
+
+ /**
+ * Removes the object stored under <tt>objectKey</tt> from the bucket.
+ */
+ void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException;
+
+ /**
+ * Removes every object from the bucket.
+ */
+ void removeAllObjectsFromBucket(S3Bucket rootS3Bucket) throws S3ServiceException;
+
+ /**
+ * Copies the objects with the given keys from the source bucket into the destination bucket, keeping their keys.
+ */
+ void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName, String destinationBucketName) throws S3ServiceException;
+
+}
\ No newline at end of file
Added: core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/s3/MockS3Connection.java 2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,104 @@
+package org.horizon.loader.s3;
+
+import org.apache.commons.io.IOUtils;
+import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.model.S3Object;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class MockS3Connection implements S3Connection {
+ private Map<String, S3Bucket> nameToS3Bucket = new ConcurrentHashMap<String, S3Bucket>();
+ private Map<String, Map<String, S3Object>> bucketToContents = new ConcurrentHashMap<String, Map<String, S3Object>>();
+
+ public synchronized S3Bucket getOrCreateBucket(String bucketName) throws S3ServiceException {
+ S3Bucket bucket = nameToS3Bucket.get(bucketName);
+ if (bucket == null) {
+ bucket = new S3Bucket(bucketName);
+ nameToS3Bucket.put(bucketName, bucket);
+ bucketToContents.put(bucketName, new ConcurrentHashMap<String, S3Object>());
+ }
+ return bucket;
+ }
+
+ public S3Object[] getAllObjectsInBucketWithoutTheirData(S3Bucket bucket) throws S3ServiceException {
+ Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+ return contents.values().toArray(new S3Object[]{});
+ }
+
+ /**
+ * Copies the in-memory entries for the given keys from the source bucket's map into the destination bucket's map.
+ */
+ public void copyObjectsFromOneBucketToAnother(String[] keys, String sourceBucketName, String destinationBucketName) throws S3ServiceException {
+ Map<String, S3Object> from = bucketToContents.get(sourceBucketName);
+ Map<String, S3Object> to = bucketToContents.get(destinationBucketName);
+ for (String key : keys) {
+ to.put(key, from.get(key));
+ }
+ }
+
+ public void removeBucketIfEmpty(S3Bucket bucket) throws S3ServiceException {
+ nameToS3Bucket.remove(bucket.getName());
+ bucketToContents.remove(bucket.getName());
+ }
+
+ public S3Object getObjectInBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+ Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+ return contents.get(objectKey);
+ }
+
+ public S3Object putObjectIntoBucket(S3Object object, S3Bucket bucket) throws S3ServiceException {
+ Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+ contents.put(object.getKey(), object);
+ return object;
+ }
+
+ public void connect(String awsAccessKey, String awsSecretKey) throws S3ServiceException {
+ // ignore
+ }
+
+ public S3Object createObject(String key) {
+ return new MockS3Object(key);
+ }
+
+ public void removeObjectFromBucket(String objectKey, S3Bucket bucket) throws S3ServiceException {
+ Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+ contents.remove(objectKey);
+ }
+
+ public void removeAllObjectsFromBucket(S3Bucket bucket) throws S3ServiceException {
+ Map<String, S3Object> contents = bucketToContents.get(bucket.getName());
+ contents.clear();
+ }
+
+ class MockS3Object extends S3Object {
+
+ byte[] buff;
+
+ public MockS3Object(String key) {
+ super(key);
+ }
+
+ @Override
+ public void setDataInputStream(InputStream inputStream) {
+ try {
+ buff = IOUtils.toByteArray(inputStream);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public InputStream getDataInputStream() throws S3ServiceException {
+ return (buff != null) ? new ByteArrayInputStream(buff) : null;
+ }
+ }
+
+}
+
Copied: core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java (from rev 7868, core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/s3/S3CacheStoreIntegrationTest.java 2009-03-08 03:01:49 UTC (rev 7881)
@@ -0,0 +1,172 @@
+package org.horizon.loader.s3;
+
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.jets3t.service.model.S3Bucket;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", testName = "loader.s3.S3CacheStoreIntegrationTest")
+public class S3CacheStoreIntegrationTest extends BaseCacheStoreTest {
+ private String csBucket;
+ private String cs2Bucket;
+ private String accessKey;
+ private String secretKey;
+ private S3Connection s3Connection;
+
+ @BeforeTest(enabled = false)
+ public void initRealConnection() {
+ csBucket = "your-favorite-bucket-doesnt-need-to-exist-but-if-it-does-this-test-will-nuke-it";
+ cs2Bucket = csBucket + "2";
+ accessKey = "youraccesskey";
+ secretKey = "yoursecretkey";
+ s3Connection = new Jets3tS3Connection();
+ }
+
+ @BeforeTest(enabled = true)
+ public void initMockConnection() {
+ csBucket = "horizontesting";
+ cs2Bucket = csBucket + "2";
+ accessKey = "dummyaccess";
+ secretKey = "dummysecret";
+ s3Connection = new MockS3Connection();
+ }
+
+ @AfterTest
+ public void removeS3Buckets() throws Exception {
+ s3Connection.removeBucketIfEmpty(new S3Bucket(csBucket));
+ s3Connection.removeBucketIfEmpty(new S3Bucket(cs2Bucket));
+ s3Connection = null;
+ }
+
+ protected CacheStore createCacheStore() throws CacheLoaderException {
+ return createAndStartCacheStore(csBucket);
+ }
+
+ protected CacheStore createAnotherCacheStore() throws CacheLoaderException {
+ return createAndStartCacheStore(cs2Bucket);
+ }
+
+ private CacheStore createAndStartCacheStore(String bucket) throws CacheLoaderException {
+ S3CacheStore cs = new S3CacheStore();
+ S3CacheStoreConfig cfg = new S3CacheStoreConfig();
+ cfg.setBucket(bucket);
+ cfg.setAwsAccessKey(accessKey);
+ cfg.setAwsSecretKey(secretKey);
+ cfg.setPurgeSynchronously(true); // for more accurate unit testing
+ cs.init(cfg, getCache(), getMarshaller(), s3Connection);
+ cs.start();
+ return cs;
+ }
+
+ /* Changes below are needed to support testing of multiple cache stores */
+
+ protected CacheStore cs2;
+
+ @BeforeMethod
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ cs2 = createAnotherCacheStore();
+ }
+
+
+ @AfterMethod
+ @Override
+ public void tearDown() throws CacheLoaderException {
+ super.tearDown();
+ if (cs2 != null) {
+ cs2.clear();
+ cs2.stop();
+ }
+ cs2 = null;
+ }
+
+
+ @Override
+ public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
+ cs.store(new StoredEntry("k1", "v1", -1, -1));
+ cs.store(new StoredEntry("k2", "v2", -1, -1));
+ cs.store(new StoredEntry("k3", "v3", -1, -1));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ cs.toStream(new UnclosableObjectOutputStream(oos));
+ oos.flush();
+ oos.close();
+ out.close();
+ cs2.clear();
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()));
+ cs2.fromStream(new UnclosableObjectInputStream(ois));
+
+ Set<StoredEntry> set = cs2.loadAll();
+
+ assert set.size() == 3;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+
+ /**
+ * Streams the contents of <tt>cs</tt> into <tt>cs2</tt> through a byte stream that carries unrelated bytes before
+ * and after the store's payload, and checks that those surrounding bytes are still readable and intact.
+ */
+ @Override
+ public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
+ cs.store(new StoredEntry("k1", "v1", -1, -1));
+ cs.store(new StoredEntry("k2", "v2", -1, -1));
+ cs.store(new StoredEntry("k3", "v3", -1, -1));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+ byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
+ out.write(dummyStartBytes);
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ cs.toStream(new UnclosableObjectOutputStream(oos));
+ oos.flush();
+ oos.close();
+ out.write(dummyEndBytes);
+ out.close();
+ cs2.clear();
+
+ // first pop the start bytes
+ byte[] dummy = new byte[8];
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ int bytesRead = in.read(dummy, 0, 8);
+ assert bytesRead == 8;
+ for (int i = 1; i < 9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
+ cs2.fromStream(new UnclosableObjectInputStream(new ObjectInputStream(in)));
+ // then the end bytes must follow the store's payload untouched
+ bytesRead = in.read(dummy, 0, 8);
+ assert bytesRead == 8;
+ for (int i = 8; i > 0; i--) assert dummy[8 - i] == i : "End byte stream corrupted!";
+
+ Set<StoredEntry> set = cs2.loadAll();
+
+ assert set.size() == 3;
+ Set<Object> expected = new HashSet<Object>();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+}
\ No newline at end of file
15 years, 9 months
JBoss Cache SVN: r7880 - core/branches/flat/src/main/java/org/horizon/commands/remote.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-07 13:58:23 -0500 (Sat, 07 Mar 2009)
New Revision: 7880
Modified:
core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
Log:
Lousy typo
Modified: core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java 2009-03-07 18:48:08 UTC (rev 7879)
+++ core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java 2009-03-07 18:58:23 UTC (rev 7880)
@@ -57,7 +57,7 @@
if (modifications != null && modifications.size() == 1) {
this.commands = new ReplicableCommand[]{modifications.get(0)};
} else {
- this.commands = new ReplicateCommand[modifications.size()];
+ this.commands = new ReplicableCommand[modifications.size()];
int i = 0;
for (ReplicableCommand rc : modifications) commands[i++] = rc;
}
15 years, 9 months
JBoss Cache SVN: r7879 - in core/branches/flat/src: main/java/org/horizon/commands/remote and 4 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-07 13:48:08 -0500 (Sat, 07 Mar 2009)
New Revision: 7879
Added:
core/branches/flat/src/test/java/org/horizon/profiling/ProfileTestSlave.java
Modified:
core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java
core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
core/branches/flat/src/test/java/org/horizon/profiling/AbstractProfileTest.java
core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java
Log:
Replication based perf improvements
Modified: core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java 2009-03-07 13:11:36 UTC (rev 7878)
+++ core/branches/flat/src/main/java/org/horizon/commands/RPCCommand.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -2,9 +2,6 @@
import org.horizon.interceptors.InterceptorChain;
-import java.util.Collection;
-import java.util.List;
-
/**
* The RPCManager only replicates commands wrapped in an RPCCommand. As a wrapper, an RPCCommand could contain a single
* {@link org.horizon.commands.ReplicableCommand} or a List of them.
@@ -34,23 +31,9 @@
*
* @return a list of all commands.
*/
- List<ReplicableCommand> getCommands();
+ ReplicableCommand[] getCommands();
/**
- * Adds a single command to the list of commands being wrapped
- *
- * @param command command to add
- */
- void addCommand(ReplicableCommand command);
-
- /**
- * Adds a collection of commands to the list of commands being wrapped
- *
- * @param commands commands to add
- */
- void addCommands(Collection<? extends ReplicableCommand> commands);
-
- /**
* @return the name of the cache that produced this command. This will also be the name of the cache this command is
* intended for.
*/
Modified: core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java 2009-03-07 13:11:36 UTC (rev 7878)
+++ core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -29,9 +29,7 @@
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
+import java.util.Arrays;
import java.util.List;
/**
@@ -52,20 +50,22 @@
private static final Log log = LogFactory.getLog(ReplicateCommand.class);
private static final boolean trace = log.isTraceEnabled();
- private List<ReplicableCommand> commands;
+ private ReplicableCommand[] commands;
private String cacheName;
public ReplicateCommand(List<ReplicableCommand> modifications, String cacheName) {
if (modifications != null && modifications.size() == 1) {
- this.commands = Collections.singletonList(modifications.get(0));
+ this.commands = new ReplicableCommand[]{modifications.get(0)};
} else {
- this.commands = modifications;
+ this.commands = new ReplicateCommand[modifications.size()];
+ int i = 0;
+ for (ReplicableCommand rc : modifications) commands[i++] = rc;
}
this.cacheName = cacheName;
}
public ReplicateCommand(ReplicableCommand command, String cacheName) {
- commands = Collections.singletonList(command);
+ commands = new ReplicableCommand[]{command};
this.cacheName = cacheName;
}
@@ -85,7 +85,7 @@
*/
public Object perform(InvocationContext ctx) throws Throwable {
if (isSingleCommand()) {
- return processCommand(ctx, commands.get(0));
+ return processCommand(ctx, commands[0]);
} else {
for (ReplicableCommand command : commands) processCommand(ctx, command);
return null;
@@ -134,54 +134,41 @@
return METHOD_ID;
}
- public List<ReplicableCommand> getCommands() {
+ public ReplicableCommand[] getCommands() {
return commands;
}
- public void addCommand(ReplicableCommand command) {
- if (commands == null) {
- commands = Collections.singletonList(command);
- } else {
- upgradeCommandsListIfNeeded();
- commands.add(command);
- }
- }
-
- public void addCommands(Collection<? extends ReplicableCommand> commands) {
- upgradeCommandsListIfNeeded();
- this.commands.addAll(commands);
- }
-
- private void upgradeCommandsListIfNeeded() {
- if (!(commands instanceof ArrayList)) {
- commands = new ArrayList<ReplicableCommand>(commands);
- }
- }
-
public String getCacheName() {
return cacheName;
}
public void setCacheName(String name) {
- this.cacheName = cacheName;
+ this.cacheName = name;
}
- public ReplicableCommand getSingleCommand() {
- return commands.get(0);
+ public final ReplicableCommand getSingleCommand() {
+ return commands == null ? null : commands[0];
}
public Object[] getParameters() {
- return new Object[]{cacheName, commands};
+ int numCommands = commands == null ? 0 : commands.length;
+ Object[] retval = new Object[numCommands + 2];
+ retval[0] = cacheName;
+ retval[1] = numCommands;
+ if (numCommands > 0) System.arraycopy(commands, 0, retval, 2, numCommands);
+ return retval;
}
@SuppressWarnings("unchecked")
public void setParameters(int commandId, Object[] args) {
cacheName = (String) args[0];
- commands = (List<ReplicableCommand>) args[1];
+ int numCommands = (Integer) args[1];
+ commands = new ReplicableCommand[numCommands];
+ System.arraycopy(args, 2, commands, 0, numCommands);
}
- public boolean isSingleCommand() {
- return commands != null && commands.size() == 1;
+ public final boolean isSingleCommand() {
+ return commands != null && commands.length == 1;
}
@Override
@@ -208,18 +195,13 @@
ReplicateCommand clone;
clone = new ReplicateCommand();
clone.interceptorChain = interceptorChain;
- if (commands != null) {
- if (commands.size() == 1)
- clone.commands = Collections.singletonList(commands.get(0));
- else
- clone.commands = new ArrayList<ReplicableCommand>(commands);
- }
+ if (commands != null) clone.commands = commands.clone();
return clone;
}
public boolean containsCommandType(Class<? extends ReplicableCommand> aClass) {
- if (commands.size() == 1) {
- return commands.get(0).getClass().equals(aClass);
+ if (commands.length == 1) {
+ return commands[0].getClass().equals(aClass);
} else {
for (ReplicableCommand command : getCommands()) {
if (command.getClass().equals(aClass)) return true;
@@ -231,7 +213,7 @@
@Override
public String toString() {
return "ReplicateCommand{" +
- "commands=" + commands +
+ "commands=" + (commands == null ? "null" : Arrays.toString(commands)) +
", cacheName='" + cacheName + '\'' +
'}';
}
Modified: core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-03-07 13:11:36 UTC (rev 7878)
+++ core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -12,6 +12,7 @@
import org.horizon.logging.LogFactory;
import org.horizon.notifications.cachemanagerlistener.CacheManagerNotifier;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -22,6 +23,9 @@
*/
public class ComponentRegistry extends AbstractComponentRegistry {
+ // cached component scopes
+ static final Map<Class, Scopes> componentScopeLookup = new HashMap<Class, Scopes>();
+
GlobalComponentRegistry globalComponents;
String cacheName;
Log log = LogFactory.getLog(ComponentRegistry.class);
@@ -62,35 +66,51 @@
}
@Override
- public <T> T getComponent(Class<T> componentType, String name) {
+ public final <T> T getComponent(Class<T> componentType, String name) {
if (isGlobal(componentType)) {
return globalComponents.getComponent(componentType, name);
} else {
- return super.getComponent(componentType, name);
+ return getLocalComponent(componentType, name);
}
}
+ @SuppressWarnings("unchecked")
+ public final <T> T getLocalComponent(Class<T> componentType, String name) {
+ Component wrapper = lookupLocalComponent(componentType, name);
+ if (wrapper == null) return null;
+
+ return (T) (wrapper.instance == NULL_COMPONENT ? null : wrapper.instance);
+ }
+
+ public final <T> T getLocalComponent(Class<T> componentType) {
+ return getLocalComponent(componentType, componentType.getName());
+ }
+
@Override
- protected Map<Class, Class<? extends AbstractComponentFactory>> getDefaultFactoryMap() {
+ protected final Map<Class, Class<? extends AbstractComponentFactory>> getDefaultFactoryMap() {
// delegate to parent. No sense maintaining multiple copies of this map.
return globalComponents.getDefaultFactoryMap();
}
@Override
- protected Component lookupComponent(Class componentClass, String name) {
+ protected final Component lookupComponent(Class componentClass, String name) {
if (isGlobal(componentClass)) {
return globalComponents.lookupComponent(componentClass, name);
} else {
- return super.lookupComponent(componentClass, name);
+ return lookupLocalComponent(componentClass, name);
}
}
- public GlobalComponentRegistry getGlobalComponentRegistry() {
+ protected final Component lookupLocalComponent(Class componentClass, String name) {
+ return super.lookupComponent(componentClass, name);
+ }
+
+ public final GlobalComponentRegistry getGlobalComponentRegistry() {
return globalComponents;
}
@Override
- public void registerComponent(Object component, String name) {
+ public final void registerComponent(Object component, String name) {
if (isGlobal(component.getClass())) {
globalComponents.registerComponent(component, name);
} else {
@@ -99,7 +119,12 @@
}
private boolean isGlobal(Class clazz) {
- Scopes componentScope = ScopeDetector.detectScope(clazz);
+ Scopes componentScope = componentScopeLookup.get(clazz);
+ if (componentScope == null) {
+ componentScope = ScopeDetector.detectScope(clazz);
+ componentScopeLookup.put(clazz, componentScope);
+ }
+
return componentScope == Scopes.GLOBAL;
}
Modified: core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java 2009-03-07 13:11:36 UTC (rev 7878)
+++ core/branches/flat/src/main/java/org/horizon/factories/GlobalComponentRegistry.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -13,6 +13,8 @@
import javax.management.MBeanServerFactory;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
/**
* A global component registry where shared components are stored.
@@ -96,24 +98,22 @@
}
}
- public ComponentRegistry getNamedComponentRegistry(String name) {
- return getComponent(ComponentRegistry.class, NAMED_REGISTRY_PREFIX + name);
+ Map<String, ComponentRegistry> namedComponents = new HashMap<String, ComponentRegistry>();
+
+ public final ComponentRegistry getNamedComponentRegistry(String name) {
+ return namedComponents.get(name);
}
- public void registerNamedComponentRegistry(ComponentRegistry componentRegistry, String name) {
- registerComponent(componentRegistry, NAMED_REGISTRY_PREFIX + name);
+ public final void registerNamedComponentRegistry(ComponentRegistry componentRegistry, String name) {
+ namedComponents.put(name, componentRegistry);
}
- public void unregisterNamedComponentRegistry(String name) {
- componentLookup.remove(NAMED_REGISTRY_PREFIX + name);
+ public final void unregisterNamedComponentRegistry(String name) {
+ namedComponents.remove(name);
}
- public void rewireNamedRegistries() {
- for (String name : componentLookup.keySet()) {
- if (name.startsWith(NAMED_REGISTRY_PREFIX)) {
- Component c = componentLookup.get(name);
- ((ComponentRegistry) c.instance).rewire();
- }
- }
+ public final void rewireNamedRegistries() {
+ for (ComponentRegistry cr : namedComponents.values())
+ cr.rewire();
}
}
Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-07 13:11:36 UTC (rev 7878)
+++ core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -198,9 +198,8 @@
marshallObject(le.getTransaction(), out, refMap);
marshallObject(le.getModifications(), out, refMap);
} else if (o instanceof Serializable) {
- if (trace) {
- log.trace("Warning: using object serialization for " + o.getClass());
- }
+ if (trace) log.trace("WARNING: using object serialization for [{0}]", o.getClass());
+
out.writeByte(MAGICNUMBER_SERIALIZABLE);
if (useRefs) writeReference(out, createReference(o, refMap));
out.writeObject(o);
Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-03-07 13:11:36 UTC (rev 7878)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -47,9 +47,9 @@
throw new IllegalStateException("Cache named " + cacheName + " exists but isn't in a state to handle invocations. Its state is " + cr.getStatus());
}
- InterceptorChain ic = cr.getComponent(InterceptorChain.class);
- InvocationContextContainer icc = cr.getComponent(InvocationContextContainer.class);
- CommandsFactory commandsFactory = cr.getComponent(CommandsFactory.class);
+ InterceptorChain ic = cr.getLocalComponent(InterceptorChain.class);
+ InvocationContextContainer icc = cr.getLocalComponent(InvocationContextContainer.class);
+ CommandsFactory commandsFactory = cr.getLocalComponent(CommandsFactory.class);
cmd.setInterceptorChain(ic);
// initialize this command with components specific to the intended cache instance
Modified: core/branches/flat/src/test/java/org/horizon/profiling/AbstractProfileTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/profiling/AbstractProfileTest.java 2009-03-07 13:11:36 UTC (rev 7878)
+++ core/branches/flat/src/test/java/org/horizon/profiling/AbstractProfileTest.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -1,6 +1,7 @@
package org.horizon.profiling;
import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
import org.horizon.manager.CacheManager;
import org.horizon.manager.DefaultCacheManager;
import org.horizon.test.SingleCacheManagerTest;
@@ -9,11 +10,18 @@
@Test(groups = "profiling", enabled = false, testName = "profiling.SingleCacheManagerTest")
public abstract class AbstractProfileTest extends SingleCacheManagerTest {
+ protected static final String LOCAL_CACHE_NAME = "local";
+ protected static final String REPL_SYNC_CACHE_NAME = "repl_sync";
+
protected CacheManager createCacheManager() throws Exception {
Configuration cfg = new Configuration();
cfg.setConcurrencyLevel(2000);
- cacheManager = new DefaultCacheManager(cfg);
- cache = cacheManager.getCache();
+ cacheManager = new DefaultCacheManager(GlobalConfiguration.getClusteredDefault());
+ cacheManager.defineCache(LOCAL_CACHE_NAME, cfg);
+ Configuration replCfg = cfg.clone();
+ replCfg.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ replCfg.setFetchInMemoryState(false);
+ cacheManager.defineCache(REPL_SYNC_CACHE_NAME, replCfg);
return cacheManager;
}
}
Modified: core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java 2009-03-07 13:11:36 UTC (rev 7878)
+++ core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -21,7 +21,7 @@
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
*/
-@Test(groups = "profiling", enabled = true, testName = "profiling.ProfileTest")
+@Test(groups = "profiling", enabled = false, testName = "profiling.ProfileTest")
public class ProfileTest extends AbstractProfileTest {
/*
Test configuration options
@@ -35,12 +35,18 @@
private List<Object> keys = new ArrayList<Object>(MAX_OVERALL_KEYS);
- @Test(enabled = true)
+ @Test(enabled = false)
public void testLocalMode() throws Exception {
- runCompleteTest();
+ runCompleteTest(LOCAL_CACHE_NAME);
}
- private void runCompleteTest() throws Exception {
+ @Test(enabled = false)
+ public void testReplMode() throws Exception {
+ runCompleteTest(REPL_SYNC_CACHE_NAME);
+ }
+
+ private void runCompleteTest(String cacheName) throws Exception {
+ cache = cacheManager.getCache(cacheName);
init();
startup();
warmup();
Added: core/branches/flat/src/test/java/org/horizon/profiling/ProfileTestSlave.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/profiling/ProfileTestSlave.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/profiling/ProfileTestSlave.java 2009-03-07 18:48:08 UTC (rev 7879)
@@ -0,0 +1,13 @@
+package org.horizon.profiling;
+
+import org.testng.annotations.Test;
+
+@Test(groups = "profiling", enabled = false, testName = "profiling.ProfileTestSlave")
+public class ProfileTestSlave extends AbstractProfileTest {
+ @Test(enabled = true)
+ public void testReplMode() throws Exception {
+ cache = cacheManager.getCache(REPL_SYNC_CACHE_NAME);
+ System.out.println("Waiting for test completion. Hit any key when done.");
+ System.in.read();
+ }
+}
15 years, 9 months
JBoss Cache SVN: r7878 - benchmarks/benchmark-fwk/trunk/cache-products/Horizon-1.0.0/lib.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-07 08:11:36 -0500 (Sat, 07 Mar 2009)
New Revision: 7878
Modified:
benchmarks/benchmark-fwk/trunk/cache-products/Horizon-1.0.0/lib/horizon.jar
Log:
Modified: benchmarks/benchmark-fwk/trunk/cache-products/Horizon-1.0.0/lib/horizon.jar
===================================================================
(Binary files differ)
15 years, 9 months
JBoss Cache SVN: r7877 - core/branches/flat/src/main/java/org/horizon/container.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-07 08:05:59 -0500 (Sat, 07 Mar 2009)
New Revision: 7877
Modified:
core/branches/flat/src/main/java/org/horizon/container/CachedValue.java
Log:
Assertions to make sure certain methods aren't called
Modified: core/branches/flat/src/main/java/org/horizon/container/CachedValue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/CachedValue.java 2009-03-07 12:56:38 UTC (rev 7876)
+++ core/branches/flat/src/main/java/org/horizon/container/CachedValue.java 2009-03-07 13:05:59 UTC (rev 7877)
@@ -37,9 +37,6 @@
return false;
}
- public void copyForUpdate(DataContainer container, boolean writeSkewCheck) {
- }
-
public void commitUpdate(DataContainer container) {
}
@@ -55,6 +52,7 @@
}
public void setCreated(boolean created) {
+ throw new UnsupportedOperationException();
}
public final boolean isDeleted() {
@@ -62,6 +60,7 @@
}
public void setDeleted(boolean deleted) {
+ throw new UnsupportedOperationException();
}
public final boolean isValid() {
@@ -69,6 +68,7 @@
}
public void setValid(boolean valid) {
+ throw new UnsupportedOperationException();
}
public long getLifespan() {
@@ -76,5 +76,6 @@
}
public void setLifespan(long lifespan) {
+ throw new UnsupportedOperationException();
}
}
\ No newline at end of file
15 years, 9 months
JBoss Cache SVN: r7876 - in core/branches/flat/src: main/java/org/horizon/factories and 1 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-07 07:56:38 -0500 (Sat, 07 Mar 2009)
New Revision: 7876
Added:
core/branches/flat/src/main/java/org/horizon/container/UpdateableEntry.java
Modified:
core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java
core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java
core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
Log:
MVCC fixes
Modified: core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java 2009-03-07 12:41:42 UTC (rev 7875)
+++ core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java 2009-03-07 12:56:38 UTC (rev 7876)
@@ -30,8 +30,6 @@
public interface MVCCEntry {
boolean isNullEntry();
- void copyForUpdate(DataContainer container, boolean writeSkewCheck);
-
void commitUpdate(DataContainer container);
void rollbackUpdate();
Modified: core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java 2009-03-07 12:41:42 UTC (rev 7875)
+++ core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java 2009-03-07 12:56:38 UTC (rev 7876)
@@ -32,7 +32,7 @@
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
* @since 1.0
*/
-public class ReadCommittedEntry implements MVCCEntry {
+public class ReadCommittedEntry implements UpdateableEntry {
private static final Log log = LogFactory.getLog(ReadCommittedEntry.class);
private static final boolean trace = log.isTraceEnabled();
Added: core/branches/flat/src/main/java/org/horizon/container/UpdateableEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/UpdateableEntry.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/container/UpdateableEntry.java 2009-03-07 12:56:38 UTC (rev 7876)
@@ -0,0 +1,10 @@
+package org.horizon.container;
+
+/**
+ * An {@link MVCCEntry} that can also be copied for update, i.e. a read-write entry as opposed to a read-only one.
+ *
+ * @author Manik Surtani
+ */
+public interface UpdateableEntry extends MVCCEntry {
+ void copyForUpdate(DataContainer container, boolean writeSkewCheck);
+}
Modified: core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java 2009-03-07 12:41:42 UTC (rev 7875)
+++ core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java 2009-03-07 12:56:38 UTC (rev 7876)
@@ -28,6 +28,7 @@
import org.horizon.container.NullMarkerEntry;
import org.horizon.container.ReadCommittedEntry;
import org.horizon.container.RepeatableReadEntry;
+import org.horizon.container.UpdateableEntry;
import org.horizon.context.InvocationContext;
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.Start;
@@ -65,7 +66,7 @@
writeSkewCheck = configuration.isWriteSkewCheck();
}
- public final MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert, long lifespan) {
+ public final UpdateableEntry createWrappedEntry(Object key, Object value, boolean isForInsert, long lifespan) {
if (value == null && !isForInsert) return useRepeatableRead ? NULL_MARKER : null;
return useRepeatableRead ? new RepeatableReadEntry(key, value, lifespan) : new ReadCommittedEntry(key, value, lifespan);
@@ -106,8 +107,18 @@
{
// acquire lock if needed
if (alreadyLocked || acquireLock(ctx, key)) {
+ UpdateableEntry ue;
+ if (mvccEntry instanceof UpdateableEntry) {
+ ue = (UpdateableEntry) mvccEntry;
+ } else {
+ // this is a read-only entry that needs to be copied to a proper read-write entry!!
+ ue = createWrappedEntry(key, mvccEntry.getValue(), false, mvccEntry.getLifespan());
+ mvccEntry = ue;
+ ctx.putLookedUpEntry(key, mvccEntry);
+ }
+
// create a copy of the underlying entry
- mvccEntry.copyForUpdate(container, writeSkewCheck);
+ ue.copyForUpdate(container, writeSkewCheck);
}
if (trace) log.trace("Exists in context.");
if (mvccEntry.isDeleted() && createIfAbsent) {
@@ -123,20 +134,22 @@
// exists in cache! Just acquire lock if needed, and wrap.
// do we need a lock?
boolean needToCopy = alreadyLocked || acquireLock(ctx, key) || ctx.hasOption(Options.SKIP_LOCKING); // even if we do not acquire a lock, if skip-locking is enabled we should copy
- mvccEntry = createWrappedEntry(key, cachedValue.getValue(), false, cachedValue.getLifespan());
- ctx.putLookedUpEntry(key, mvccEntry);
- if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
+ UpdateableEntry ue = createWrappedEntry(key, cachedValue.getValue(), false, cachedValue.getLifespan());
+ ctx.putLookedUpEntry(key, ue);
+ if (needToCopy) ue.copyForUpdate(container, writeSkewCheck);
+ mvccEntry = ue;
} else if (createIfAbsent) {
// this is the *only* point where new entries can be created!!
if (trace) log.trace("Creating new entry.");
// now to lock and create the entry. Lock first to prevent concurrent creation!
if (!alreadyLocked) acquireLock(ctx, key);
notifier.notifyCacheEntryCreated(key, true, ctx);
- mvccEntry = createWrappedEntry(key, null, true, -1);
- mvccEntry.setCreated(true);
- ctx.putLookedUpEntry(key, mvccEntry);
- mvccEntry.copyForUpdate(container, writeSkewCheck);
+ UpdateableEntry ue = createWrappedEntry(key, null, true, -1);
+ ue.setCreated(true);
+ ctx.putLookedUpEntry(key, ue);
+ ue.copyForUpdate(container, writeSkewCheck);
notifier.notifyCacheEntryCreated(key, false, ctx);
+ mvccEntry = ue;
}
}
Modified: core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java 2009-03-07 12:41:42 UTC (rev 7875)
+++ core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java 2009-03-07 12:56:38 UTC (rev 7876)
@@ -118,7 +118,7 @@
assertInCacheAndStore("k" + i, "v" + i, lifespan);
}
- cache.remove("k1", "some rubbish");
+ assert !cache.remove("k1", "some rubbish");
for (int i = 1; i < 8; i++) {
// even numbers have lifespans
15 years, 9 months
JBoss Cache SVN: r7875 - in core/branches/flat/src: test/java/org/horizon/loader and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-07 07:41:42 -0500 (Sat, 07 Mar 2009)
New Revision: 7875
Added:
core/branches/flat/src/test/java/org/horizon/loader/ConcurrentLoadAndEvictTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java
core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java
Log:
Fixed load and concurrent evict bug, added test
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-03-07 12:27:52 UTC (rev 7874)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-03-07 12:41:42 UTC (rev 7875)
@@ -105,31 +105,31 @@
}
private void loadIfNeeded(InvocationContext ctx, Object key) throws Throwable {
+ // first check if the container contains the key we need. Try and load this into the context.
+ MVCCEntry e = entryFactory.wrapEntryForReading(ctx, key);
+ if (e == null || e.isNullEntry()) {
- // TODO this needs re-ordering. Checking the loader first is too expensive for cache hits if we already have this in memory.
- // It is better to check in memory first, but we must also guard against concurrent eviction and cache lookups
- // since the loader may skip loading for a key in memory but the key is evicted before it is read by the reader,
- // returning incorrect results. TODO: Write a test for this.
- // A potential solution is to test for the entry in memory, and then store it in context if available. Else, check
- // if it is in the loader.
+ // we *may* need to load this.
+ if (!loader.containsKey(key)) {
+ log.trace("No need to load. Key doesn't exist in the loader.");
+ return;
+ }
- if (!loader.containsKey(key)) {
- log.trace("No need to load. Key doesn't exist in the loader.");
- return;
- }
+ // Obtain a temporary lock to verify the key is not being concurrently added
+ boolean keyLocked = entryFactory.acquireLock(ctx, key);
- // Obtain a temporary lock to verify the key is not being concurrently added
- boolean keyLocked = entryFactory.acquireLock(ctx, key);
- if (dataContainer.containsKey(key)) {
- if (keyLocked) entryFactory.releaseLock(key);
- log.trace("No need to load. Key exists in the data container.");
- return;
+ // check again, in case there is a concurrent addition
+ if (dataContainer.containsKey(key)) {
+ if (keyLocked) entryFactory.releaseLock(key);
+ log.trace("No need to load. Key exists in the data container.");
+ return;
+ }
+
+ // Reuse the lock and create a new entry for loading
+ MVCCEntry n = entryFactory.wrapEntryForWriting(ctx, key, true, false, keyLocked);
+ n = loadEntry(ctx, key, n);
+ ctx.setContainsModifications(true);
}
-
- // Reuse the lock and create a new entry for loading
- MVCCEntry n = entryFactory.wrapEntryForWriting(ctx, key, true, false, keyLocked);
- n = loadEntry(ctx, key, n);
- ctx.setContainsModifications(true);
}
/**
Added: core/branches/flat/src/test/java/org/horizon/loader/ConcurrentLoadAndEvictTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/ConcurrentLoadAndEvictTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/ConcurrentLoadAndEvictTest.java 2009-03-07 12:41:42 UTC (rev 7875)
@@ -0,0 +1,118 @@
+package org.horizon.loader;
+
+import org.horizon.commands.read.GetKeyValueCommand;
+import org.horizon.commands.write.EvictCommand;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.config.Configuration;
+import org.horizon.config.CustomInterceptorConfig;
+import org.horizon.container.DataContainer;
+import org.horizon.context.InvocationContext;
+import org.horizon.interceptors.CacheLoaderInterceptor;
+import org.horizon.interceptors.base.CommandInterceptor;
+import org.horizon.loader.dummy.DummyInMemoryCacheStore;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
+import org.horizon.test.SingleCacheManagerTest;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests a thread going past the cache loader interceptor and the interceptor deciding that loading is not necessary,
+ * then another thread rushing ahead and evicting the entry from memory.
+ *
+ * @author Manik Surtani
+ */
+@Test(groups = "functional", testName = "loader.ConcurrentLoadAndEvictTest")
+public class ConcurrentLoadAndEvictTest extends SingleCacheManagerTest {
+ SlowDownInterceptor sdi;
+
+ protected CacheManager createCacheManager() throws Exception {
+ Configuration config = new Configuration();
+ // we need a loader:
+ CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+ config.setCacheLoaderManagerConfig(clmc);
+ clmc.addCacheLoaderConfig(new DummyInMemoryCacheStore.Cfg());
+
+ // we also need a custom interceptor to intercept get() calls after the CLI, to slow it down so an evict goes
+ // through first
+
+ sdi = new SlowDownInterceptor();
+ CustomInterceptorConfig cic = new CustomInterceptorConfig(sdi);
+ cic.setAfterInterceptor(CacheLoaderInterceptor.class);
+ config.setCustomInterceptors(Collections.singletonList(cic));
+ return new DefaultCacheManager(config);
+ }
+
+ public void testEvictBeforeRead() throws CacheLoaderException, ExecutionException, InterruptedException {
+ cache = cacheManager.getCache();
+ cache.put("a", "b");
+ assert cache.get("a").equals("b");
+ CacheLoader cl = TestingUtil.getCacheLoader(cache);
+ assert cl != null;
+ StoredEntry se = cl.load("a");
+ assert se != null;
+ assert se.getValue().equals("b");
+
+ // now attempt a concurrent get and evict.
+ ExecutorService e = Executors.newFixedThreadPool(1);
+ sdi.enabled = true;
+
+ // call the get
+ Future<String> future = e.submit(new Callable<String>() {
+ public String call() throws Exception {
+ return (String) cache.get("a");
+ }
+ });
+
+ // now run the evict.
+ cache.evict("a");
+
+ // make sure the get call, which would have gone past the cache loader interceptor first, gets the correct value.
+ assert future.get().equals("b");
+
+ // disable the SlowDownInterceptor
+ sdi.enabled = false;
+
+ // and check that the key actually has been evicted
+ assert !TestingUtil.extractComponent(cache, DataContainer.class).containsKey("a");
+ }
+
+ public static class SlowDownInterceptor extends CommandInterceptor {
+ volatile boolean enabled = false;
+ CountDownLatch getLatch = new CountDownLatch(1);
+ CountDownLatch evictLatch = new CountDownLatch(1);
+
+ @Override
+ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
+ if (enabled) {
+ getLatch.countDown();
+ if (!evictLatch.await(60000, TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Didn't see evict after 60 seconds!");
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable {
+ if (enabled) {
+ if (!getLatch.await(60000, TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Didn't see get after 60 seconds!");
+ }
+ try {
+ return invokeNextInterceptor(ctx, command);
+ } finally {
+ if (enabled) evictLatch.countDown();
+ }
+ }
+ }
+}
Modified: core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java 2009-03-07 12:27:52 UTC (rev 7874)
+++ core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java 2009-03-07 12:41:42 UTC (rev 7875)
@@ -21,7 +21,7 @@
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
*/
-@Test(groups = "profiling", enabled = false, testName = "profiling.ProfileTest")
+@Test(groups = "profiling", enabled = true, testName = "profiling.ProfileTest")
public class ProfileTest extends AbstractProfileTest {
/*
Test configuration options
@@ -35,7 +35,7 @@
private List<Object> keys = new ArrayList<Object>(MAX_OVERALL_KEYS);
- @Test(enabled = false)
+ @Test(enabled = true)
public void testLocalMode() throws Exception {
runCompleteTest();
}
Modified: core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java 2009-03-07 12:27:52 UTC (rev 7874)
+++ core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java 2009-03-07 12:41:42 UTC (rev 7875)
@@ -10,14 +10,16 @@
import org.horizon.AdvancedCache;
import org.horizon.Cache;
import org.horizon.CacheDelegate;
-import org.horizon.config.GlobalConfiguration;
import org.horizon.commands.CommandsFactory;
import org.horizon.commands.VisitableCommand;
+import org.horizon.config.GlobalConfiguration;
import org.horizon.factories.ComponentRegistry;
import org.horizon.factories.GlobalComponentRegistry;
import org.horizon.interceptors.InterceptorChain;
import org.horizon.interceptors.base.CommandInterceptor;
import org.horizon.lifecycle.ComponentStatus;
+import org.horizon.loader.CacheLoader;
+import org.horizon.loader.CacheLoaderManager;
import org.horizon.lock.LockManager;
import org.horizon.manager.CacheManager;
import org.horizon.manager.DefaultCacheManager;
@@ -31,8 +33,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
-import java.util.Properties;
public class TestingUtil {
private static Random random = new Random();
@@ -125,11 +127,11 @@
}
/**
- * Waits for the given memebrs to be removed from the cluster. The difference between this and
- * {@link #blockUntilViewsReceived(long, org.horizon.manager.CacheManager[])} methods(s) is that it does not barf if
- * more than expected memebers is in the cluster - this is because we expect to start with a grater number fo memebers
- * than we eventually expect. It will barf though, if the number of members is not the one expected but only after the
- * timeout expieres.
+ * Waits for the given members to be removed from the cluster. The difference between this and the {@link
+ * #blockUntilViewsReceived(long, org.horizon.manager.CacheManager[])} method(s) is that it does not barf if more
+ * members than expected are in the cluster - this is because we expect to start with a greater number of members
+ * than we eventually expect. It will barf though, if the number of members is not the one expected, but only after
+ * the timeout expires.
*/
public static void blockForMemberToFail(long timeout, CacheManager... cacheManagers) {
blockUntilViewsReceived(timeout, false, cacheManagers);
@@ -623,7 +625,7 @@
Properties newTransportProps = new Properties();
newTransportProps.put(JGroupsTransport.CONFIGURATION_STRING, JGroupsConfigBuilder.getJGroupsConfig());
globalConfiguration.setTransportProperties(newTransportProps);
- return new DefaultCacheManager(globalConfiguration);
+ return new DefaultCacheManager(globalConfiguration);
}
public static CacheManager createLocalCacheManager() {
@@ -637,4 +639,13 @@
globalConfiguration.setTransportProperties(newTransportProps);
return new DefaultCacheManager(globalConfiguration);
}
+
+ public static CacheLoader getCacheLoader(Cache cache) {
+ CacheLoaderManager clm = extractComponent(cache, CacheLoaderManager.class);
+ if (clm != null && clm.isEnabled()) {
+ return clm.getCacheLoader();
+ } else {
+ return null;
+ }
+ }
}
15 years, 9 months