Author: adriancole
Date: 2009-03-01 19:43:57 -0500 (Sun, 01 Mar 2009)
New Revision: 7814
Added:
core/branches/flat/src/main/java/org/horizon/loader/bdbje/
core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java
core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java
Log:
added bdbje loader support
Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,706 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.serial.SerialBinding;
+import com.sleepycat.bind.serial.StoredClassCatalog;
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.StoredMap;
+import com.sleepycat.je.*;
+import com.sleepycat.util.ExceptionUnwrapper;
+import org.horizon.Cache;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+import org.horizon.util.ReflectionUtil;
+import org.horizon.util.concurrent.WithinThreadExecutor;
+
+import java.io.File;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * An Oracle SleepyCat JE implementation of a {@link org.horizon.loader.CacheStore}.
+ * <p/>
+ * This implementation uses two databases:
+ * <ol>
+ * <li>stored entry database: <tt>/{location}/CacheInstance-{@link org.horizon.Cache#getName() name}</tt>.
+ * {@link StoredEntry stored entries} are stored here, keyed on
+ * {@link org.horizon.loader.StoredEntry#getKey()}.</li>
+ * <li>class catalog database:
+ * <tt>/{location}/CacheInstance-{@link org.horizon.Cache#getName() name}_class_catalog</tt>.
+ * Class descriptions are stored here for efficiency reasons.</li>
+ * </ol>
+ * <p/>
+ * All data access is transactional. Any attempted reads to locked records will block. The maximum duration of
+ * this is set in microseconds via the parameter
+ * {@link org.horizon.loader.bdbje.BdbjeCacheStoreConfig#getLockAcquistionTimeoutMicros()}.
+ * Calls to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction, boolean) prepare} will
+ * attempt to resolve deadlocks, retrying up to
+ * {@link org.horizon.loader.bdbje.BdbjeCacheStoreConfig#getMaxTxRetries()} attempts.
+ * <p/>
+ * Unlike the C version of SleepyCat, JE does not support MVCC or READ_COMMITTED isolation. In other words,
+ * readers will block on any data held by a pending transaction. As such, it is best practice to keep the
+ * duration between <code>prepare</code> and <code>commit</code> as short as possible.
+ * <p/>
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class BdbjeCacheStore implements CacheStore {
+
+ private static final Log log = LogFactory.getLog(BdbjeCacheStore.class);
+
+ private BdbjeCacheStoreConfig cfg;
+ private Cache cache;
+ private Marshaller m;
+
+ private Environment env;
+ private String cacheDbName;
+ private String catalogDbName;
+ private StoredClassCatalog catalog;
+ private Database cacheDb;
+ private StoredMap<Object, StoredEntry> cacheMap;
+
+ private PreparableTransactionRunner transactionRunner;
+ private Map<javax.transaction.Transaction, Transaction> txnMap;
+ private ExecutorService purgerService;
+ private CurrentTransaction currentTransaction;
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * Prints the Berkeley DB license notice and then records the collaborators handed in by the caller.
+  * <code>config</code> is cast to {@link BdbjeCacheStoreConfig}, so any other configuration type fails with a
+  * {@link ClassCastException}.
+  *
+  * @see BdbjeCacheStoreConfig
+  */
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+    log.trace("initializing BdbjeCacheStore");
+    printLicense();
+    final BdbjeCacheStoreConfig bdbjeConfig = (BdbjeCacheStoreConfig) config;
+    this.cfg = bdbjeConfig;
+    this.cache = cache;
+    this.m = m;
+ }
+
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return the {@link BdbjeCacheStoreConfig} class object
+  */
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return BdbjeCacheStoreConfig.class;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * Validates configuration, configures and opens the {@link Environment}, then
+  * {@link org.horizon.loader.bdbje.BdbjeCacheStore#openDatabases() opens the databases}. When this is finished,
+  * transactional services are instantiated.
+  * <p/>
+  * The configured location has the form <tt>directory[#databaseName]</tt>. When no database name follows the
+  * <tt>#</tt>, the cache name is used, falling back to <tt>CacheInstance-{identityHashCode}</tt> if the cache
+  * is unnamed. The directory is created if it does not exist.
+  *
+  * @throws CacheLoaderException  if the location cannot be created, is not a directory, or the environment or
+  *                               databases cannot be opened
+  * @throws IllegalStateException if the store is already started or no cache was supplied to
+  *                               {@link #init(CacheLoaderConfig, Cache, Marshaller)}
+  */
+ public void start() throws CacheLoaderException {
+    log.trace("starting BdbjeCacheStore");
+    checkNotOpen();
+
+    if (cache == null) {
+       throw new IllegalStateException(
+             "A non-null Cache property (CacheSPI object) is required");
+    }
+
+    String configStr = cfg.getLocation();
+
+    // purging either runs inline on the calling thread or on a dedicated background thread
+    if (cfg.isPurgeSynchronously()) {
+       purgerService = new WithinThreadExecutor();
+    } else {
+       purgerService = Executors.newSingleThreadExecutor();
+    }
+
+    // JBCACHE-1448 db name parsing fix courtesy of Ciro Cavani
+    /* Parse config string. */
+    int offset = configStr.indexOf('#');
+    if (offset >= 0 && offset < configStr.length() - 1) {
+       cacheDbName = configStr.substring(offset + 1);
+       configStr = configStr.substring(0, offset);
+    } else {
+       cacheDbName = cache.getName();
+       if (cacheDbName == null) cacheDbName = "CacheInstance-" + System.identityHashCode(cache);
+    }
+
+    // datafile location
+    File location = new File(configStr);
+    if (!location.exists()) {
+       boolean created = location.mkdirs();
+       if (!created) throw new CacheLoaderException("Unable to create cache loader location " + location);
+
+    }
+    if (!location.isDirectory()) {
+       throw new CacheLoaderException("Cache loader location [" + location + "] is not a directory!");
+    }
+
+    catalogDbName = cacheDbName + "_class_catalog";
+
+    try {
+       /* Open the environment, creating it if it doesn't exist. */
+       EnvironmentConfig envConfig = new EnvironmentConfig();
+       envConfig.setAllowCreate(true);
+       envConfig.setTransactional(true);
+       envConfig.setLockTimeout(cfg.getLockAcquistionTimeoutMicros());
+       if (log.isDebugEnabled()) {
+          // extra lock diagnostics are only worth their cost when debug logging is on
+          envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
+          envConfig.setConfigParam("je.txn.dumpLocks", "true");
+       }
+       log.trace("creating je environment with home dir {0}", location);
+       env = new Environment(location, envConfig);
+       log.debug("created je environment {0} for cache store {1}", env, this);
+       /* Open cache and catalog databases. */
+       openDatabases();
+    }
+    catch (DatabaseException e) {
+       throw new CacheLoaderException("could not open the sleepycat database", e);
+    }
+    txnMap = new ConcurrentHashMap<javax.transaction.Transaction, Transaction>();
+    currentTransaction = CurrentTransaction.getInstance(env);
+    transactionRunner = new PreparableTransactionRunner(env);
+    transactionRunner.setMaxRetries(cfg.getMaxTxRetries());
+    // only one argument is supplied, so the placeholder index must be 0 (it was {1})
+    log.debug("started cache store {0}", this);
+ }
+
+
+ /**
+  * Opens (creating if necessary) the stored entry database and the class catalog database, then builds the
+  * {@link StoredMap} view over them via
+  * {@link BdbjeCacheStore#createStoredMapViewOfDatabase(com.sleepycat.je.Database,
+  * com.sleepycat.bind.serial.StoredClassCatalog)}.
+  *
+  * @throws DatabaseException if either database or the stored map cannot be opened
+  */
+ private void openDatabases() throws DatabaseException {
+    log.trace("opening databases");
+    /* Use a generic database config, with no duplicates allowed. */
+    final DatabaseConfig sharedConfig = new DatabaseConfig();
+    sharedConfig.setAllowCreate(true);
+    sharedConfig.setTransactional(true);
+
+    log.trace("opening or creating stored entry database {0}", cacheDbName);
+    cacheDb = env.openDatabase(null, cacheDbName, sharedConfig);
+    log.debug("opened stored entry database {0}", cacheDbName);
+
+    log.trace("opening or creating class catalog database {0}", catalogDbName);
+    final Database classCatalogDb = env.openDatabase(null, catalogDbName, sharedConfig);
+    catalog = new StoredClassCatalog(classCatalogDb);
+    log.debug("created stored class catalog from database {0}", catalogDbName);
+
+    cacheMap = createStoredMapViewOfDatabase(cacheDb, catalog);
+ }
+
+ /**
+  * Creates a {@link StoredMap} persisted by the <code>database</code>. Keys are serialized as
+  * {@link Object} and values as {@link StoredEntry}, both using <code>classCatalog</code> for class
+  * descriptions. The map is opened writable.
+  *
+  * @param database     where entries in the StoredMap are persisted
+  * @param classCatalog location to store class descriptions
+  * @return StoredMap backed by the database and classCatalog
+  * @throws DatabaseException if the StoredMap cannot be opened.
+  */
+ private StoredMap<Object, StoredEntry> createStoredMapViewOfDatabase(Database database,
+                                                                      StoredClassCatalog classCatalog)
+       throws DatabaseException {
+    EntryBinding storedEntryKeyBinding =
+          new SerialBinding(classCatalog, Object.class);
+    EntryBinding storedEntryValueBinding =
+          new SerialBinding(classCatalog, StoredEntry.class);
+    try {
+       // the final argument makes the view writable
+       return new StoredMap<Object, StoredEntry>(database,
+                                                 storedEntryKeyBinding, storedEntryValueBinding, true);
+    } catch (Exception caught) {
+       // the collections API wraps checked exceptions; surface the underlying cause
+       caught = ExceptionUnwrapper.unwrap(caught);
+       throw new DatabaseException("error opening stored map", caught);
+    }
+ }
+
+ /**
+  * Closes the stored entry database and the class catalog, and nulls references to them and to the
+  * {@link StoredMap}. The databases are not removed from the file system. Each resource is closed
+  * independently: a failure closing one is logged and does not prevent the other from being closed.
+  */
+ private void closeDatabases() {
+    log.trace("closing databases");
+    if (cacheDb != null) {
+       try {
+          cacheDb.close();
+       } catch (DatabaseException e) {
+          log.error("Error closing databases", e);
+       }
+    }
+    if (catalog != null) {
+       try {
+          catalog.close();
+       } catch (DatabaseException e) {
+          log.error("Error closing databases", e);
+       }
+    }
+    cacheDb = null;
+    catalog = null;
+    cacheMap = null;
+ }
+
+
+ /**
+  * Shuts down the purge executor, {@link org.horizon.loader.bdbje.BdbjeCacheStore#closeDatabases() closes
+  * the JE databases} and then the {@link Environment}. The environment and databases are not removed from
+  * the file system. Exceptions during close are logged, not propagated.
+  *
+  * @throws IllegalStateException if the store is not started
+  */
+ public void stop() throws CacheLoaderException {
+    checkOpen();
+    log.trace("stopping BdbjeCacheStore");
+    // stop the purger first so no purge task runs against databases that are being closed;
+    // without this the executor thread created in start() is never released
+    if (purgerService != null) {
+       purgerService.shutdownNow();
+       purgerService = null;
+    }
+    transactionRunner = null;
+    currentTransaction = null;
+    txnMap = null;
+    closeDatabases();
+    if (env != null) {
+       try {
+          env.close();
+       }
+       catch (Exception shouldNotOccur) {
+          log.warn("Unexpected exception closing cacheStore", shouldNotOccur);
+       }
+    }
+    env = null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * When <code>isOnePhase</code> is set, the modifications are applied and committed immediately via
+  * {@link BdbjeCacheStore#onePhaseCommit(java.util.List)} and <code>tx</code> is not consulted. Otherwise the
+  * work is delegated to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction) prepare}.
+  */
+ public void prepare(List<? extends Modification> mods, javax.transaction.Transaction tx, boolean isOnePhase)
+       throws CacheLoaderException {
+    if (!isOnePhase) {
+       prepare(mods, tx);
+       return;
+    }
+    onePhaseCommit(mods);
+ }
+
+ /**
+  * Applies <code>mods</code> atomically: they are wrapped in a
+  * {@link ModificationsTransactionWorker worker} which is handed to
+  * {@link PreparableTransactionRunner#run(com.sleepycat.collections.TransactionWorker)}.
+  *
+  * @param mods actions to perform atomically
+  * @throws CacheLoaderException on problems during the transaction
+  */
+ protected void onePhaseCommit(List<? extends Modification> mods) throws CacheLoaderException {
+    checkOpen();
+    checkNonNull(mods, "modifications");
+    log.debug("performing one phase transaction");
+    try {
+       final ModificationsTransactionWorker worker = new ModificationsTransactionWorker(this, mods);
+       transactionRunner.run(worker);
+    } catch (Exception failure) {
+       throw new CacheLoaderException("Problem committing modifications: " + mods, failure);
+    }
+ }
+
+ /**
+  * Runs <code>mods</code> inside a SleepyCat transaction that is left uncommitted, and remembers that
+  * transaction under <code>tx</code> so a later commit or rollback can find it.
+  * <p/>
+  * The work is wrapped in a {@link org.horizon.loader.bdbje.ModificationsTransactionWorker} and prepared via
+  * {@link PreparableTransactionRunner#prepare(com.sleepycat.collections.TransactionWorker)}. The resulting
+  * {@link Transaction SleepyCat transaction} is then read from the current transaction and stored in the
+  * transaction map keyed by <code>tx</code>.
+  *
+  * @param mods modifications to be applied
+  * @param tx   transaction identifier
+  * @throws CacheLoaderException in the event of problems writing to the store
+  */
+ protected void prepare(List<? extends Modification> mods,
javax.transaction.Transaction tx) throws CacheLoaderException {
+ checkOpen();
+ checkNonNull(mods, "modifications");
+ checkNonNull(tx, "tx");
+ log.debug("preparing transaction {0}", tx);
+ try {
+ transactionRunner.prepare(new ModificationsTransactionWorker(this, mods));
+ // the runner leaves the prepared transaction as the thread's current transaction
+ Transaction txn = currentTransaction.getTransaction();
+ log.trace("transaction {0} == sleepycat transaction {1}", tx, txn);
+ txnMap.put(tx, txn);
+ // NOTE(review): swaps CurrentTransaction's private "localTrans" field for a fresh ThreadLocal,
+ // presumably to detach the prepared transaction from this thread so it can be completed later -
+ // confirm. If that field holds per-thread state, this resets it for every thread, not just this one.
+ ReflectionUtil.setValue(currentTransaction, "localTrans", new
ThreadLocal());
+ } catch (Exception e) {
+ throw new CacheLoaderException("Problem preparing transaction", e);
+ }
+ }
+
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * Aborts the work prepared for <code>tx</code> by calling
+  * {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean) completeTransaction}
+  * with an argument of false. Failures are logged rather than propagated.
+  */
+ public void rollback(javax.transaction.Transaction tx) {
+    try {
+       completeTransaction(tx, false);
+    } catch (Exception rollbackFailure) {
+       log.error("Error rolling back transaction", rollbackFailure);
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * This implementation calls
+  * {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean) completeTransaction}
+  * with an argument of true.
+  *
+  * @throws CacheLoaderException if the underlying SleepyCat transaction cannot be committed
+  */
+ public void commit(javax.transaction.Transaction tx) throws CacheLoaderException {
+ completeTransaction(tx, true);
+ }
+
+ /**
+  * Looks up, and removes from the transaction map, the SleepyCat transaction associated with the parameter
+  * <code>tx</code>. If there is no associated sleepycat transaction, an error is logged and nothing else
+  * happens. If the transaction is the
+  * {@link com.sleepycat.collections.CurrentTransaction#getTransaction() current transaction}, it calls
+  * {@link BdbjeCacheStore#completeCurrentTransaction(boolean)} passing the argument <code>commit</code>.
+  * Otherwise, {@link BdbjeCacheStore#completeTransaction(com.sleepycat.je.Transaction, boolean)
+  * completeTransaction} is called, passing the SleepyCat transaction and <code>commit</code> as arguments.
+  *
+  * @param tx     java transaction used to lookup a SleepyCat transaction
+  * @param commit true to commit false to abort
+  * @throws CacheLoaderException if there are problems committing or aborting the transaction
+  */
+ protected void completeTransaction(javax.transaction.Transaction tx, boolean commit)
+       throws CacheLoaderException {
+    checkOpen();
+    checkNonNull(tx, "tx");
+    final Transaction sleepycatTxn = txnMap.remove(tx);
+    if (sleepycatTxn == null) {
+       log.error("no sleepycat transaction associated transaction {0}", tx);
+       return;
+    }
+    log.debug("transaction {0} == sleepycat transaction {1}", tx, sleepycatTxn);
+    if (currentTransaction.getTransaction() != sleepycatTxn) {
+       completeTransaction(sleepycatTxn, commit);
+    } else {
+       completeCurrentTransaction(commit);
+    }
+ }
+
+ /**
+  * Finishes the given {@link Transaction} by committing or aborting it.
+  *
+  * @param txn    the SleepyCat transaction to finish
+  * @param commit true to commit, false to abort
+  * @throws CacheLoaderException if there was a problem completing the transaction
+  */
+ private void completeTransaction(Transaction txn, boolean commit) throws CacheLoaderException {
+    try {
+       final String action = commit ? "committing" : "aborting";
+       log.debug("{0} sleepycat transaction {1}", action, txn);
+       if (!commit) {
+          txn.abort();
+       } else {
+          txn.commit();
+       }
+    } catch (DatabaseException failure) {
+       throw new CacheLoaderException("Problem completing transaction", failure);
+    }
+ }
+
+ /**
+  * Finishes the {@link com.sleepycat.collections.CurrentTransaction#getTransaction() current transaction}
+  * by committing or aborting it.
+  *
+  * @param commit true to commit, false to abort
+  * @throws CacheLoaderException if there was a problem completing the transaction
+  */
+ private void completeCurrentTransaction(boolean commit) throws CacheLoaderException {
+    try {
+       final String action = commit ? "committing" : "aborting";
+       log.debug("{0} current sleepycat transaction {1}", action, currentTransaction.getTransaction());
+       if (!commit) {
+          currentTransaction.abortTransaction();
+       } else {
+          currentTransaction.commitTransaction();
+       }
+    } catch (DatabaseException failure) {
+       throw new CacheLoaderException("Problem completing transaction", failure);
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * Answers by delegating to {@link BdbjeCacheStore#load(Object)}, so an expired entry counts as absent.
+  */
+ public boolean containsKey(Object key) throws CacheLoaderException {
+    final StoredEntry loaded = load(key);
+    return loaded != null;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * This implementation delegates to {@link StoredMap#remove(Object)} and uses its return value to decide
+  * whether anything was removed, so the lookup and the removal happen in a single map operation.
+  *
+  * @return true if an entry was stored under <code>key</code> and has been removed, false otherwise
+  * @throws CacheLoaderException if the underlying map operation fails
+  */
+ public boolean remove(Object key) throws CacheLoaderException {
+    checkOpen();
+    checkNonNull(key, "key");
+    log.trace("Removing key {0}", key);
+    try {
+       // remove() hands back the previous value, or null when the key was absent; this replaces the
+       // earlier containsKey()+remove() pair, which could race with another writer between the two calls
+       return cacheMap.remove(key) != null;
+    } catch (Exception caught) {
+       caught = ExceptionUnwrapper.unwrap(caught);
+       throw new CacheLoaderException("error removing key " + key, caught);
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * The <code>keys</code> are removed atomically: each key is turned into a {@link Remove} modification and
+  * the resulting list is handed to {@link BdbjeCacheStore#onePhaseCommit(java.util.List)}.
+  */
+ public void removeAll(Set<Object> keys) throws CacheLoaderException {
+    checkOpen();
+    checkNonNull(keys, "keys");
+    final List<Remove> removals = new ArrayList<Remove>();
+    final Iterator<Object> keyIterator = keys.iterator();
+    while (keyIterator.hasNext()) {
+       removals.add(new Remove(keyIterator.next()));
+    }
+    onePhaseCommit(removals);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * Reads the entry through {@link StoredMap#get(Object)}. An entry that has expired is removed from the map
+  * and reported as absent.
+  *
+  * @return the stored entry, or null if there is none or it has expired
+  */
+ public StoredEntry load(Object key) throws CacheLoaderException {
+    checkOpen();
+    checkNonNull(key, "key");
+    log.trace("Loading key {0}", key);
+    try {
+       StoredEntry found = cacheMap.get(key);
+       if (found != null && found.isExpired()) {
+          cacheMap.remove(key);
+          found = null;
+       }
+       return found;
+    } catch (Exception caught) {
+       caught = ExceptionUnwrapper.unwrap(caught);
+       throw new CacheLoaderException("error loading key " + key, caught);
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * The entry is written with {@link StoredMap#put(Object, Object)}, keyed on its own key.
+  */
+ public void store(StoredEntry ed) throws CacheLoaderException {
+    checkOpen();
+    checkNonNull(ed, "entry");
+    log.trace("Storing entry {0}", ed);
+    try {
+       final Object entryKey = ed.getKey();
+       cacheMap.put(entryKey, ed);
+    } catch (Exception caught) {
+       final Exception unwrapped = ExceptionUnwrapper.unwrap(caught);
+       throw new CacheLoaderException("error storing entry " + ed, unwrapped);
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * Empties the store through {@link StoredMap#clear()}.
+  */
+ public void clear() throws CacheLoaderException {
+    checkOpen();
+    log.trace("Clearing store");
+    try {
+       cacheMap.clear();
+    } catch (Exception caught) {
+       final Exception unwrapped = ExceptionUnwrapper.unwrap(caught);
+       throw new CacheLoaderException("error clearing store", unwrapped);
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * This implementation copies {@link StoredMap#values()} into a new {@link HashSet}. The values are copied
+  * as-is; no expiry check is applied here.
+  *
+  * @return a snapshot set of the entries currently in the store
+  * @throws CacheLoaderException if the underlying map cannot be read
+  */
+ public Set<StoredEntry> loadAll() throws CacheLoaderException {
+    checkOpen();
+    log.trace("Loading all entries");
+    try {
+       // parameterized instead of the raw HashSet, so the return needs no unchecked conversion
+       return new HashSet<StoredEntry>(cacheMap.values());
+    } catch (Exception caught) {
+       caught = ExceptionUnwrapper.unwrap(caught);
+       throw new CacheLoaderException("error loading all entries ", caught);
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * This implementation reads the number of entries to load from the stream, then begins a transaction.
+  * During that transaction, the cachestore is cleared and replaced with entries from the stream. If there
+  * are any errors during the process, the entire transaction is rolled back. Deadlock handling is not
+  * addressed, as there is no means to rollback reads from the inputstream.
+  * <p/>
+  * A rollback is only attempted once the transaction has actually been begun, and a failure while rolling
+  * back is logged so that it cannot hide the error that caused it.
+  *
+  * @param ois stream holding a long record count followed by that many key/value <code>byte[]</code> pairs
+  * @throws CacheLoaderException if the stream cannot be read or the store cannot be written
+  * @see BdbjeCacheStore#toStream(java.io.ObjectOutput)
+  */
+ public void fromStream(ObjectInput ois) throws CacheLoaderException {
+    checkOpen();
+    log.warn("Clearing all entries and loading from input");
+    boolean transactionBegun = false;
+    try {
+       long recordCount = ois.readLong();
+       log.info("reading {0} records from stream", recordCount);
+       log.info("clearing all records");
+       currentTransaction.beginTransaction(null);
+       transactionBegun = true;
+       cacheMap.clear();
+       Cursor cursor = null;
+       try {
+          cursor = cacheDb.openCursor(currentTransaction.getTransaction(), null);
+          // long counter: an int would overflow before reaching a record count above Integer.MAX_VALUE
+          for (long i = 0; i < recordCount; i++) {
+             byte[] keyBytes = (byte[]) ois.readObject();
+             byte[] dataBytes = (byte[]) ois.readObject();
+
+             DatabaseEntry key = new DatabaseEntry(keyBytes);
+             DatabaseEntry data = new DatabaseEntry(dataBytes);
+             cursor.put(key, data);
+          }
+       } finally {
+          // the cursor must be closed before the transaction is committed or aborted
+          if (cursor != null) cursor.close();
+       }
+       completeCurrentTransaction(true);
+    }
+    catch (Exception caught) {
+       if (transactionBegun) {
+          try {
+             completeCurrentTransaction(false);
+          } catch (Exception abortFailure) {
+             // keep the original failure as the one reported to the caller
+             log.error("error aborting transaction", abortFailure);
+          }
+       }
+       caught = ExceptionUnwrapper.unwrap(caught);
+       CacheLoaderException cle = (caught instanceof CacheLoaderException) ?
+             (CacheLoaderException) caught :
+             new CacheLoaderException("Problems reading from stream", caught);
+       throw cle;
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * Writes the current count of cachestore entries followed by a pair of byte[]s corresponding to the
+  * SleepyCat binary representation of {@link org.horizon.loader.StoredEntry#getKey() key}
+  * {@link StoredEntry value}.
+  * <p/>
+  * This implementation holds a transaction open to ensure that we see no new records added while iterating.
+  * The cursor is always closed before the transaction is committed or aborted, and a failure while aborting
+  * is logged so that it cannot hide the error that caused it.
+  *
+  * @param oos stream receiving the record count and the key/value <code>byte[]</code> pairs
+  * @throws CacheLoaderException if the store cannot be read or the stream cannot be written
+  */
+ public void toStream(ObjectOutput oos) throws CacheLoaderException {
+    checkOpen();
+    log.trace("dumping current database to outputstream");
+    boolean transactionBegun = false;
+    try {
+       currentTransaction.beginTransaction(null);
+       transactionBegun = true;
+       long recordCount = cacheDb.count();
+       log.debug("writing {0} records to stream", recordCount);
+       oos.writeLong(recordCount);
+
+       Cursor cursor = cacheDb.openCursor(currentTransaction.getTransaction(), null);
+       try {
+          DatabaseEntry key = new DatabaseEntry();
+          DatabaseEntry data = new DatabaseEntry();
+          long recordsWritten = 0;
+          while (cursor.getNext(key, data, null) ==
+                OperationStatus.SUCCESS) {
+             oos.writeObject(key.getData());
+             oos.writeObject(data.getData());
+             recordsWritten++;
+          }
+          log.debug("wrote {0} records to stream", recordsWritten);
+          if (recordsWritten != recordCount)
+             log.warn("expected to write {0} records, but wrote {1}", recordCount, recordsWritten);
+       } finally {
+          // close here, on success and failure alike, so the transaction never ends with an open cursor
+          cursor.close();
+       }
+       currentTransaction.commitTransaction();
+    } catch (Exception caught) {
+       if (transactionBegun) {
+          try {
+             currentTransaction.abortTransaction();
+          } catch (Exception e) {
+             // keep the original failure as the one reported to the caller
+             log.error("error aborting transaction", e);
+          }
+       }
+       caught = ExceptionUnwrapper.unwrap(caught);
+       CacheLoaderException cle = (caught instanceof CacheLoaderException) ?
+             (CacheLoaderException) caught :
+             new CacheLoaderException("Problems writing to stream", caught);
+       throw cle;
+    }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p/>
+  * If there is a {@link com.sleepycat.collections.CurrentTransaction#getTransaction() transaction in
+  * progress}, {@link #doPurgeExpired()} is invoked directly on the calling thread. Otherwise the purge is
+  * submitted to the purger executor, and any failure inside that task is logged rather than propagated.
+  *
+  * @see org.horizon.loader.bdbje.BdbjeCacheStoreConfig#isPurgeSynchronously()
+  * @see org.horizon.loader.bdbje.BdbjeCacheStoreConfig#setMaxTxRetries(int)
+  */
+ public void purgeExpired() throws CacheLoaderException {
+    checkOpen();
+    final boolean insideTransaction = currentTransaction.getTransaction() != null;
+    if (insideTransaction) {
+       doPurgeExpired();
+       return;
+    }
+    final Runnable purgeTask = new Runnable() {
+       public void run() {
+          try {
+             doPurgeExpired();
+          } catch (Exception e) {
+             log.error("error purging expired entries", e);
+          }
+       }
+    };
+    purgerService.execute(purgeTask);
+ }
+
+ /**
+  * Walks {@link com.sleepycat.collections.StoredMap#entrySet()} and removes, through the iterator, every
+  * entry whose value reports itself as expired.
+  */
+ private void doPurgeExpired() {
+    log.info("purging expired from database");
+    final Iterator<Map.Entry<Object, StoredEntry>> entries = cacheMap.entrySet().iterator();
+    while (entries.hasNext()) {
+       final StoredEntry candidate = entries.next().getValue();
+       if (candidate.isExpired()) {
+          entries.remove();
+       }
+    }
+ }
+
+ /**
+  * Reports which SleepyCat database holds the stored entries.
+  *
+  * @return the name of the SleepyCat database persisting this store
+  */
+ public String getCacheDbName() {
+    return this.cacheDbName;
+ }
+
+ /**
+  * Reports which SleepyCat database holds the class catalog.
+  *
+  * @return the name of the SleepyCat database persisting the class information for objects in this store
+  */
+ public String getCatalogDbName() {
+    return this.catalogDbName;
+ }
+
+ /**
+  * Prints the terms of use for Berkeley DB Java Edition, including the current JE version, to standard
+  * output. Called from {@link #init(CacheLoaderConfig, Cache, Marshaller)}.
+  */
+ public void printLicense() {
+ String license =
"\n*************************************************************************************\n"
+
+ "Berkeley DB Java Edition version: " +
JEVersion.CURRENT_VERSION.toString() + "\n" +
+ "JBoss Cache can use Berkeley DB Java Edition from Oracle \n" +
+ "(http://www.oracle.com/database/berkeley-db/je/index.html)\n" +
+ "for persistent, reliable and transaction-protected data
storage.\n" +
+ "If you choose to use Berkeley DB Java Edition with JBoss Cache, you
must comply with the terms\n" +
+ "of Oracle's public license, included in the file
LICENSE.txt.\n" +
+ "If you prefer not to release the source code for your own application
in order to comply\n" +
+ "with the Oracle public license, you may purchase a different license
for use of\n" +
+ "Berkeley DB Java Edition with JBoss Cache.\n" +
+ "See
http://www.oracle.com/database/berkeley-db/je/index.html for
pricing and license terms\n" +
+
"*************************************************************************************";
+ // NOTE(review): written straight to System.out rather than through the class logger - confirm intended
+ System.out.println(license);
+ }
+
+ /**
+  * Throws an exception if the environment is already open, i.e. the store has been started and not yet
+  * stopped.
+  *
+  * @throws IllegalStateException if the environment is open
+  */
+ private void checkNotOpen() {
+    if (env != null) {
+       // this class has no create(); the environment is opened by start()
+       throw new IllegalStateException(
+             "Operation not allowed after calling start()");
+    }
+ }
+
+ /**
+  * Throws an exception if the environment is not open, i.e. the store has not been started or has been
+  * stopped.
+  *
+  * @throws IllegalStateException if the environment is not open
+  */
+ private void checkOpen() {
+    if (env == null) {
+       // this class has no create(); the environment is opened by start()
+       throw new IllegalStateException(
+             "Operation not allowed before calling start()");
+    }
+ }
+
+ /**
+  * Guards against null arguments.
+  *
+  * @param param     value to check
+  * @param paramName name reported in the exception message
+  * @throws NullPointerException if <code>param</code> is null
+  */
+ private void checkNonNull(Object param, String paramName) {
+    if (param != null) {
+       return;
+    }
+    throw new NullPointerException("Parameter must not be null: " + paramName);
+ }
+
+}
Added:
core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,66 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.AbstractCacheLoaderConfig;
+
+/**
+ * Configures {@link org.horizon.loader.bdbje.BdbjeCacheStore}. This allows you to tune a number of
+ * characteristics of the {@link BdbjeCacheStore}.
+ * <p/>
+ * <ul>
+ * <li><tt>location</tt> - a location on disk where the store can write databases. This defaults to
+ * <tt>${java.io.tmpdir}</tt></li>
+ * <li><tt>purgeSynchronously</tt> - whether {@link org.horizon.loader.CacheStore#purgeExpired()} calls happen
+ * synchronously or not. By default, this is set to <tt>false</tt>.</li>
+ * <li><tt>lockAcquistionTimeoutMicros</tt> - the length of time, in microseconds, to wait for locks before
+ * timing out and throwing an exception. By default, this is set to <tt>60000000</tt>.</li>
+ * <li><tt>maxTxRetries</tt> - the number of times transaction prepares will attempt to resolve a deadlock
+ * before throwing an exception. By default, this is set to <tt>5</tt>.</li>
+ * </ul>
+ * <p/>
+ * Every setter calls <tt>testImmutability</tt> with the property name before assigning, as
+ * <tt>setLocation</tt> always did. NOTE(review): <tt>testImmutability</tt> is not declared in this class and
+ * is presumably inherited; it presumably rejects changes once the configuration has been frozen - confirm.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class BdbjeCacheStoreConfig extends AbstractCacheLoaderConfig {
+   private String location = System.getProperty("java.io.tmpdir");
+   private boolean purgeSynchronously;
+   // 60 seconds, expressed in microseconds
+   private long lockAcquistionTimeoutMicros = 60 * 1000 * 1000;
+   private int maxTxRetries = 5;
+
+
+   /**
+    * Creates a configuration bound to {@link BdbjeCacheStore} as the implementing class.
+    */
+   public BdbjeCacheStoreConfig() {
+      setClassName(BdbjeCacheStore.class.getName());
+   }
+
+   /**
+    * @return the number of times a transaction prepare retries before giving up
+    */
+   public int getMaxTxRetries() {
+      return maxTxRetries;
+   }
+
+   /**
+    * @param maxTxRetries the number of times a transaction prepare retries before giving up
+    */
+   public void setMaxTxRetries(int maxTxRetries) {
+      testImmutability("maxTxRetries");
+      this.maxTxRetries = maxTxRetries;
+   }
+
+
+   /**
+    * @return how long, in microseconds, to wait for a lock before timing out
+    */
+   public long getLockAcquistionTimeoutMicros() {
+      return lockAcquistionTimeoutMicros;
+   }
+
+   /**
+    * @param lockAcquistionTimeoutMicros how long, in microseconds, to wait for a lock before timing out
+    */
+   public void setLockAcquistionTimeoutMicros(long lockAcquistionTimeoutMicros) {
+      testImmutability("lockAcquistionTimeoutMicros");
+      this.lockAcquistionTimeoutMicros = lockAcquistionTimeoutMicros;
+   }
+
+   /**
+    * @return the location on disk where the store writes its databases
+    */
+   public String getLocation() {
+      return location;
+   }
+
+   /**
+    * @param location the location on disk where the store writes its databases
+    */
+   public void setLocation(String location) {
+      testImmutability("location");
+      this.location = location;
+   }
+
+   /**
+    * @return true if expired entries are purged on the calling thread, false if purged in the background
+    */
+   public boolean isPurgeSynchronously() {
+      return purgeSynchronously;
+   }
+
+   /**
+    * @param purgeSynchronously true to purge expired entries on the calling thread
+    */
+   public void setPurgeSynchronously(boolean purgeSynchronously) {
+      testImmutability("purgeSynchronously");
+      this.purgeSynchronously = purgeSynchronously;
+   }
+
+}
Added:
core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,60 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.TransactionWorker;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+
+import java.util.List;
+
+/**
+ * Bridges a list of {@link Modification}s to {@link com.sleepycat.collections.TransactionRunner}, so that
+ * the whole list can be executed atomically as one unit of transactional work.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class ModificationsTransactionWorker implements TransactionWorker {
+   private final List<? extends Modification> modifications;
+   private final CacheStore targetStore;
+
+   /**
+    * Pairs the {@link Modification}s with the {@link CacheStore} they will be applied to.
+    *
+    * @param store what to affect
+    * @param mods  actions to take
+    */
+   public ModificationsTransactionWorker(CacheStore store, List<? extends Modification> mods) {
+      this.targetStore = store;
+      this.modifications = mods;
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * Applies each {@link Modification} to the {@link CacheStore}, in list order. Supported commands are:
+    * <ul> <li>STORE</li> <li>CLEAR</li> <li>REMOVE</li> <li>PURGE_EXPIRED</li> </ul>
+    *
+    * @throws IllegalArgumentException if a modification has any other type
+    */
+   public void doWork() throws Exception {
+      for (Modification modification : modifications) {
+         switch (modification.getType()) {
+            case STORE: {
+               final Store storeCommand = (Store) modification;
+               targetStore.store(storeCommand.getStoredEntry());
+               break;
+            }
+            case CLEAR: {
+               targetStore.clear();
+               break;
+            }
+            case REMOVE: {
+               final Remove removeCommand = (Remove) modification;
+               targetStore.remove(removeCommand.getKey());
+               break;
+            }
+            case PURGE_EXPIRED: {
+               targetStore.purgeExpired();
+               break;
+            }
+            default:
+               throw new IllegalArgumentException("Unknown modification type " + modification.getType());
+         }
+      }
+   }
+}
Added:
core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,95 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.TransactionRunner;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.compat.DbCompat;
+import com.sleepycat.je.DeadlockException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import com.sleepycat.util.ExceptionUnwrapper;
+
+/**
+ * Adapted version of {@link TransactionRunner}, which allows us to prepare a transaction
without committing it.<p/> The
+ * transaction prepared is accessible via {@link
com.sleepycat.collections.CurrentTransaction#getTransaction()}
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class PreparableTransactionRunner extends TransactionRunner {
+ CurrentTransaction currentTxn;
+
+ /**
+ * Delegates to the {@link
TransactionRunner#TransactionRunner(com.sleepycat.je.Environment, int,
+ * com.sleepycat.je.TransactionConfig) superclass} and caches a current reference to
{@link CurrentTransaction}.
+ *
+ * @see TransactionRunner#TransactionRunner(com.sleepycat.je.Environment, int,
com.sleepycat.je.TransactionConfig)
+ */
+ public PreparableTransactionRunner(Environment env, int maxRetries, TransactionConfig
config) {
+ super(env, maxRetries, config);
+ this.currentTxn = CurrentTransaction.getInstance(env);
+ }
+
+ /**
+ * Delegates to the {@link
TransactionRunner#TransactionRunner(com.sleepycat.je.Environment) superclass} and caches
+ * a current reference to {@link CurrentTransaction}.
+ *
+ * @see TransactionRunner#TransactionRunner(com.sleepycat.je.Environment)
+ */
+ public PreparableTransactionRunner(Environment env) {
+ super(env);
+ this.currentTxn = CurrentTransaction.getInstance(env);
+ }
+
+ /**
+ * Same behaviour as {@link
TransactionRunner#run(com.sleepycat.collections.TransactionWorker) run}, except that the
+ * transaction is not committed on success.
+ *
+ * @see TransactionRunner#run(com.sleepycat.collections.TransactionWorker)
+ */
+ public void prepare(TransactionWorker worker) throws Exception {
+ for (int currentTries = 0; ; currentTries++) {
+ Transaction txn = null;
+ try {
+ txn = currentTxn.beginTransaction(getTransactionConfig());
+ worker.doWork();
+ return;
+ } catch (Throwable caught) {
+ currentTries = abortOverflowingCurrentTriesOnError(txn, currentTries);
+ caught = ExceptionUnwrapper.unwrapAny(caught);
+ rethrowIfNotDeadLock(caught);
+ if (currentTries >= getMaxRetries())
+ throw (DeadlockException) caught;
+ }
+ }
+ }
+
+ int abortOverflowingCurrentTriesOnError(Transaction toAbort, int currentTries) {
+ if (toAbort != null && toAbort == currentTxn.getTransaction()) {
+ try {
+ currentTxn.abortTransaction();
+ } catch (Throwable problemAborting) {
+ /* superclass prints to stderr, so we will also */
+ if (DbCompat.TRANSACTION_RUNNER_PRINT_STACK_TRACES) {
+ problemAborting.printStackTrace();
+ }
+ /* Force the original exception to be thrown. */
+ return Integer.MAX_VALUE;
+ }
+ }
+ return currentTries;
+ }
+
+ void rethrowIfNotDeadLock(Throwable caught) throws Exception {
+ if (!(caught instanceof DeadlockException)) {
+ if (caught instanceof Exception) {
+ throw (Exception) caught;
+ } else {
+ throw (Error) caught;
+ }
+ }
+ }
+
+}
Added:
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,74 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.CacheLoaderException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests that cover {@link BdbjeCacheStoreConfig }
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName =
"loader.bdbje.BdbjeCacheStoreConfigTest")
+public class BdbjeCacheStoreConfigTest {
+
+ private BdbjeCacheStoreConfig config;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ config = new BdbjeCacheStoreConfig();
+ }
+
+ @AfterMethod
+ public void tearDown() throws CacheLoaderException {
+ config = null;
+ }
+
+
+ @Test
+ public void testGetClassNameDefault() {
+ assert config.getClassName().equals(BdbjeCacheStore.class.getName());
+ }
+
+ @Test
+ public void testgetMaxTxRetries() {
+ assert config.getMaxTxRetries() == 5;
+ }
+
+ @Test
+ public void testSetMaxTxRetries() {
+ config.setMaxTxRetries(1);
+ assert config.getMaxTxRetries() == 1;
+ }
+
+ @Test
+ public void testGetLockAcquistionTimeoutMicros() {
+ assert config.getLockAcquistionTimeoutMicros() == 60 * 1000 * 1000;
+ }
+
+ @Test
+ public void testSetLockAcquistionTimeoutMicros() {
+ config.setLockAcquistionTimeoutMicros(1);
+ assert config.getLockAcquistionTimeoutMicros() == 1;
+ }
+
+ @Test
+ public void testSetLocation() {
+ config.setLocation("foo");
+ assert config.getLocation().equals("foo");
+ }
+
+ @Test
+ public void testIsPurgeSynchronously() {
+ assert !config.isPurgeSynchronously();
+ }
+
+ @Test
+ public void testSetPurgeSynchronously() {
+ config.setPurgeSynchronously(true);
+ assert config.isPurgeSynchronously();
+ }
+}
Added:
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,40 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the shared {@link BaseCacheStoreTest} suite against a {@link BdbjeCacheStore} that keeps its files under the
+ * test files directory.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.BdbjeCacheStoreIntegrationTest")
+public class BdbjeCacheStoreIntegrationTest extends BaseCacheStoreTest {
+
+   /**
+    * Builds and starts a store on a freshly emptied location, configured to purge synchronously.
+    */
+   protected CacheStore createCacheStore() throws CacheLoaderException {
+      CacheStore store = new BdbjeCacheStore();
+
+      // start from an empty location
+      String location = TestingUtil.TEST_FILES + "/Horizon-BdbjeCacheStoreIntegrationTest";
+      TestingUtil.recursiveFileRemove(location);
+
+      BdbjeCacheStoreConfig storeConfig = new BdbjeCacheStoreConfig();
+      storeConfig.setLocation(location);
+      storeConfig.setPurgeSynchronously(true);
+
+      store.init(storeConfig, getCache(), getMarshaller());
+      store.start();
+      return store;
+   }
+
+   @Override
+   public void testTwoPhaseCommitReadCommitted() throws CacheLoaderException {
+      // this depends on READ_COMMITTED, which is not supported on sleepycat
+   }
+
+   @Override
+   public void testRollbackReadCommitted() throws CacheLoaderException {
+      // this depends on READ_COMMITTED, which is not supported on sleepycat
+   }
+
+}
Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,733 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.serial.SerialBinding;
+import com.sleepycat.bind.serial.StoredClassCatalog;
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.StoredMap;
+import com.sleepycat.collections.TransactionRunner;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.OperationStatus;
+import org.easymock.EasyMock;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Learning tests for SleepyCat JE. Behaviour here is used in BdbjeCacheStore. When there are upgrades to bdbje,
+ * this test may warrant updating.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName =
"loader.bdbje.BdbjeLearningTest")
+public class BdbjeLearningTest {
+ // Directory used as the home of the JE environment for this test.
+ String dbHome = TestingUtil.TEST_FILES + "/Horizon-BdbjeLearningTest";
+ // JE environment; opened in setUp() and closed in tearDown().
+ Environment env;
+
+ // Name of the database backing the class catalog used by the serial bindings.
+ private static final String CLASS_CATALOG = "java_class_catalog";
+ private StoredClassCatalog javaCatalog;
+
+ // Name of the database holding the stored entries, plus the map view the tests read and write through.
+ private static final String STORED_ENTRIES = "storedEntriesDb";
+ private Database storedEntriesDb;
+ private StoredMap<Object, StoredEntry> cacheMap;
+
+
+   /**
+    * Creates the environment directory, opens a transactional JE environment in it and builds {@code cacheMap}, a
+    * writable {@link StoredMap} over the stored-entries database using serial bindings for keys and values.
+    */
+   @BeforeMethod
+   public void setUp() throws Exception {
+      File home = new File(dbHome);
+      home.mkdirs();
+      System.out.println("Opening environment in: " + dbHome);
+
+      EnvironmentConfig environmentSettings = new EnvironmentConfig();
+      environmentSettings.setTransactional(true);
+      environmentSettings.setAllowCreate(true);
+      env = new Environment(home, environmentSettings);
+
+      // the same settings are used for both databases
+      DatabaseConfig databaseSettings = new DatabaseConfig();
+      databaseSettings.setTransactional(true);
+      databaseSettings.setAllowCreate(true);
+
+      javaCatalog = new StoredClassCatalog(env.openDatabase(null, CLASS_CATALOG, databaseSettings));
+
+      EntryBinding keyBinding = new SerialBinding(javaCatalog, Object.class);
+      EntryBinding valueBinding = new SerialBinding(javaCatalog, StoredEntry.class);
+
+      storedEntriesDb = env.openDatabase(null, STORED_ENTRIES, databaseSettings);
+      cacheMap = new StoredMap<Object, StoredEntry>(storedEntriesDb, keyBinding, valueBinding, true);
+   }
+
+   /**
+    * Runs the two (currently empty) workers through a plain {@link TransactionRunner}, one after the other.
+    */
+   public void testTransactionWorker() throws Exception {
+      TransactionRunner transactionRunner = new TransactionRunner(env);
+
+      TransactionWorker populate = new PopulateDatabase();
+      transactionRunner.run(populate);
+
+      TransactionWorker print = new PrintDatabase();
+      transactionRunner.run(print);
+   }
+
+
+   /**
+    * Placeholder worker; performs no work.
+    */
+   private class PopulateDatabase implements TransactionWorker {
+      public void doWork() throws Exception {
+         // nothing to do
+      }
+   }
+
+   /**
+    * Placeholder worker; performs no work.
+    */
+   private class PrintDatabase implements TransactionWorker {
+      public void doWork() throws Exception {
+         // nothing to do
+      }
+   }
+
+
+   /**
+    * Closes the stored-entries database, the class catalog and the environment, then removes the environment
+    * directory.
+    * <p/>
+    * Each step runs even if an earlier one throws, so a failing close neither leaks the remaining handles nor
+    * leaves files behind for the next test method.
+    */
+   @AfterMethod
+   public void tearDown() throws Exception {
+      try {
+         try {
+            storedEntriesDb.close();
+         } finally {
+            try {
+               javaCatalog.close();
+            } finally {
+               env.close();
+            }
+         }
+      } finally {
+         TestingUtil.recursiveFileRemove(dbHome);
+      }
+   }
+
+
+   /**
+    * Puts the entry into {@code cacheMap} under its own key.
+    *
+    * @param se entry to store
+    */
+   private void store(StoredEntry se) {
+      Object key = se.getKey();
+      cacheMap.put(key, se);
+   }
+
+
+   /**
+    * Looks up an entry, dropping it from {@code cacheMap} if it has expired.
+    *
+    * @param key key to look up
+    * @return the entry, or null if there is none or it had expired
+    */
+   private StoredEntry load(Object key) {
+      StoredEntry found = cacheMap.get(key);
+      if (found != null && found.isExpired()) {
+         // expired entries are removed on access and reported as missing
+         cacheMap.remove(key);
+         return null;
+      }
+      return found;
+   }
+
+   /**
+    * Copies every value currently in {@code cacheMap} into a new set.
+    *
+    * @return a new {@link HashSet} holding all stored entries
+    */
+   private Set<StoredEntry> loadAll() {
+      return new HashSet<StoredEntry>(cacheMap.values());
+   }
+
+   /**
+    * Iterates over {@code cacheMap} and removes every entry whose value reports itself as expired.
+    */
+   private void purgeExpired() {
+      for (Iterator<Map.Entry<Object, StoredEntry>> entries = cacheMap.entrySet().iterator(); entries.hasNext();) {
+         StoredEntry candidate = entries.next().getValue();
+         if (candidate.isExpired()) {
+            entries.remove();
+         }
+      }
+   }
+
+ // Logger used by the streaming helpers and by commit()/rollback().
+ private static final Log log = LogFactory.getLog(BdbjeLearningTest.class);
+
+   /**
+    * Writes the contents of the stored-entries database to the stream: first the record count as a long, then,
+    * for each record, the raw key bytes followed by the raw data bytes.
+    * <p/>
+    * The object stream is flushed before returning, so everything written here (including the count header when
+    * the database is empty) has reached {@code outputStream} by the time the caller writes further bytes to it.
+    *
+    * @param outputStream target; used directly if it already is an {@link ObjectOutputStream}, otherwise wrapped
+    * @throws CacheLoaderException if writing to the stream, reading the database or closing the cursor fails
+    */
+   private void toStream(OutputStream outputStream) throws CacheLoaderException {
+      Cursor cursor = null;
+
+      try {
+         ObjectOutputStream oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
+               new ObjectOutputStream(outputStream);
+         long recordCount = storedEntriesDb.count();
+         log.trace("writing {0} records to stream", recordCount);
+         oos.writeLong(recordCount);
+
+         cursor = storedEntriesDb.openCursor(null, null);
+         DatabaseEntry key = new DatabaseEntry();
+         DatabaseEntry data = new DatabaseEntry();
+         while (cursor.getNext(key, data, null) == OperationStatus.SUCCESS) {
+            oos.writeObject(key.getData());
+            oos.writeObject(data.getData());
+         }
+         // push any data still buffered inside the object stream down to the caller's stream
+         oos.flush();
+      } catch (IOException e) {
+         throw new CacheLoaderException("Error writing to object stream", e);
+      } catch (DatabaseException e) {
+         throw new CacheLoaderException("Error accessing database", e);
+      } finally {
+         if (cursor != null) {
+            try {
+               cursor.close();
+            } catch (DatabaseException e) {
+               throw new CacheLoaderException("Error closing cursor", e);
+            }
+         }
+      }
+   }
+
+   /**
+    * Replaces the contents of the stored-entries database with records read from the stream, in the format
+    * produced by {@code toStream}: a long record count followed by key/data byte arrays.
+    * <p/>
+    * The existing records are cleared first; the new records are then inserted through a cursor inside a single
+    * JE transaction. If anything fails before that transaction commits, it is aborted so that it is not left open
+    * on the environment.
+    *
+    * @param inputStream source; used directly if it already is an {@link ObjectInputStream}, otherwise wrapped
+    * @throws CacheLoaderException if reading the stream or writing to the database fails
+    */
+   private void fromStream(InputStream inputStream) throws CacheLoaderException {
+      try {
+         ObjectInputStream ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
+               new ObjectInputStream(inputStream);
+         long recordCount = ois.readLong();
+         log.info("reading {0} records from stream", recordCount);
+         log.info("clearing all records");
+         cacheMap.clear();
+
+         com.sleepycat.je.Transaction txn = env.beginTransaction(null, null);
+         Cursor cursor = null;
+         boolean committed = false;
+         try {
+            cursor = storedEntriesDb.openCursor(txn, null);
+            // long counter: the count is written as a long and must not be compared against a wrapping int
+            for (long i = 0; i < recordCount; i++) {
+               byte[] keyBytes = (byte[]) ois.readObject();
+               byte[] dataBytes = (byte[]) ois.readObject();
+
+               DatabaseEntry key = new DatabaseEntry(keyBytes);
+               DatabaseEntry data = new DatabaseEntry(dataBytes);
+               cursor.put(key, data);
+            }
+            cursor.close();
+            cursor = null;
+            txn.commit();
+            committed = true;
+         } finally {
+            try {
+               if (cursor != null) cursor.close();
+            } finally {
+               if (!committed) {
+                  try {
+                     txn.abort();
+                  } catch (Exception abortFailure) {
+                     // keep the original failure as the one that propagates
+                     log.error("Unable to abort transaction after failed stream load: " + abortFailure);
+                  }
+               }
+            }
+         }
+      } catch (Exception e) {
+         CacheLoaderException cle = (e instanceof CacheLoaderException) ? (CacheLoaderException) e :
+               new CacheLoaderException("Problems reading from stream", e);
+         throw cle;
+      }
+   }
+
+   /**
+    * Worker that stores a single entry via the enclosing test's {@code store} helper.
+    */
+   class StoreTransactionWorker implements TransactionWorker {
+      private final StoredEntry entry;
+
+      StoreTransactionWorker(StoredEntry entry) {
+         this.entry = entry;
+      }
+
+      public void doWork() throws Exception {
+         store(entry);
+      }
+   }
+
+   /**
+    * Worker that clears {@code cacheMap}.
+    */
+   class ClearTransactionWorker implements TransactionWorker {
+      public void doWork() throws Exception {
+         cacheMap.clear();
+      }
+   }
+
+   /**
+    * Worker that removes a single key from {@code cacheMap}.
+    */
+   class RemoveTransactionWorker implements TransactionWorker {
+      Object key;
+
+      RemoveTransactionWorker(Object key) {
+         this.key = key;
+      }
+
+      public void doWork() throws Exception {
+         cacheMap.remove(key);
+      }
+   }
+
+   /**
+    * Worker that delegates to the enclosing test's {@code purgeExpired} helper.
+    */
+   class PurgeExpiredTransactionWorker implements TransactionWorker {
+      public void doWork() throws Exception {
+         purgeExpired();
+      }
+   }
+
+   /**
+    * Worker that replays a list of {@link Modification}s, in order, against {@code cacheMap} using the enclosing
+    * test's helpers.
+    */
+   class ModificationsTransactionWorker implements TransactionWorker {
+      private final List<? extends Modification> mods;
+
+      ModificationsTransactionWorker(List<? extends Modification> mods) {
+         this.mods = mods;
+      }
+
+      public void doWork() throws Exception {
+         for (Modification next : mods) {
+            apply(next);
+         }
+      }
+
+      /** Applies one modification to the map. */
+      private void apply(Modification modification) {
+         switch (modification.getType()) {
+            case STORE:
+               store(((Store) modification).getStoredEntry());
+               return;
+            case CLEAR:
+               cacheMap.clear();
+               return;
+            case REMOVE:
+               cacheMap.remove(((Remove) modification).getKey());
+               return;
+            case PURGE_EXPIRED:
+               purgeExpired();
+               return;
+            default:
+               throw new IllegalArgumentException("Unknown modification type " + modification.getType());
+         }
+      }
+   }
+
+
+   /**
+    * Applies the modifications inside a JE transaction.
+    * <p/>
+    * In one-phase mode the work is run through a {@link TransactionRunner}. Otherwise it is run through a
+    * {@link PreparableTransactionRunner}, and the transaction that is current afterwards is remembered in
+    * {@code txnMap} under {@code tx} so that {@code commit} or {@code rollback} can finish it later.
+    * <p/>
+    * Failures are reported to the caller instead of being printed and ignored, so a broken prepare fails the test
+    * at the point where it happens.
+    *
+    * @param mods       modifications to apply, in order
+    * @param tx         key under which a two-phase transaction is remembered
+    * @param isOnePhase true to run through {@link TransactionRunner}, false to leave the transaction open
+    * @throws CacheLoaderException if applying the modifications fails
+    */
+   private void prepare(List<Modification> mods, Transaction tx, boolean isOnePhase) throws CacheLoaderException {
+      try {
+         if (isOnePhase) {
+            TransactionRunner runner = new TransactionRunner(env);
+            runner.run(new ModificationsTransactionWorker(mods));
+         } else {
+            PreparableTransactionRunner runner = new PreparableTransactionRunner(env);
+            runner.prepare(new ModificationsTransactionWorker(mods));
+            com.sleepycat.je.Transaction txn = CurrentTransaction.getInstance(env).getTransaction();
+            txnMap.put(tx, txn);
+         }
+      } catch (CacheLoaderException e) {
+         throw e;
+      } catch (Exception e) {
+         throw new CacheLoaderException("Problems preparing modifications", e);
+      }
+   }
+
+ // Maps the caller's javax.transaction.Transaction to the JE transaction left open by a two-phase prepare().
+ Map<Transaction, com.sleepycat.je.Transaction> txnMap = new
HashMap<Transaction, com.sleepycat.je.Transaction>();
+
+   /**
+    * Commits the JE transaction remembered for {@code tx}, if there is one and it is the current transaction.
+    * Does nothing when nothing was prepared for {@code tx}; logs an error when the remembered transaction is not
+    * the current one.
+    *
+    * @param tx key used when the transaction was prepared
+    */
+   private void commit(Transaction tx) {
+      com.sleepycat.je.Transaction prepared = txnMap.remove(tx);
+      CurrentTransaction current = CurrentTransaction.getInstance(env);
+      if (prepared == null) {
+         return;
+      }
+      if (current.getTransaction() != prepared) {
+         log.error("Transactions must be committed on the same thread");
+         return;
+      }
+      try {
+         current.commitTransaction();
+      } catch (DatabaseException e) {
+         e.printStackTrace(); // TODO: Manik: Customise this generated block
+      }
+   }
+
+   /**
+    * Aborts the JE transaction remembered for {@code tx}, if there is one and it is the current transaction.
+    * Does nothing when nothing was prepared for {@code tx}; logs an error when the remembered transaction is not
+    * the current one.
+    *
+    * @param tx key used when the transaction was prepared
+    */
+   private void rollback(Transaction tx) {
+      com.sleepycat.je.Transaction txn = txnMap.remove(tx);
+      CurrentTransaction currentTransaction = CurrentTransaction.getInstance(env);
+      if (txn != null) {
+         if (currentTransaction.getTransaction() == txn) {
+            try {
+               currentTransaction.abortTransaction();
+            } catch (DatabaseException e) {
+               e.printStackTrace(); // TODO: Manik: Customise this generated block
+            }
+         } else {
+            // this is the rollback path, so say so in the log
+            log.error("Transactions must be rolled back on the same thread");
+         }
+      }
+   }
+
+ /**
+ * Stores an entry under key "k" three times and checks what load() and cacheMap report after each store.
+ * NOTE(review): the last two StoredEntry constructor arguments look like creation and expiry timestamps, with -1
+ * meaning "none" -- confirm against StoredEntry.
+ */
+ public void testLoadAndStore() throws InterruptedException, CacheLoaderException {
+ assert !cacheMap.containsKey("k");
+ // first store: constructed with -1/-1; expected to report a lifespan of -1 and not be expired
+ StoredEntry se = new StoredEntry("k", "v", -1, -1);
+ store(se);
+
+ assert load("k").getValue().equals("v");
+ assert load("k").getLifespan() == -1;
+ assert !load("k").isExpired();
+ assert cacheMap.containsKey("k");
+
+ // second store: two-minute window, so the entry must still be loadable
+ long now = System.currentTimeMillis();
+ long lifespan = 120000;
+ se = new StoredEntry("k", "v", now, now + lifespan);
+ store(se);
+
+ assert load("k").getValue().equals("v");
+ assert load("k").getLifespan() == lifespan;
+ assert !load("k").isExpired();
+ assert cacheMap.containsKey("k");
+
+ // third store: 1 ms window; after sleeping, load() must return null and drop the entry from the map
+ now = System.currentTimeMillis();
+ lifespan = 1;
+ se = new StoredEntry("k", "v", now, now + lifespan);
+ store(se);
+ Thread.sleep(100);
+ assert se.isExpired();
+ assert load("k") == null;
+ assert !cacheMap.containsKey("k");
+ }
+
+
+   /**
+    * Applies two batches of modifications through the one-phase path of {@code prepare} and checks the map
+    * contents after each batch.
+    */
+   public void testOnePhaseCommit() throws CacheLoaderException {
+      // batch 1: k1 is stored and then removed, k2 stays
+      List<Modification> firstBatch = new ArrayList<Modification>();
+      firstBatch.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      firstBatch.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      firstBatch.add(new Remove("k1"));
+      Transaction tx = EasyMock.createNiceMock(Transaction.class);
+      prepare(firstBatch, tx, true);
+
+      loadAll();
+
+      StoredEntry k2 = load("k2");
+      assert k2.getValue().equals("v2");
+      assert !cacheMap.containsKey("k1");
+
+      cacheMap.clear();
+
+      // batch 2: the clear wipes k1 and k2, only k3 is stored afterwards
+      List<Modification> secondBatch = new ArrayList<Modification>();
+      secondBatch.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      secondBatch.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      secondBatch.add(new Clear());
+      secondBatch.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+      prepare(secondBatch, tx, true);
+
+      assert !cacheMap.containsKey("k1");
+      assert !cacheMap.containsKey("k2");
+      assert cacheMap.containsKey("k3");
+   }
+
+
+ /**
+ * Prepares two batches of modifications through the two-phase path and commits each of them while a second
+ * thread reads the map. Assertion failures raised on the reader threads are collected in 'throwables' and
+ * rethrown on the test thread after the reader has been joined.
+ */
+ public void testTwoPhaseCommit() throws Throwable {
+ final List<Throwable> throwables = new ArrayList<Throwable>();
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ prepare(mods, tx, false);
+
+
+ // reader is started before commit(tx) is called on this thread
+ Thread gets1 = new Thread(
+ new Runnable() {
+ public void run() {
+ try {
+ assert load("k2").getValue().equals("v2");
+ assert !cacheMap.containsKey("k1");
+ } catch (Throwable e) {
+ throwables.add(e);
+ }
+ }
+ }
+ );
+
+ gets1.start();
+ commit(tx);
+
+ gets1.join();
+
+ if (!throwables.isEmpty()) throw throwables.get(0);
+
+
+ cacheMap.clear();
+
+ // second round: the Clear in the middle of the batch means only k3 should remain
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ prepare(mods, tx, false);
+
+ Thread gets2 = new Thread(
+ new Runnable() {
+ public void run() {
+ try {
+ assert !cacheMap.containsKey("k1");
+ assert !cacheMap.containsKey("k2");
+ assert cacheMap.containsKey("k3");
+
+ } catch (Throwable e) {
+ throwables.add(e);
+ }
+ }
+ }
+ );
+
+ gets2.start();
+
+
+ commit(tx);
+ gets2.join();
+
+ if (!throwables.isEmpty()) throw throwables.get(0);
+ // same checks again on the test thread, after the commit
+ assert !cacheMap.containsKey("k1");
+ assert !cacheMap.containsKey("k2");
+ assert cacheMap.containsKey("k3");
+ }
+
+
+   /**
+    * Prepares two batches of modifications through the two-phase path, rolls each of them back and checks that
+    * only the entry stored before the transactions ("old") is present afterwards.
+    * <p/>
+    * NOTE(review): earlier revisions carried disabled reader-thread checks that inspected the map between
+    * prepare and rollback; they were commented out and are not exercised here.
+    */
+   public void testRollback() throws Throwable {
+      store(new StoredEntry("old", "old", -1, -1));
+
+      // round 1: stores and removes, including a remove of the pre-existing entry
+      List<Modification> firstBatch = new ArrayList<Modification>();
+      firstBatch.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      firstBatch.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      firstBatch.add(new Remove("k1"));
+      firstBatch.add(new Remove("old"));
+      Transaction tx = EasyMock.createNiceMock(Transaction.class);
+      prepare(firstBatch, tx, false);
+
+      rollback(tx);
+
+      assert !cacheMap.containsKey("k1");
+      assert !cacheMap.containsKey("k2");
+      assert cacheMap.containsKey("old");
+
+      // round 2: a batch containing a Clear must also be undone completely
+      List<Modification> secondBatch = new ArrayList<Modification>();
+      secondBatch.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      secondBatch.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      secondBatch.add(new Clear());
+      secondBatch.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+      prepare(secondBatch, tx, false);
+
+      rollback(tx);
+
+      assert !cacheMap.containsKey("k1");
+      assert !cacheMap.containsKey("k2");
+      assert !cacheMap.containsKey("k3");
+      assert cacheMap.containsKey("old");
+   }
+
+
+   /**
+    * Calls {@code commit} and {@code rollback} for a transaction that was never prepared and checks that the
+    * entry stored outside any prepared transaction is still present.
+    */
+   public void testCommitAndRollbackWithoutPrepare() throws CacheLoaderException {
+      Transaction neverPrepared;
+
+      store(new StoredEntry("old", "old", -1, -1));
+      neverPrepared = EasyMock.createNiceMock(Transaction.class);
+      commit(neverPrepared);
+
+      store(new StoredEntry("old", "old", -1, -1));
+      rollback(neverPrepared);
+
+      assert cacheMap.containsKey("old");
+   }
+
+   /**
+    * Stores three entries and checks that {@code loadAll} returns exactly those three, identified by key.
+    */
+   public void testPreload() throws CacheLoaderException {
+      store(new StoredEntry("k1", "v1", -1, -1));
+      store(new StoredEntry("k2", "v2", -1, -1));
+      store(new StoredEntry("k3", "v3", -1, -1));
+
+      Set<Object> outstandingKeys = new HashSet<Object>();
+      outstandingKeys.add("k1");
+      outstandingKeys.add("k2");
+      outstandingKeys.add("k3");
+
+      Set<StoredEntry> loaded = loadAll();
+      assert loaded.size() == 3;
+
+      // every loaded entry must tick off one expected key, and none may be left over
+      for (StoredEntry entry : loaded) {
+         assert outstandingKeys.remove(entry.getKey());
+      }
+      assert outstandingKeys.isEmpty();
+   }
+
+   /**
+    * Stores three entries with a one second window, waits for that window to pass and checks that
+    * {@code purgeExpired} removes all of them from the map.
+    */
+   public void testPurgeExpired() throws Exception {
+      final String[] keys = {"k1", "k2", "k3"};
+      long now = System.currentTimeMillis();
+      long lifespan = 1000;
+      long expiry = now + lifespan;
+
+      store(new StoredEntry("k1", "v1", now, expiry));
+      store(new StoredEntry("k2", "v2", now, expiry));
+      store(new StoredEntry("k3", "v3", now, expiry));
+      for (String key : keys) {
+         assert cacheMap.containsKey(key);
+      }
+
+      Thread.sleep(lifespan + 100);
+      purgeExpired();
+
+      for (String key : keys) {
+         assert !cacheMap.containsKey(key);
+      }
+   }
+
+
+ public void testStreamingAPI() throws IOException, ClassNotFoundException,
CacheLoaderException {
+ store(new StoredEntry("k1", "v1", -1, -1));
+ store(new StoredEntry("k2", "v2", -1, -1));
+ store(new StoredEntry("k3", "v3", -1, -1));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ toStream(out);
+ out.close();
+ cacheMap.clear();
+ fromStream(new ByteArrayInputStream(out.toByteArray()));
+
+ Set<StoredEntry> set = loadAll();
+
+ assert set.size() == 3;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+
+
+   /**
+    * Same round trip as {@code testStreamingAPI}, but the serialized records are embedded between eight leading
+    * and eight trailing marker bytes. Checks that {@code fromStream} consumes exactly the bytes written by
+    * {@code toStream}, leaving the trailing markers untouched for the caller to read.
+    */
+   public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
+      store(new StoredEntry("k1", "v1", -1, -1));
+      store(new StoredEntry("k2", "v2", -1, -1));
+      store(new StoredEntry("k3", "v3", -1, -1));
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+      byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
+      out.write(dummyStartBytes);
+      toStream(out);
+      out.write(dummyEndBytes);
+      out.close();
+      cacheMap.clear();
+
+      // first pop the start bytes
+      byte[] dummy = new byte[8];
+      ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+      int bytesRead = in.read(dummy, 0, 8);
+      assert bytesRead == 8;
+      for (int i = 1; i < 9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
+      fromStream(in);
+      // whatever is left must be exactly the end markers
+      bytesRead = in.read(dummy, 0, 8);
+      assert bytesRead == 8;
+      for (int i = 8; i > 0; i--) assert dummy[8 - i] == i : "End byte stream corrupted!";
+
+      Set<StoredEntry> set = loadAll();
+
+      assert set.size() == 3;
+      Set<Object> expected = new HashSet<Object>();
+      expected.add("k1");
+      expected.add("k2");
+      expected.add("k3");
+      for (StoredEntry se : set) assert expected.remove(se.getKey());
+      assert expected.isEmpty();
+   }
+
+   /**
+    * Runs three threads that each perform 500 rounds of store / remove / load against ten shared keys and fails
+    * if any operation throws or a loaded value does not match its key.
+    * <p/>
+    * Failures are collected in a synchronized list because all worker threads may add to it at the same time.
+    */
+   public void testConcurrency() throws Throwable {
+      int numThreads = 3;
+      final int loops = 500;
+      final String[] keys = new String[10];
+      final String[] values = new String[10];
+      for (int i = 0; i < 10; i++) keys[i] = "k" + i;
+      for (int i = 0; i < 10; i++) values[i] = "v" + i;
+
+      final Random r = new Random();
+      final List<Throwable> throwables = java.util.Collections.synchronizedList(new LinkedList<Throwable>());
+
+      final Runnable store = new Runnable() {
+         public void run() {
+            try {
+               int randomInt = r.nextInt(10);
+               store(new StoredEntry(keys[randomInt], values[randomInt]));
+            } catch (Throwable e) {
+               throwables.add(e);
+            }
+         }
+      };
+
+      final Runnable remove = new Runnable() {
+         public void run() {
+            try {
+               cacheMap.remove(keys[r.nextInt(10)]);
+            } catch (Throwable e) {
+               throwables.add(e);
+            }
+         }
+      };
+
+      final Runnable get = new Runnable() {
+         public void run() {
+            try {
+               int randomInt = r.nextInt(10);
+               StoredEntry se = load(keys[randomInt]);
+               // a key is only ever stored with the value of the same index
+               assert se == null || se.getValue().equals(values[randomInt]);
+               loadAll();
+            } catch (Throwable e) {
+               throwables.add(e);
+            }
+         }
+      };
+
+      Thread[] threads = new Thread[numThreads];
+
+      for (int i = 0; i < numThreads; i++) {
+         threads[i] = new Thread(getClass().getSimpleName() + "-" + i) {
+            public void run() {
+               for (int round = 0; round < loops; round++) {
+                  store.run();
+                  remove.run();
+                  get.run();
+               }
+            }
+         };
+      }
+
+      for (Thread t : threads) t.start();
+      for (Thread t : threads) t.join();
+
+      if (!throwables.isEmpty()) throw throwables.get(0);
+   }
+
+
+}
+
Added:
core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,111 @@
+package org.horizon.loader.bdbje;
+
+import static org.easymock.classextension.EasyMock.*;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.PurgeExpired;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+
+/**
+ * Unit tests that cover {@link ModificationsTransactionWorker }: each test feeds the worker a single mocked
+ * modification and verifies the call it makes on a mocked {@link CacheStore}.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.ModificationsTransactionWorkerTest")
+public class ModificationsTransactionWorkerTest {
+
+   @Test
+   public void testDoWorkOnUnsupportedModification() {
+      //TODO: we currently support all modifications...
+   }
+
+   /** A STORE modification results in {@code store} being called with its entry. */
+   @Test
+   public void testDoWorkOnStore() throws Exception {
+      StoredEntry entry = new StoredEntry("1", "2");
+      CacheStore cs = createMock(CacheStore.class);
+      Store store = createMock(Store.class);
+      expect(store.getType()).andReturn(Modification.Type.STORE);
+      expect(store.getStoredEntry()).andReturn(entry);
+      cs.store(entry);
+      replay(cs);
+      replay(store);
+
+      doWorkWith(cs, store);
+
+      verify(cs);
+      verify(store);
+   }
+
+   /** A REMOVE modification results in {@code remove} being called with its key. */
+   @Test
+   public void testDoWorkOnRemove() throws Exception {
+      CacheStore cs = createMock(CacheStore.class);
+      Remove remove = createMock(Remove.class);
+      expect(remove.getType()).andReturn(Modification.Type.REMOVE);
+      expect(remove.getKey()).andReturn("1");
+      expect(cs.remove("1")).andReturn(true);
+      replay(cs);
+      replay(remove);
+
+      doWorkWith(cs, remove);
+
+      verify(cs);
+      verify(remove);
+   }
+
+   /** A CLEAR modification results in {@code clear} being called. */
+   @Test
+   public void testDoWorkOnClear() throws Exception {
+      CacheStore cs = createMock(CacheStore.class);
+      Clear clear = createMock(Clear.class);
+      expect(clear.getType()).andReturn(Modification.Type.CLEAR);
+      cs.clear();
+      replay(cs);
+      replay(clear);
+
+      doWorkWith(cs, clear);
+
+      verify(cs);
+      verify(clear);
+   }
+
+   /** A PURGE_EXPIRED modification results in {@code purgeExpired} being called. */
+   @Test
+   public void testDoWorkOnPurgeExpired() throws Exception {
+      CacheStore cs = createMock(CacheStore.class);
+      PurgeExpired purge = createMock(PurgeExpired.class);
+      expect(purge.getType()).andReturn(Modification.Type.PURGE_EXPIRED);
+      cs.purgeExpired();
+      replay(cs);
+      replay(purge);
+
+      doWorkWith(cs, purge);
+
+      verify(cs);
+      verify(purge);
+   }
+
+   /** Runs a worker holding just the given modification against the given store. */
+   private static void doWorkWith(CacheStore cs, Modification modification) throws Exception {
+      ModificationsTransactionWorker worker =
+            new ModificationsTransactionWorker(cs, Collections.singletonList(modification));
+      worker.doWork();
+   }
+
+}
Added:
core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java 2009-03-02
00:43:57 UTC (rev 7814)
@@ -0,0 +1,239 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.je.DeadlockException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.classextension.EasyMock.*;
+import org.horizon.loader.CacheLoaderException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests that cover {@link PreparableTransactionRunner}.
+ * <p/>
+ * The environment, its configuration, the transaction and the worker are all EasyMock mocks. Each test records the
+ * calls it expects on top of the ones recorded in {@link #setUp()}, replays the mocks, exercises the runner and,
+ * where the interaction itself is under test, verifies the mocks afterwards.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.PreparableTransactionRunnerTest")
+public class PreparableTransactionRunnerTest {
+   PreparableTransactionRunner runner;
+   Environment env;
+   EnvironmentConfig config;
+   TransactionWorker worker;
+   Transaction transaction;
+
+   /**
+    * Creates fresh mocks and records the baseline expectations shared by every test: one lookup of the environment
+    * configuration (reported as transactional and locking) and one {@code beginTransaction(null, null)} that hands
+    * back the mock transaction. The mocks are left in record state so that tests can add their own expectations.
+    */
+   @BeforeMethod
+   public void setUp() throws Exception {
+      config = createMock(EnvironmentConfig.class);
+      expect(config.getTransactional()).andReturn(true);
+      expect(config.getLocking()).andReturn(true);
+      transaction = createMock(Transaction.class);
+      env = createMock(Environment.class);
+      expect(env.getConfig()).andReturn(config);
+      expect(env.beginTransaction(null, null)).andReturn(transaction);
+      worker = createMock(TransactionWorker.class);
+   }
+
+   /**
+    * Drops every reference created by {@link #setUp()} so that no mock leaks from one test into the next.
+    */
+   @AfterMethod
+   public void tearDown() throws CacheLoaderException {
+      runner = null;
+      env = null;
+      config = null;
+      worker = null;
+      transaction = null;
+   }
+
+   /**
+    * With the runner constructed as {@code (env, 2, null)}, three consecutive deadlocks are expected to produce three
+    * attempts (each aborted, the latter two in a newly begun transaction) before the deadlock reaches the caller.
+    */
+   @Test
+   public void testMoreDeadlocks() throws Exception {
+      expectFailedAttempt(new DeadlockException());
+      expect(env.beginTransaction(null, null)).andReturn(transaction);
+      expectFailedAttempt(new DeadlockException());
+      expect(env.beginTransaction(null, null)).andReturn(transaction);
+      expectFailedAttempt(new DeadlockException());
+      replayAll();
+      runner = new PreparableTransactionRunner(env, 2, null);
+      try {
+         runner.prepare(worker);
+         assert false : "should have gotten a deadlock exception";
+      } catch (DeadlockException expected) {
+         // expected: the deadlock must surface once the attempts are used up
+      }
+      verifyAll();
+   }
+
+   /**
+    * {@code prepare} must invoke the worker once; no commit is recorded on the transaction mock, so a commit would
+    * fail the test.
+    */
+   @Test
+   public void testPrepare() throws Exception {
+      worker.doWork();
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.prepare(worker);
+      verifyAll();
+   }
+
+   /**
+    * {@code run} must invoke the worker once and commit the transaction.
+    */
+   @Test
+   public void testRun() throws Exception {
+      transaction.commit();
+      worker.doWork();
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.run(worker);
+      verifyAll();
+   }
+
+   /**
+    * The one-argument constructor must pick up the {@link CurrentTransaction} instance belonging to the environment.
+    */
+   @Test
+   public void testOneArgConstructorSetsCurrentTxn() throws Exception {
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      assert CurrentTransaction.getInstance(env) == runner.currentTxn;
+   }
+
+   /**
+    * A value passed to {@code setMaxRetries} must be readable through {@code getMaxRetries}.
+    */
+   @Test
+   public void testSetMaxRetries() throws Exception {
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.setMaxRetries(1);
+      assert runner.getMaxRetries() == 1;
+   }
+
+   /**
+    * Disabling nested transactions is accepted; enabling them must be rejected with an
+    * {@link UnsupportedOperationException}.
+    */
+   @Test
+   public void testSetAllowNestedTransactions() throws Exception {
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.setAllowNestedTransactions(false);
+      assert !runner.getAllowNestedTransactions();
+      try {
+         runner.setAllowNestedTransactions(true);
+         assert false : "should have gotten Exception";
+      } catch (UnsupportedOperationException expected) {
+         // expected: nested transactions cannot be switched on
+      }
+   }
+
+   /**
+    * A transaction configuration passed to the setter must be returned by the getter.
+    */
+   @Test
+   public void testGetTransactionConfig() throws Exception {
+      replayAll();
+      // Named differently from the EnvironmentConfig field to avoid shadowing it.
+      TransactionConfig txnConfig = new TransactionConfig();
+      runner = new PreparableTransactionRunner(env);
+      runner.setTransactionConfig(txnConfig);
+      assert runner.getTransactionConfig().equals(txnConfig);
+   }
+
+   /**
+    * A runtime exception thrown by the worker during {@code prepare} must abort the transaction and reach the caller.
+    */
+   @Test
+   public void testExceptionThrownInPrepare() throws Exception {
+      expectFailedAttempt(new RuntimeException());
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+
+      try {
+         runner.prepare(worker);
+         assert false : "should have gotten an exception";
+      } catch (RuntimeException expected) {
+         // expected: the worker's failure must not be swallowed
+      }
+      verifyAll();
+   }
+
+   /**
+    * An {@link Error} thrown by the worker during {@code prepare} must abort the transaction and reach the caller.
+    */
+   @Test
+   public void testErrorThrownInPrepare() throws Exception {
+      expectFailedAttempt(new Error());
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+
+      // AssertionError is itself an Error, so an "assert false" inside the try block would be caught by the
+      // catch (Error) clause and the test could never fail. Track the outcome and assert after the try instead.
+      boolean errorPropagated = false;
+      try {
+         runner.prepare(worker);
+      } catch (AssertionError mockFailure) {
+         // An assertion failure (for instance an unexpected call reported by a mock) is not the error under test.
+         throw mockFailure;
+      } catch (Error expected) {
+         errorPropagated = true;
+      }
+      assert errorPropagated : "should have gotten an exception";
+      verifyAll();
+   }
+
+   /**
+    * A runtime exception thrown by the worker during {@code run} must abort the transaction and reach the caller.
+    */
+   @Test
+   public void testExceptionThrownInRun() throws Exception {
+      expectFailedAttempt(new RuntimeException());
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+
+      try {
+         // This test is about run(); it previously called prepare() and so duplicated testExceptionThrownInPrepare.
+         runner.run(worker);
+         assert false : "should have gotten an exception";
+      } catch (RuntimeException expected) {
+         // expected: the worker's failure must not be swallowed
+      }
+      verifyAll();
+   }
+
+   /**
+    * An {@link Error} thrown by the worker during {@code run} must abort the transaction and reach the caller.
+    */
+   @Test
+   public void testErrorThrownInRun() throws Exception {
+      expectFailedAttempt(new Error());
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+
+      // See testErrorThrownInPrepare: the check has to live outside the try block because catch (Error) would
+      // otherwise swallow the AssertionError raised by a failing assert.
+      boolean errorPropagated = false;
+      try {
+         runner.run(worker);
+      } catch (AssertionError mockFailure) {
+         throw mockFailure;
+      } catch (Error expected) {
+         errorPropagated = true;
+      }
+      assert errorPropagated : "should have gotten an exception";
+      verifyAll();
+   }
+
+   /**
+    * Passing a {@link DeadlockException} to {@code rethrowIfNotDeadLock} must return normally.
+    */
+   @Test
+   public void testRethrowIfNotDeadLockDoesntThrowWhenGivenDeadlockException() throws Exception {
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.rethrowIfNotDeadLock(createNiceMock(DeadlockException.class));
+   }
+
+   /**
+    * When aborting the transaction itself throws, {@code abortOverflowingCurrentTriesOnError} must report
+    * {@link Integer#MAX_VALUE} as the current number of tries.
+    */
+   @Test
+   public void testThrowableDuringAbort() throws Exception {
+      transaction.abort();
+      expectLastCall().andThrow(new RuntimeException());
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      CurrentTransaction.getInstance(env).beginTransaction(null);
+      int max = runner.abortOverflowingCurrentTriesOnError(transaction, 2);
+      assert max == Integer.MAX_VALUE : "should have overflowed max tries, but got " + max;
+      verifyAll();
+   }
+
+   /**
+    * Records one failed attempt: the worker is invoked and throws {@code failure}, after which the transaction is
+    * expected to be aborted.
+    */
+   private void expectFailedAttempt(Throwable failure) throws Exception {
+      worker.doWork();
+      expectLastCall().andThrow(failure);
+      transaction.abort();
+   }
+
+   /**
+    * Switches every mock from record state to replay state.
+    */
+   private void replayAll() {
+      replay(config);
+      replay(env);
+      replay(transaction);
+      replay(worker);
+   }
+
+   /**
+    * Verifies that every mock received exactly the calls that were recorded for it.
+    */
+   private void verifyAll() {
+      verify(config);
+      verify(env);
+      verify(transaction);
+      verify(worker);
+   }
+}