[jbosscache-commits] JBoss Cache SVN: r7814 - in core/branches/flat/src: main/java/org/horizon/loader/bdbje and 2 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Sun Mar 1 19:43:58 EST 2009


Author: adriancole
Date: 2009-03-01 19:43:57 -0500 (Sun, 01 Mar 2009)
New Revision: 7814

Added:
   core/branches/flat/src/main/java/org/horizon/loader/bdbje/
   core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
   core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java
   core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java
   core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java
   core/branches/flat/src/test/java/org/horizon/loader/bdbje/
   core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java
   core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
   core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java
   core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java
   core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java
Log:
added bdbje loader support

Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,706 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.serial.SerialBinding;
+import com.sleepycat.bind.serial.StoredClassCatalog;
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.StoredMap;
+import com.sleepycat.je.*;
+import com.sleepycat.util.ExceptionUnwrapper;
+import org.horizon.Cache;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+import org.horizon.util.ReflectionUtil;
+import org.horizon.util.concurrent.WithinThreadExecutor;
+
+import java.io.File;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * An Oracle SleepyCat JE implementation of a {@link org.horizon.loader.CacheStore}.  <p/>This implementation uses two
+ * databases <ol> <li>stored entry database: <tt>/{location}/CacheInstance-{@link org.horizon.Cache#getName()
+ * name}</tt>; {@link StoredEntry stored entries} are stored here, keyed on {@link
+ * org.horizon.loader.StoredEntry#getKey()}</li> <li>class catalog database: <tt>/{location}/CacheInstance-{@link
+ * org.horizon.Cache#getName() name}_class_catalog</tt>; class descriptions are stored here for efficiency reasons.</li>
+ * </ol>
+ * <p/>
+ * <p/>
+ * All data access is transactional.  Any attempted reads to locked records will block.  The maximum duration of this is
+ * set in microseconds via the parameter {@link org.horizon.loader.bdbje.BdbjeCacheStoreConfig#getLockAcquistionTimeoutMicros()}.
+ * Calls to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction, boolean) prepare} will attempt
+ * to resolve deadlocks, retrying up to {@link org.horizon.loader.bdbje.BdbjeCacheStoreConfig#getMaxTxRetries()}
+ * attempts.
+ * <p/>
+ * Unlike the C version of SleepyCat, JE does not support MVCC or READ_COMMITTED isolation.  In other words, readers
+ * will block on any data held by a pending transaction.  As such, it is best practice to keep the duration between
+ * <code>prepare</code> and <code>commit</code> as short as possible.
+ * <p/>
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class BdbjeCacheStore implements CacheStore {
+
+   private static final Log log = LogFactory.getLog(BdbjeCacheStore.class);
+
+   private BdbjeCacheStoreConfig cfg;
+   private Cache cache;
+   private Marshaller m;
+
+   private Environment env;
+   private String cacheDbName;
+   private String catalogDbName;
+   private StoredClassCatalog catalog;
+   private Database cacheDb;
+   private StoredMap<Object, StoredEntry> cacheMap;
+
+   private PreparableTransactionRunner transactionRunner;
+   private Map<javax.transaction.Transaction, Transaction> txnMap;
+   private ExecutorService purgerService;
+   private CurrentTransaction currentTransaction;
+
+   /**
+    * {@inheritDoc} This implementation expects config to be an instance of {@link BdbjeCacheStoreConfig}
+    *
+    * @see BdbjeCacheStoreConfig
+    */
+   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+      log.trace("initializing BdbjeCacheStore");
+      printLicense();  // writes the Berkeley DB JE licensing notice to standard output on every init
+      this.cfg = (BdbjeCacheStoreConfig) config;  // ClassCastException if any other config type is supplied
+      this.cache = cache;  // start() rejects a null cache later; nothing is validated here
+      this.m = m;
+   }
+
+
+   /**
+    * {@inheritDoc}
+    *
+    * @return {@link BdbjeCacheStoreConfig}, the configuration type that <code>init</code> casts its argument to
+    */
+   public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+      return BdbjeCacheStoreConfig.class;
+   }
+
+   /**
+    * {@inheritDoc} Validates configuration (a location of the form <tt>dir#dbName</tt> overrides the database name),
+    * configures and opens the {@link Environment}, then {@link org.horizon.loader.bdbje.BdbjeCacheStore#openDatabases()
+    * opens the databases}.  When this is finished, transactional services are instantiated.
+    */
+   public void start() throws CacheLoaderException {
+      log.trace("starting BdbjeCacheStore");
+      checkNotOpen();
+
+      if (cache == null) {
+         throw new IllegalStateException(
+               "A non-null Cache property (CacheSPI object) is required");
+      }
+
+      String configStr = cfg.getLocation();
+
+      if (cfg.isPurgeSynchronously()) {
+         purgerService = new WithinThreadExecutor();
+      } else {
+         purgerService = Executors.newSingleThreadExecutor();
+      }
+
+      // JBCACHE-1448 db name parsing fix courtesy of Ciro Cavani
+      /* Parse config string. */
+      int offset = configStr.indexOf('#');
+      if (offset >= 0 && offset < configStr.length() - 1) {  // a trailing '#' with nothing after it is not a db name
+         cacheDbName = configStr.substring(offset + 1);
+         configStr = configStr.substring(0, offset);
+      } else {
+         cacheDbName = cache.getName();
+         if (cacheDbName == null) cacheDbName = "CacheInstance-" + System.identityHashCode(cache);
+      }
+
+      // datafile location
+      File location = new File(configStr);
+      if (!location.exists()) {
+         boolean created = location.mkdirs();
+         if (!created) throw new CacheLoaderException("Unable to create cache loader location " + location);
+
+      }
+      if (!location.isDirectory()) {
+         throw new CacheLoaderException("Cache loader location [" + location + "] is not a directory!");
+      }
+
+      catalogDbName = cacheDbName + "_class_catalog";
+
+      try {
+         /* Open the environment, creating it if it doesn't exist. */
+         EnvironmentConfig envConfig = new EnvironmentConfig();
+         envConfig.setAllowCreate(true);
+         envConfig.setTransactional(true);
+         envConfig.setLockTimeout(cfg.getLockAcquistionTimeoutMicros());
+         if (log.isDebugEnabled()) {
+            envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
+            envConfig.setConfigParam("je.txn.dumpLocks", "true");
+         }
+         log.trace("creating je environment with home dir {0}", location);
+         env = new Environment(location, envConfig);
+         log.debug("created je environment {0} for cache store {1}", env, this);
+         /* Open cache and catalog databases. */
+         openDatabases();
+      }
+      catch (DatabaseException e) {
+         throw new CacheLoaderException("could not open the sleepycat database", e);
+      }
+      txnMap = new ConcurrentHashMap<javax.transaction.Transaction, Transaction>();
+      currentTransaction = CurrentTransaction.getInstance(env);
+      transactionRunner = new PreparableTransactionRunner(env);
+      transactionRunner.setMaxRetries(cfg.getMaxTxRetries());
+      log.debug("started cache store {0}", this);  // single argument, so the placeholder index must be 0
+   }
+
+
+   /**
+    * Opens all databases and initializes database related information.  A {@link StoredMap} instance is {@link
+    * BdbjeCacheStore#createStoredMapViewOfDatabase(com.sleepycat.je.Database, com.sleepycat.bind.serial.StoredClassCatalog)
+    * associated} with the stored entry and class catalog databases.
+    */
+   private void openDatabases() throws DatabaseException {
+      log.trace("opening databases");
+      /* One transactional, create-if-missing config shared by both databases; duplicates stay at the default. */
+      DatabaseConfig dbConfig = new DatabaseConfig();
+      dbConfig.setTransactional(true);
+      dbConfig.setAllowCreate(true);
+
+      log.trace("opening or creating stored entry database {0}", cacheDbName);
+      cacheDb = env.openDatabase(null, cacheDbName, dbConfig);
+      log.debug("opened stored entry database {0}", cacheDbName);
+
+      log.trace("opening or creating class catalog database {0}", catalogDbName);
+      Database catalogDb = env.openDatabase(null, catalogDbName, dbConfig);  // kept only inside the catalog below
+      catalog = new StoredClassCatalog(catalogDb);
+      log.debug("created stored class catalog from database {0}", catalogDbName);
+
+      cacheMap = createStoredMapViewOfDatabase(cacheDb, catalog);
+   }
+
+   /**
+    * create a {@link StoredMap} persisted by the <code>database</code>
+    *
+    * @param database     where entries in the StoredMap are persisted
+    * @param classCatalog location to store class descriptions
+    * @return StoredMap backed by the database and classCatalog
+    * @throws DatabaseException if the StoredMap cannot be opened.
+    */
+   private StoredMap createStoredMapViewOfDatabase(Database database, StoredClassCatalog classCatalog) throws DatabaseException {
+      EntryBinding storedEntryKeyBinding =
+            new SerialBinding(classCatalog, Object.class);
+      EntryBinding storedEntryValueBinding =
+            new SerialBinding(classCatalog, StoredEntry.class);
+      try {
+         return new StoredMap<Object, StoredEntry>(database,
+                                                   storedEntryKeyBinding, storedEntryValueBinding, true);
+      } catch (Exception caught) {
+         caught = ExceptionUnwrapper.unwrap(caught);
+         throw new DatabaseException("error opening stored map", caught);
+      }
+   }
+
+   /** Closes both JE databases (files stay on disk); close failures are logged, then the references are nulled. */
+   private void closeDatabases() {
+      log.trace("closing databases");
+      try {
+         try {
+            cacheDb.close();
+         } finally {
+            catalog.close();  // attempted even when closing the stored entry database failed
+         }
+      } catch (DatabaseException e) {
+         log.error("Error closing databases", e);
+      }
+      cacheDb = null;
+      catalog = null;
+      cacheMap = null;
+   }
+
+
+   /**
+    * Stops accepting purge tasks, {@link org.horizon.loader.bdbje.BdbjeCacheStore#closeDatabases() closes the JE
+    * databases} and the {@link Environment}.  Nothing is removed from the file system; close failures are logged.
+    */
+   public void stop() throws CacheLoaderException {
+      checkOpen();
+      log.trace("stopping BdbjeCacheStore");
+      purgerService.shutdown();  // otherwise the background purger's worker thread outlives the store
+      transactionRunner = null;
+      currentTransaction = null;
+      txnMap = null;
+      closeDatabases();
+      if (env != null) {
+         try {
+            env.close();
+         }
+         catch (Exception shouldNotOccur) {
+            log.warn("Unexpected exception closing cacheStore", shouldNotOccur);
+         }
+      }
+      env = null;
+   }
+
+   /**
+    * {@inheritDoc} delegates to {@link BdbjeCacheStore#onePhaseCommit(java.util.List)}, if <code>isOnePhase</code>.
+    * Otherwise, delegates to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction) prepare}.
+    */
+   public void prepare(List<? extends Modification> mods, javax.transaction.Transaction tx, boolean isOnePhase) throws CacheLoaderException {
+      if (!isOnePhase) {
+         prepare(mods, tx);
+         return;
+      }
+      onePhaseCommit(mods);
+   }
+
+   /**
+    * Perform the <code>mods</code> atomically by creating a {@link ModificationsTransactionWorker worker} and invoking
+    * them in {@link PreparableTransactionRunner#run(com.sleepycat.collections.TransactionWorker)}.
+    *
+    * @param mods actions to perform atomically
+    * @throws CacheLoaderException on problems during the transaction
+    */
+   protected void onePhaseCommit(List<? extends Modification> mods) throws CacheLoaderException {
+      checkOpen();
+      checkNonNull(mods, "modifications");
+      log.debug("performing one phase transaction");
+      try {
+         // NOTE(review): begin/commit/retry handling lives in PreparableTransactionRunner, not visible here
+         transactionRunner.run(new ModificationsTransactionWorker(this, mods));
+      } catch (Exception e) {  // every failure is reported as a CacheLoaderException naming the modifications
+         throw new CacheLoaderException("Problem committing modifications: " + mods, e);
+      }
+   }
+
+   /**
+    * Looks up the {@link Transaction SleepyCat transaction} associated with <code>tx</code>.  Creates a {@link
+    * org.horizon.loader.bdbje.ModificationsTransactionWorker} instance from <code>mods</code>.  Then prepares the
+    * transaction via {@link PreparableTransactionRunner#prepare(com.sleepycat.collections.TransactionWorker)}.
+    *
+    * @param mods modifications to be applied
+    * @param tx   transaction identifier
+    * @throws CacheLoaderException in the event of problems writing to the store
+    */
+   protected void prepare(List<? extends Modification> mods, javax.transaction.Transaction tx) throws CacheLoaderException {
+      checkOpen();
+      checkNonNull(mods, "modifications");
+      checkNonNull(tx, "tx");
+      log.debug("preparing transaction {0}", tx);
+      try {
+         transactionRunner.prepare(new ModificationsTransactionWorker(this, mods));
+         Transaction txn = currentTransaction.getTransaction();  // the transaction left open by prepare()
+         log.trace("transaction {0} == sleepycat transaction {1}", tx, txn);
+         txnMap.put(tx, txn);  // remembered so commit/rollback can find the pending SleepyCat transaction
+         // NOTE(review): replaces the whole ThreadLocal, so other threads' bindings appear to be dropped too - confirm
+         ReflectionUtil.setValue(currentTransaction, "localTrans", new ThreadLocal());
+      } catch (Exception e) {
+         throw new CacheLoaderException("Problem preparing transaction", e);
+      }
+   }
+
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * This implementation calls {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean)
+    * completeTransaction} with an argument of false.
+    */
+   public void rollback(javax.transaction.Transaction tx) {
+      try {
+         completeTransaction(tx, false);  // false selects abort
+      } catch (Exception e) {  // this signature declares no exception, so failures are logged, not propagated
+         log.error("Error rolling back transaction", e);
+      }
+   }
+
+   /**
+    * {@inheritDoc}
+    * <p/>
+    * This implementation calls {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean)
+    * completeTransaction} with an argument of true.
+    */
+   public void commit(javax.transaction.Transaction tx) throws CacheLoaderException {
+      completeTransaction(tx, true);  // true selects commit; rollback passes false
+   }
+
+   /**
+    * Looks up the SleepyCat transaction associated with the parameter <code>tx</code>.  If there is no associated
+    * sleepycat transaction, an error is logged.  If this transaction is the {@link
+    * com.sleepycat.collections.CurrentTransaction#getTransaction() current transaction}, it calls {@link
+    * BdbjeCacheStore#completeCurrentTransaction(boolean)} passing the argument <code>commit</code>.  Otherwise, {@link
+    * BdbjeCacheStore#completeTransaction(com.sleepycat.je.Transaction, boolean) completeTransaction} is called, passing
+    * the SleepyCat transaction and <code>commit</code> as arguments.
+    *
+    * @param tx     java transaction used to lookup a SleepyCat transaction
+    * @param commit true to commit false to abort
+    * @throws CacheLoaderException if there are problems committing or aborting the transaction
+    */
+   protected void completeTransaction(javax.transaction.Transaction tx, boolean commit) throws CacheLoaderException {
+      checkOpen();
+      checkNonNull(tx, "tx");
+      Transaction txn = txnMap.remove(tx);  // the mapping is dropped whether or not completion succeeds
+      if (txn != null) {
+         log.debug("transaction {0} == sleepycat transaction {1}", tx, txn);
+         if (currentTransaction.getTransaction() == txn) {
+            completeCurrentTransaction(commit);
+         } else {
+            completeTransaction(txn, commit);
+         }
+      } else {
+         log.error("no sleepycat transaction associated with transaction {0}", tx);
+      }
+   }
+
+   /**
+    * commits or aborts the {@link  Transaction}
+    *
+    * @param commit true to commit, false to abort
+    * @throws CacheLoaderException if there was a problem completing the transaction
+    */
+   private void completeTransaction(Transaction txn, boolean commit) throws CacheLoaderException {
+      log.debug("{0} sleepycat transaction {1}", commit ? "committing" : "aborting", txn);
+      try {
+         if (!commit)
+            txn.abort();
+         else
+            txn.commit();
+      } catch (DatabaseException failure) {
+         throw new CacheLoaderException("Problem completing transaction", failure);
+      }
+   }
+
+   /**
+    * commits or aborts the {@link com.sleepycat.collections.CurrentTransaction#getTransaction() current transaction}
+    *
+    * @param commit true to commit, false to abort
+    * @throws CacheLoaderException if there was a problem completing the transaction
+    */
+   private void completeCurrentTransaction(boolean commit) throws CacheLoaderException {
+      log.debug("{0} current sleepycat transaction {1}", commit ? "committing" : "aborting", currentTransaction.getTransaction());
+      try {
+         if (!commit)
+            currentTransaction.abortTransaction();
+         else
+            currentTransaction.commitTransaction();
+      } catch (DatabaseException failure) {
+         throw new CacheLoaderException("Problem completing transaction", failure);
+      }
+   }
+
+   /**
+    * {@inheritDoc} This implementation delegates to {@link BdbjeCacheStore#load(Object)}, to ensure that a response is
+    * returned only if the entry is not expired.
+    */
+   public boolean containsKey(Object key) throws CacheLoaderException {
+      return load(key) != null;  // load() yields null for absent and for expired entries (and removes the latter)
+   }
+
+   /**
+    * {@inheritDoc} This implementation delegates to {@link StoredMap#remove(Object)}, using its return value so
+    * that the existence check and the removal are a single operation.
+    *
+    * @return true if an entry was stored under <code>key</code> and has been removed, false otherwise
+    */
+   public boolean remove(Object key) throws CacheLoaderException {
+      checkOpen();
+      checkNonNull(key, "key");
+      log.trace("Removing key {0}", key);
+      try {
+         // a separate containsKey() lookup could race with a concurrent removal and report a false positive
+         return cacheMap.remove(key) != null;
+      } catch (Exception caught) {
+         caught = ExceptionUnwrapper.unwrap(caught);
+         throw new CacheLoaderException("error removing key " + key, caught);
+      }
+   }
+
+   /**
+    * {@inheritDoc} This implementation removes the <code>keys</code> atomically by creating a list of {@link Remove}
+    * modifications and passing that to {@link BdbjeCacheStore#onePhaseCommit(java.util.List)}.
+    */
+   public void removeAll(Set<Object> keys) throws CacheLoaderException {
+      checkOpen();
+      checkNonNull(keys, "keys");
+      List<Remove> removals = new ArrayList<Remove>();
+      for (Iterator<Object> pending = keys.iterator(); pending.hasNext();) {
+         removals.add(new Remove(pending.next()));
+      }
+      onePhaseCommit(removals);
+   }
+
+   /**
+    * {@inheritDoc} This implementation delegates to {@link StoredMap#get(Object)}.  If the object is expired, it will
+    * not be returned.
+    */
+   public StoredEntry load(Object key) throws CacheLoaderException {
+      checkOpen();
+      checkNonNull(key, "key");
+      log.trace("Loading key {0}", key);
+      try {
+         StoredEntry s = cacheMap.get(key);
+         if (s == null)
+            return null;
+         if (!s.isExpired())
+            return s;
+         else
+            cacheMap.remove(key);
+      } catch (Exception caught) {
+         caught = ExceptionUnwrapper.unwrap(caught);
+         throw new CacheLoaderException("error loading key " + key, caught);
+      }
+      return null;
+   }
+
+   /**
+    * {@inheritDoc} This implementation delegates to {@link StoredMap#put(Object, Object)}
+    */
+   public void store(StoredEntry ed) throws CacheLoaderException {
+      checkOpen();
+      checkNonNull(ed, "entry");
+      log.trace("Storing entry {0}", ed);
+      try {
+         cacheMap.put(ed.getKey(), ed);  // keyed on the entry's own key
+      } catch (Exception caught) {
+         caught = ExceptionUnwrapper.unwrap(caught);  // report the unwrapped exception as the cause
+         throw new CacheLoaderException("error storing entry " + ed, caught);
+      }
+   }
+
+   /**
+    * {@inheritDoc} This implementation delegates to {@link StoredMap#clear()}
+    */
+   public void clear() throws CacheLoaderException {
+      checkOpen();
+      log.trace("Clearing store");
+      try {cacheMap.clear(); } catch (Exception caught) {
+         caught = ExceptionUnwrapper.unwrap(caught);
+         throw new CacheLoaderException("error clearing store", caught);
+      }
+   }
+
+   /**
+    * {@inheritDoc} This implementation returns a Set from {@link StoredMap#values()}
+    */
+   public Set<StoredEntry> loadAll() throws CacheLoaderException {
+      checkOpen();
+      log.trace("Loading all entries");
+      try {
+         return new HashSet<StoredEntry>(cacheMap.values());  // detached copy; expired entries are not filtered here
+      } catch (Exception caught) {
+         caught = ExceptionUnwrapper.unwrap(caught);
+         throw new CacheLoaderException("error loading all entries ", caught);
+      }
+   }
+
+   /**
+    * {@inheritDoc} This implementation reads the number of entries to load from the stream, then begins a transaction.
+    * During that tranasaction, the cachestore is cleared and replaced with entries from the stream.  If there are any
+    * errors during the process, the entire transaction is rolled back.  Deadlock handling is not addressed, as there is
+    * no means to rollback reads from the inputstream.
+    *
+    * @see BdbjeCacheStore#toStream(java.io.ObjectOutput)
+    */
+   public void fromStream(ObjectInput ois) throws CacheLoaderException {
+      checkOpen();
+      log.warn("Clearing all entries and loading from input");
+      try {
+         long recordCount = ois.readLong();
+         log.info("reading {0} records from stream", recordCount);
+         log.info("clearing all records");
+         currentTransaction.beginTransaction(null);
+         cacheMap.clear();
+         Cursor cursor = null;
+         try {
+            cursor = cacheDb.openCursor(currentTransaction.getTransaction(), null);
+            for (int i = 0; i < recordCount; i++) {
+               byte[] keyBytes = (byte[]) ois.readObject();
+               byte[] dataBytes = (byte[]) ois.readObject();
+
+               DatabaseEntry key = new DatabaseEntry(keyBytes);
+               DatabaseEntry data = new DatabaseEntry(dataBytes);
+               cursor.put(key, data);
+            }
+         } finally {
+            if (cursor != null) cursor.close();
+         }
+         completeCurrentTransaction(true);
+      }
+      catch (Exception caught) {
+         completeCurrentTransaction(false);
+         caught = ExceptionUnwrapper.unwrap(caught);
+         CacheLoaderException cle = (caught instanceof CacheLoaderException) ? (CacheLoaderException) caught :
+               new CacheLoaderException("Problems reading from stream", caught);
+         throw cle;
+      }
+   }
+
+   /**
+    * @{inheritDoc} Writes the current count of cachestore entries followed by a pair of byte[]s corresponding to the
+    * SleepyCat binary representation of {@link org.horizon.loader.StoredEntry#getKey() key} {@link StoredEntry value}.
+    * <p/>
+    * This implementation holds a transaction open to ensure that we see no new records added while iterating.
+    */
+   public void toStream(ObjectOutput oos) throws CacheLoaderException {
+      checkOpen();
+      log.trace("dumping current database to outputstream");
+      Cursor cursor = null;
+      try {
+         currentTransaction.beginTransaction(null);
+         long recordCount = cacheDb.count();
+         log.debug("writing {0} records to stream", recordCount);
+         oos.writeLong(recordCount);
+
+         cursor = cacheDb.openCursor(currentTransaction.getTransaction(), null);
+         DatabaseEntry key = new DatabaseEntry();
+         DatabaseEntry data = new DatabaseEntry();
+         int recordsWritten = 0;
+         while (cursor.getNext(key, data, null) ==
+               OperationStatus.SUCCESS) {
+            oos.writeObject(key.getData());
+            oos.writeObject(data.getData());
+            recordsWritten++;
+         }
+         log.debug("wrote {0} records to stream", recordsWritten);
+         if (recordsWritten != recordCount)
+            log.warn("expected to write {0} records, but wrote {1}", recordCount, recordsWritten);
+         cursor.close();
+         cursor = null;
+         currentTransaction.commitTransaction();
+      } catch (Exception caught) {
+         try {
+            currentTransaction.abortTransaction();
+         } catch (DatabaseException e) {
+            log.error("error aborting transaction", e);
+         }
+         caught = ExceptionUnwrapper.unwrap(caught);
+         CacheLoaderException cle = (caught instanceof CacheLoaderException) ? (CacheLoaderException) caught :
+               new CacheLoaderException("Problems writing to stream", caught);
+         throw cle;
+      }
+      finally {
+         if (cursor != null) try {
+            cursor.close();
+         } catch (DatabaseException e) {
+            throw new CacheLoaderException("Error closing cursor", e);
+         }
+      }
+   }
+
+   /**
+    * {@inheritDoc} If there is a {@link com.sleepycat.collections.CurrentTransaction#getTransaction() transaction in
+    * progress}, this method will invoke {@link #doPurgeExpired()} Otherwise, it will purge expired entries,
+    * autocommitting each.
+    *
+    * @see org.horizon.loader.bdbje.BdbjeCacheStoreConfig#isPurgeSynchronously()
+    * @see org.horizon.loader.bdbje.BdbjeCacheStoreConfig#setMaxTxRetries(int)
+    */
+   public void purgeExpired() throws CacheLoaderException {
+      checkOpen();
+      if (currentTransaction.getTransaction() != null) {
+         doPurgeExpired();  // a transaction is current on this thread: purge inline, on the calling thread
+      } else {
+         purgerService.execute(new Runnable() {  // inline or background, depending on the executor start() chose
+            public void run() {
+               try {
+                  doPurgeExpired();
+               } catch (Exception e) {  // the submitter gets no result back, so failures can only be logged
+                  log.error("error purging expired entries", e);
+               }
+            }
+         });
+      }
+   }
+
+   /**
+    * Iterate through {@link com.sleepycat.collections.StoredMap#entrySet()} and remove, if expired.
+    */
+   private void doPurgeExpired() {
+      log.info("purging expired from database");
+      for (Iterator<Map.Entry<Object, StoredEntry>> entries = cacheMap.entrySet().iterator(); entries.hasNext();) {
+         if (entries.next().getValue().isExpired()) {
+            entries.remove();
+         }
+      }
+   }
+
+   /**
+    * @return the name of the SleepyCat database persisting this store; assigned by start(), null before that
+    */
+   public String getCacheDbName() {
+      return cacheDbName;
+   }
+
+   /**
+    * @return the name of the class catalog database (the cache database name plus "_class_catalog"), set by start()
+    */
+   public String getCatalogDbName() {
+      return catalogDbName;
+   }
+
+   /**
+    * Prints the Berkeley DB Java Edition version and its terms-of-use notice to standard output.
+    */
+   public void printLicense() {
+      String license = "\n*************************************************************************************\n" +
+            "Berkeley DB Java Edition version: " + JEVersion.CURRENT_VERSION.toString() + "\n" +
+            "JBoss Cache can use Berkeley DB Java Edition from Oracle \n" +
+            "(http://www.oracle.com/database/berkeley-db/je/index.html)\n" +
+            "for persistent, reliable and transaction-protected data storage.\n" +
+            "If you choose to use Berkeley DB Java Edition with JBoss Cache, you must comply with the terms\n" +
+            "of Oracle's public license, included in the file LICENSE.txt.\n" +
+            "If you prefer not to release the source code for your own application in order to comply\n" +
+            "with the Oracle public license, you may purchase a different license for use of\n" +
+            "Berkeley DB Java Edition with JBoss Cache.\n" +
+            "See http://www.oracle.com/database/berkeley-db/je/index.html for pricing and license terms\n" +
+            "*************************************************************************************";
+      System.out.println(license);
+   }
+
+   /**
+    * Throws an exception if the environment is already open, i.e. the store has been started and not yet stopped.
+    */
+   private void checkNotOpen() {
+      if (env != null) {
+         throw new IllegalStateException(
+               "Operation not allowed after calling start()");
+      }
+   }
+
+   /**
+    * Throws an exception if the environment is not open, i.e. the store has not been started or was stopped.
+    */
+   private void checkOpen() {
+      if (env == null) {
+         throw new IllegalStateException(
+               "Operation not allowed before calling start()");
+      }
+   }
+
+   /**
+    * Throws an exception if the parameter is null.
+    */
+   private void checkNonNull(Object param, String paramName) {
+      if (param != null) {
+         return;
+      }
+      throw new NullPointerException("Parameter must not be null: " + paramName);
+   }
+
+}

Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,66 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.AbstractCacheLoaderConfig;
+
+/**
+ * Configures {@link org.horizon.loader.bdbje.BdbjeCacheStore}.  This allows you to tune a number of characteristics of
+ * the {@link BdbjeCacheStore}.
+ * <p/>
+ * <ul>
+ * <li><tt>location</tt> - a location on disk where the store can write databases.  This defaults to
+ * <tt>${java.io.tmpdir}</tt></li>
+ * <li><tt>purgeSynchronously</tt> - whether {@link org.horizon.loader.CacheStore#purgeExpired()} calls happen
+ * synchronously or not.  By default, this is set to <tt>false</tt>.</li>
+ * <li><tt>lockAcquistionTimeoutMicros</tt> - the length of time, in microseconds, to wait for locks before timing out
+ * and throwing an exception.  By default, this is set to <tt>60000000</tt>.</li>
+ * <li><tt>maxTxRetries</tt> - the number of times transaction prepares will attempt to resolve a deadlock before
+ * throwing an exception.  By default, this is set to <tt>5</tt>.</li>
+ * </ul>
+ * <p/>
+ * Every setter calls <tt>testImmutability</tt> with the name of the field it is about to change, so that all
+ * properties are guarded the same way.  The spelling of <tt>lockAcquistionTimeoutMicros</tt> is kept as-is because it
+ * is part of the public property name.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class BdbjeCacheStoreConfig extends AbstractCacheLoaderConfig {
+   private String location = System.getProperty("java.io.tmpdir");
+   private boolean purgeSynchronously;
+   private long lockAcquistionTimeoutMicros = 60 * 1000 * 1000;
+   private int maxTxRetries = 5;
+
+
+   /**
+    * Creates a configuration whose class name is that of {@link BdbjeCacheStore}.
+    */
+   public BdbjeCacheStoreConfig() {
+      setClassName(BdbjeCacheStore.class.getName());
+   }
+
+   /**
+    * @return the configured number of transaction retries
+    */
+   public int getMaxTxRetries() {
+      return maxTxRetries;
+   }
+
+   /**
+    * @param maxTxRetries the number of transaction retries to configure
+    */
+   public void setMaxTxRetries(int maxTxRetries) {
+      testImmutability("maxTxRetries");
+      this.maxTxRetries = maxTxRetries;
+   }
+
+
+   /**
+    * @return the configured lock acquisition timeout, in microseconds
+    */
+   public long getLockAcquistionTimeoutMicros() {
+      return lockAcquistionTimeoutMicros;
+   }
+
+   /**
+    * @param lockAcquistionTimeoutMicros the lock acquisition timeout to configure, in microseconds
+    */
+   public void setLockAcquistionTimeoutMicros(long lockAcquistionTimeoutMicros) {
+      testImmutability("lockAcquistionTimeoutMicros");
+      this.lockAcquistionTimeoutMicros = lockAcquistionTimeoutMicros;
+   }
+
+   /**
+    * @return the configured database location
+    */
+   public String getLocation() {
+      return location;
+   }
+
+   /**
+    * @param location the database location to configure
+    */
+   public void setLocation(String location) {
+      testImmutability("location");
+      this.location = location;
+   }
+
+   /**
+    * @return the configured value of the <tt>purgeSynchronously</tt> flag
+    */
+   public boolean isPurgeSynchronously() {
+      return purgeSynchronously;
+   }
+
+   /**
+    * @param purgeSynchronously the new value of the <tt>purgeSynchronously</tt> flag
+    */
+   public void setPurgeSynchronously(boolean purgeSynchronously) {
+      testImmutability("purgeSynchronously");
+      this.purgeSynchronously = purgeSynchronously;
+   }
+
+}

Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,60 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.TransactionWorker;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+
+import java.util.List;
+
+/**
+ * {@link TransactionWorker} that replays a list of {@link Modification}s against a {@link CacheStore}, so that the
+ * whole list can be run as one unit of work by a {@link com.sleepycat.collections.TransactionRunner}.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class ModificationsTransactionWorker implements TransactionWorker {
+   private final CacheStore target;
+   private final List<? extends Modification> pending;
+
+   /**
+    * Binds the modifications to the store they will be applied to.
+    *
+    * @param store what to affect
+    * @param mods  actions to take
+    */
+   public ModificationsTransactionWorker(CacheStore store, List<? extends Modification> mods) {
+      this.target = store;
+      this.pending = mods;
+   }
+
+   /**
+    * {@inheritDoc} Walks the modifications in list order and applies each one to the {@link CacheStore}.<p/>
+    * Supported commands: <ul> <li>STORE</li> <li>CLEAR</li> <li>REMOVE</li> <li>PURGE_EXPIRED</li> </ul>
+    *
+    * @throws IllegalArgumentException if a modification has any other type
+    */
+   public void doWork() throws Exception {
+      for (Modification next : pending) {
+         apply(next);
+      }
+   }
+
+   /**
+    * Applies a single modification to the store, dispatching on its type.
+    */
+   private void apply(Modification modification) throws Exception {
+      switch (modification.getType()) {
+         case STORE:
+            target.store(((Store) modification).getStoredEntry());
+            return;
+         case CLEAR:
+            target.clear();
+            return;
+         case REMOVE:
+            target.remove(((Remove) modification).getKey());
+            return;
+         case PURGE_EXPIRED:
+            target.purgeExpired();
+            return;
+         default:
+            throw new IllegalArgumentException("Unknown modification type " + modification.getType());
+      }
+   }
+}

Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,95 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.TransactionRunner;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.compat.DbCompat;
+import com.sleepycat.je.DeadlockException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import com.sleepycat.util.ExceptionUnwrapper;
+
+/**
+ * Adapted version of {@link TransactionRunner}, which allows us to prepare a transaction without committing it.<p/> The
+ * transaction prepared is accessible via {@link com.sleepycat.collections.CurrentTransaction#getTransaction()}
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class PreparableTransactionRunner extends TransactionRunner {
+   /** Handle obtained from {@link CurrentTransaction#getInstance} for the environment given to the constructor. */
+   CurrentTransaction currentTxn;
+
+   /**
+    * Delegates to the {@link  TransactionRunner#TransactionRunner(com.sleepycat.je.Environment, int,
+    * com.sleepycat.je.TransactionConfig) superclass} and caches a current reference to {@link CurrentTransaction}.
+    *
+    * @see TransactionRunner#TransactionRunner(com.sleepycat.je.Environment, int, com.sleepycat.je.TransactionConfig)
+    */
+   public PreparableTransactionRunner(Environment env, int maxRetries, TransactionConfig config) {
+      super(env, maxRetries, config);
+      this.currentTxn = CurrentTransaction.getInstance(env);
+   }
+
+   /**
+    * Delegates to the {@link  TransactionRunner#TransactionRunner(com.sleepycat.je.Environment) superclass} and caches
+    * a current reference to {@link CurrentTransaction}.
+    *
+    * @see TransactionRunner#TransactionRunner(com.sleepycat.je.Environment)
+    */
+   public PreparableTransactionRunner(Environment env) {
+      super(env);
+      this.currentTxn = CurrentTransaction.getInstance(env);
+   }
+
+   /**
+    * Same behaviour as {@link TransactionRunner#run(com.sleepycat.collections.TransactionWorker) run}, except that the
+    * transaction is not committed on success.
+    * <p/>
+    * Each attempt begins a transaction, runs the worker and returns with the transaction still open.  If the attempt
+    * fails, the transaction is aborted; a {@link DeadlockException} is retried until the attempt counter reaches
+    * {@link #getMaxRetries()}, any other failure is rethrown immediately.
+    *
+    * @param worker the work to run inside the transaction
+    * @throws Exception the unwrapped failure of the worker, or the last {@link DeadlockException} once retries are
+    *                   exhausted
+    * @see TransactionRunner#run(com.sleepycat.collections.TransactionWorker)
+    */
+   public void prepare(TransactionWorker worker) throws Exception {
+      for (int currentTries = 0; ; currentTries++) {
+         Transaction txn = null;
+         try {
+            txn = currentTxn.beginTransaction(getTransactionConfig());
+            worker.doWork();
+            // success: leave the transaction open for the caller to commit or abort
+            return;
+         } catch (Throwable caught) {
+            // abort first; a failed abort bumps the counter to Integer.MAX_VALUE so no retry is attempted
+            currentTries = abortOverflowingCurrentTriesOnError(txn, currentTries);
+            caught = ExceptionUnwrapper.unwrapAny(caught);
+            rethrowIfNotDeadLock(caught);
+            // only a DeadlockException reaches this point
+            if (currentTries >= getMaxRetries())
+               throw (DeadlockException) caught;
+         }
+      }
+   }
+
+   /**
+    * Aborts the current transaction if it is the one passed in.
+    *
+    * @param toAbort      the transaction begun by the failed attempt, may be null
+    * @param currentTries the attempt counter of the caller
+    * @return {@code currentTries} unchanged, or {@link Integer#MAX_VALUE} if the abort itself failed
+    */
+   int abortOverflowingCurrentTriesOnError(Transaction toAbort, int currentTries) {
+      if (toAbort != null && toAbort == currentTxn.getTransaction()) {
+         try {
+            currentTxn.abortTransaction();
+         } catch (Throwable problemAborting) {
+            /* superclass prints to stderr, so we will also */
+            if (DbCompat.TRANSACTION_RUNNER_PRINT_STACK_TRACES) {
+               problemAborting.printStackTrace();
+            }
+            /* Force the original exception to be thrown. */
+            return Integer.MAX_VALUE;
+         }
+      }
+      return currentTries;
+   }
+
+   /**
+    * Rethrows anything that is not a {@link DeadlockException}; returns normally for a deadlock.
+    * <p/>
+    * NOTE(review): the final cast assumes every non-{@link Exception} throwable is an {@link Error}.
+    *
+    * @param caught the unwrapped failure
+    * @throws Exception {@code caught} itself when it is an exception other than a deadlock
+    */
+   void rethrowIfNotDeadLock(Throwable caught) throws Exception {
+      if (!(caught instanceof DeadlockException)) {
+         if (caught instanceof Exception) {
+            throw (Exception) caught;
+         } else {
+            throw (Error) caught;
+         }
+      }
+   }
+
+}

Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,74 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.CacheLoaderException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests that cover {@link  BdbjeCacheStoreConfig }
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.BdbjeCacheStoreConfigTest")
+public class BdbjeCacheStoreConfigTest {
+
+   private BdbjeCacheStoreConfig config;
+
+   @BeforeMethod
+   public void setUp() throws Exception {
+      config = new BdbjeCacheStoreConfig();
+   }
+
+   @AfterMethod
+   public void tearDown() throws CacheLoaderException {
+      config = null;
+   }
+
+
+   @Test
+   public void testGetClassNameDefault() {
+      assert config.getClassName().equals(BdbjeCacheStore.class.getName());
+   }
+
+   @Test
+   public void testgetMaxTxRetries() {
+      assert config.getMaxTxRetries() == 5;
+   }
+
+   @Test
+   public void testSetMaxTxRetries() {
+      config.setMaxTxRetries(1);
+      assert config.getMaxTxRetries() == 1;
+   }
+
+   @Test
+   public void testGetLockAcquistionTimeoutMicros() {
+      assert config.getLockAcquistionTimeoutMicros() == 60 * 1000 * 1000;
+   }
+
+   @Test
+   public void testSetLockAcquistionTimeoutMicros() {
+      config.setLockAcquistionTimeoutMicros(1);
+      assert config.getLockAcquistionTimeoutMicros() == 1;
+   }
+
+   @Test
+   public void testSetLocation() {
+      config.setLocation("foo");
+      assert config.getLocation().equals("foo");
+   }
+
+   @Test
+   public void testIsPurgeSynchronously() {
+      assert !config.isPurgeSynchronously();
+   }
+
+   @Test
+   public void testSetPurgeSynchronously() {
+      config.setPurgeSynchronously(true);
+      assert config.isPurgeSynchronously();
+   }
+}

Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,40 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+/**
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.BdbjeCacheStoreIntegrationTest")
+public class BdbjeCacheStoreIntegrationTest extends BaseCacheStoreTest {
+   /**
+    * Builds a started {@link BdbjeCacheStore} that writes below the test-files directory, after removing whatever
+    * a previous run left at that location.
+    */
+   protected CacheStore createCacheStore() throws CacheLoaderException {
+      final CacheStore store = new BdbjeCacheStore();
+      final String storeLocation = TestingUtil.TEST_FILES + "/Horizon-BdbjeCacheStoreIntegrationTest";
+      TestingUtil.recursiveFileRemove(storeLocation);
+
+      final BdbjeCacheStoreConfig storeConfig = new BdbjeCacheStoreConfig();
+      storeConfig.setLocation(storeLocation);
+      storeConfig.setPurgeSynchronously(true);
+
+      store.init(storeConfig, getCache(), getMarshaller());
+      store.start();
+      return store;
+   }
+
+   @Override
+   public void testTwoPhaseCommitReadCommitted() throws CacheLoaderException {
+      // intentionally empty: this depends on READ_COMMITTED, which is not supported on sleepycat
+   }
+
+   @Override
+   public void testRollbackReadCommitted() throws CacheLoaderException {
+      // intentionally empty: this depends on READ_COMMITTED, which is not supported on sleepycat
+   }
+
+}

Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,733 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.serial.SerialBinding;
+import com.sleepycat.bind.serial.StoredClassCatalog;
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.StoredMap;
+import com.sleepycat.collections.TransactionRunner;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.OperationStatus;
+import org.easymock.EasyMock;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Learning tests for SleepyCat JE.  Behaviour here is used in BdbjeCacheLoader.  When there are upgrades to bdbje, this
+ * test may warrant updating.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.BdbjeLearningTest")
+public class BdbjeLearningTest {
+   String dbHome = TestingUtil.TEST_FILES + "/Horizon-BdbjeLearningTest";
+   Environment env;
+
+   private static final String CLASS_CATALOG = "java_class_catalog";
+   private StoredClassCatalog javaCatalog;
+
+   private static final String STORED_ENTRIES = "storedEntriesDb";
+   private Database storedEntriesDb;
+   private StoredMap<Object, StoredEntry> cacheMap;
+
+
+   /**
+    * Opens a transactional environment in {@code dbHome}, the class catalog database and the stored-entries
+    * database, then wraps the latter in {@code cacheMap}.
+    */
+   @BeforeMethod
+   public void setUp() throws Exception {
+      new File(dbHome).mkdirs();
+      System.out.println("Opening environment in: " + dbHome);
+
+      EnvironmentConfig envConfig = new EnvironmentConfig();
+      envConfig.setTransactional(true);
+      envConfig.setAllowCreate(true);
+
+      env = new Environment(new File(dbHome), envConfig);
+      // the same database configuration is used for both databases opened below
+      DatabaseConfig dbConfig = new DatabaseConfig();
+      dbConfig.setTransactional(true);
+      dbConfig.setAllowCreate(true);
+
+      Database catalogDb = env.openDatabase(null, CLASS_CATALOG, dbConfig);
+
+      javaCatalog = new StoredClassCatalog(catalogDb);
+
+      // both bindings share the class catalog opened above
+      EntryBinding storedEntryKeyBinding =
+            new SerialBinding(javaCatalog, Object.class);
+      EntryBinding storedEntryValueBinding =
+            new SerialBinding(javaCatalog, StoredEntry.class);
+
+      storedEntriesDb = env.openDatabase(null, STORED_ENTRIES, dbConfig);
+
+      // NOTE(review): the trailing 'true' presumably makes the map writable - confirm against the StoredMap docs
+      cacheMap =
+            new StoredMap<Object, StoredEntry>(storedEntriesDb,
+                                               storedEntryKeyBinding, storedEntryValueBinding, true);
+
+
+   }
+
+   /**
+    * Runs the two (empty) workers one after the other through a plain {@link TransactionRunner}.
+    */
+   public void testTransactionWorker() throws Exception {
+      final TransactionRunner txRunner = new TransactionRunner(env);
+      final TransactionWorker[] steps = {new PopulateDatabase(), new PrintDatabase()};
+      for (TransactionWorker step : steps) {
+         txRunner.run(step);
+      }
+   }
+
+
+   /** Placeholder worker; performs no work. */
+   private class PopulateDatabase implements TransactionWorker {
+      public void doWork() throws Exception {
+         // intentionally empty
+      }
+   }
+
+   /** Placeholder worker; performs no work. */
+   private class PrintDatabase implements TransactionWorker {
+      public void doWork() throws Exception {
+         // intentionally empty
+      }
+   }
+
+
+   /**
+    * Closes the stored-entries database, the class catalog and the environment, in that order, and then deletes
+    * the {@code dbHome} directory.
+    */
+   @AfterMethod
+   public void tearDown() throws Exception {
+      storedEntriesDb.close();
+      javaCatalog.close();
+      env.close();
+
+      TestingUtil.recursiveFileRemove(dbHome);
+   }
+
+
+   private void store(StoredEntry se) {
+      cacheMap.put(se.getKey(), se);
+   }
+
+
+   private StoredEntry load(Object key) {
+      StoredEntry s = cacheMap.get(key);
+      if (s == null)
+         return null;
+      if (!s.isExpired())
+         return s;
+      else
+         cacheMap.remove(key);
+      return null;
+   }
+
+   /**
+    * Copies every value currently in {@code cacheMap} into a new set.
+    *
+    * @return a new {@link HashSet} holding the stored entries
+    */
+   private Set<StoredEntry> loadAll() {
+      return new HashSet<StoredEntry>(cacheMap.values());
+   }
+
+   private void purgeExpired() {
+      Iterator<Map.Entry<Object, StoredEntry>> i = cacheMap.entrySet().iterator();
+      while (i.hasNext()) {
+         if (i.next().getValue().isExpired())
+            i.remove();
+      }
+   }
+
+   private static final Log log = LogFactory.getLog(BdbjeLearningTest.class);
+
+   /**
+    * Writes the record count of {@code storedEntriesDb} followed by the raw key and data bytes of every record to the
+    * given stream.  The stream is flushed but not closed.
+    *
+    * @param outputStream destination; used directly if it already is an {@link ObjectOutputStream}, wrapped otherwise
+    * @throws CacheLoaderException if writing to the stream or reading the database fails
+    */
+   private void toStream(OutputStream outputStream) throws CacheLoaderException {
+      Cursor cursor = null;
+
+      try {
+         ObjectOutputStream oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
+               new ObjectOutputStream(outputStream);
+         long recordCount = storedEntriesDb.count();
+         log.trace("writing {0} records to stream", recordCount);
+         oos.writeLong(recordCount);
+
+         cursor = storedEntriesDb.openCursor(null, null);
+         DatabaseEntry key = new DatabaseEntry();
+         DatabaseEntry data = new DatabaseEntry();
+         while (cursor.getNext(key, data, null) ==
+               OperationStatus.SUCCESS) {
+            oos.writeObject(key.getData());
+            oos.writeObject(data.getData());
+         }
+         // ObjectOutputStream buffers primitive data; without this the record count may never reach the
+         // underlying stream (e.g. when the database is empty and no object write follows the count).
+         oos.flush();
+      } catch (IOException e) {
+         throw new CacheLoaderException("Error writing to object stream", e);
+      } catch (DatabaseException e) {
+         throw new CacheLoaderException("Error accessing database", e);
+      }
+      finally {
+         if (cursor != null) try {
+            cursor.close();
+         } catch (DatabaseException e) {
+            throw new CacheLoaderException("Error closing cursor", e);
+         }
+      }
+
+   }
+
+   /**
+    * Replaces the contents of the store with the records read from the stream: a record count followed by that many
+    * key/data byte-array pairs.  The existing records are cleared first; the new records are
+    * written through a cursor inside one transaction, which is aborted if anything fails before the commit.
+    *
+    * @param inputStream source; used directly if it already is an {@link ObjectInputStream}, wrapped otherwise
+    * @throws CacheLoaderException if reading the stream or writing the database fails
+    */
+   private void fromStream(InputStream inputStream) throws CacheLoaderException {
+      try {
+         ObjectInputStream ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
+               new ObjectInputStream(inputStream);
+         long recordCount = ois.readLong();
+         log.info("reading {0} records from stream", recordCount);
+         log.info("clearing all records");
+         cacheMap.clear();
+         Cursor cursor = null;
+         com.sleepycat.je.Transaction txn = env.beginTransaction(null, null);
+         // true until commit is attempted; decides whether the finally block has to abort the transaction
+         boolean needsAbort = true;
+         try {
+            cursor = storedEntriesDb.openCursor(txn, null);
+            for (int i = 0; i < recordCount; i++) {
+               byte[] keyBytes = (byte[]) ois.readObject();
+               byte[] dataBytes = (byte[]) ois.readObject();
+
+               DatabaseEntry key = new DatabaseEntry(keyBytes);
+               DatabaseEntry data = new DatabaseEntry(dataBytes);
+               cursor.put(key, data);
+            }
+            cursor.close();
+            cursor = null;
+            // NOTE(review): cleared before commit() so a failed commit is not followed by abort() on the same
+            // handle - confirm against the JE Transaction docs for the version in use
+            needsAbort = false;
+            txn.commit();
+         } finally {
+            try {
+               if (cursor != null) cursor.close();
+            } finally {
+               // previously the transaction was left open when the import failed half-way
+               if (needsAbort) txn.abort();
+            }
+         }
+
+      }
+      catch (Exception e) {
+         CacheLoaderException cle = (e instanceof CacheLoaderException) ? (CacheLoaderException) e :
+               new CacheLoaderException("Problems reading from stream", e);
+         throw cle;
+      }
+   }
+
+   /** Worker that stores a single entry via {@code store}. */
+   class StoreTransactionWorker implements TransactionWorker {
+      private StoredEntry entry;
+
+      StoreTransactionWorker(StoredEntry entry) {
+         this.entry = entry;
+      }
+
+      public void doWork() throws Exception {
+         BdbjeLearningTest.this.store(this.entry);
+      }
+   }
+
+   /** Worker that clears {@code cacheMap}. */
+   class ClearTransactionWorker implements TransactionWorker {
+      public void doWork() throws Exception {
+         BdbjeLearningTest.this.cacheMap.clear();
+      }
+   }
+
+   /** Worker that removes a single key from {@code cacheMap}. */
+   class RemoveTransactionWorker implements TransactionWorker {
+      Object key;
+
+      RemoveTransactionWorker(Object key) {
+         this.key = key;
+      }
+
+      public void doWork() throws Exception {
+         BdbjeLearningTest.this.cacheMap.remove(this.key);
+      }
+   }
+
+   /** Worker that delegates to {@code purgeExpired}. */
+   class PurgeExpiredTransactionWorker implements TransactionWorker {
+      public void doWork() throws Exception {
+         BdbjeLearningTest.this.purgeExpired();
+      }
+   }
+
+   /**
+    * Worker that applies a list of {@link Modification}s, in list order, to {@code cacheMap}.
+    */
+   class ModificationsTransactionWorker implements TransactionWorker {
+      private List<? extends Modification> mods;
+
+      ModificationsTransactionWorker(List<? extends Modification> mods) {
+         this.mods = mods;
+      }
+
+      public void doWork() throws Exception {
+         for (Modification next : mods) {
+            apply(next);
+         }
+      }
+
+      /** Applies one modification, dispatching on its type. */
+      private void apply(Modification modification) {
+         switch (modification.getType()) {
+            case STORE:
+               store(((Store) modification).getStoredEntry());
+               return;
+            case CLEAR:
+               cacheMap.clear();
+               return;
+            case REMOVE:
+               cacheMap.remove(((Remove) modification).getKey());
+               return;
+            case PURGE_EXPIRED:
+               purgeExpired();
+               return;
+            default:
+               throw new IllegalArgumentException("Unknown modification type " + modification.getType());
+         }
+      }
+   }
+
+
+   /**
+    * Applies the modifications inside a Berkeley DB transaction.
+    * <p/>
+    * In one-phase mode the modifications are run through a {@link TransactionRunner}.  Otherwise they are run through
+    * a {@link PreparableTransactionRunner} and the resulting current transaction is remembered in {@code txnMap}
+    * under {@code tx}.
+    *
+    * @param mods       modifications to apply
+    * @param tx         key under which the prepared transaction is remembered (two-phase mode only)
+    * @param isOnePhase whether to use the plain {@link TransactionRunner}
+    * @throws CacheLoaderException if running the modifications fails; failures were previously only printed, which
+    *                              let callers carry on as if the prepare had succeeded
+    */
+   private void prepare(List<Modification> mods, Transaction tx, boolean isOnePhase) throws CacheLoaderException {
+      try {
+         if (isOnePhase) {
+            TransactionRunner runner = new TransactionRunner(env);
+            runner.run(new ModificationsTransactionWorker(mods));
+         } else {
+            PreparableTransactionRunner runner = new PreparableTransactionRunner(env);
+            runner.prepare(new ModificationsTransactionWorker(mods));
+            txnMap.put(tx, CurrentTransaction.getInstance(env).getTransaction());
+         }
+      } catch (CacheLoaderException e) {
+         throw e;
+      } catch (Exception e) {
+         throw new CacheLoaderException("Problems preparing modifications", e);
+      }
+
+   }
+
+   Map<Transaction, com.sleepycat.je.Transaction> txnMap = new HashMap<Transaction, com.sleepycat.je.Transaction>();
+
+   private void commit(Transaction tx) {
+      com.sleepycat.je.Transaction txn = txnMap.remove(tx);
+      CurrentTransaction currentTransaction = CurrentTransaction.getInstance(env);
+      if (txn != null) {
+         if (currentTransaction.getTransaction() == txn) {
+            try {
+               currentTransaction.commitTransaction();
+            } catch (DatabaseException e) {
+               e.printStackTrace();  // TODO: Manik: Customise this generated block
+            }
+         } else {
+            log.error("Transactions must be committed on the same thread");
+         }
+      }
+   }
+
+   /**
+    * Aborts the transaction remembered for {@code tx}, if any.  Nothing happens when no transaction was
+    * remembered; an error is logged when the remembered transaction is not the current one.
+    */
+   private void rollback(Transaction tx) {
+      com.sleepycat.je.Transaction txn = txnMap.remove(tx);
+      CurrentTransaction currentTransaction = CurrentTransaction.getInstance(env);
+      if (txn != null) {
+         if (currentTransaction.getTransaction() == txn) {
+            try {
+               currentTransaction.abortTransaction();
+            } catch (DatabaseException e) {
+               e.printStackTrace();  // TODO: Manik: Customise this generated block
+            }
+         } else {
+            // the message used to say "committed", copied from commit()
+            log.error("Transactions must be rolled back on the same thread");
+         }
+      }
+   }
+
+   /**
+    * Stores an entry three times with different lifespans and checks what {@code load} returns each time.
+    */
+   public void testLoadAndStore() throws InterruptedException, CacheLoaderException {
+      assert !cacheMap.containsKey("k");
+      // first: created/expiry both -1
+      StoredEntry se = new StoredEntry("k", "v", -1, -1);
+      store(se);
+
+      assert load("k").getValue().equals("v");
+      assert load("k").getLifespan() == -1;
+      assert !load("k").isExpired();
+      assert cacheMap.containsKey("k");
+
+      // second: an expiry two minutes from now
+      long now = System.currentTimeMillis();
+      long lifespan = 120000;
+      se = new StoredEntry("k", "v", now, now + lifespan);
+      store(se);
+
+      assert load("k").getValue().equals("v");
+      assert load("k").getLifespan() == lifespan;
+      assert !load("k").isExpired();
+      assert cacheMap.containsKey("k");
+
+      // third: an expiry one millisecond from now, then wait past it
+      now = System.currentTimeMillis();
+      lifespan = 1;
+      se = new StoredEntry("k", "v", now, now + lifespan);
+      store(se);
+      Thread.sleep(100);
+      assert se.isExpired();
+      // load() removes the expired entry as a side effect
+      assert load("k") == null;
+      assert !cacheMap.containsKey("k");
+   }
+
+
+   /**
+    * Applies two batches of modifications in one-phase mode and checks the resulting contents of {@code cacheMap}.
+    */
+   public void testOnePhaseCommit() throws CacheLoaderException {
+      List<Modification> mods = new ArrayList<Modification>();
+      mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      mods.add(new Remove("k1"));
+      Transaction tx = EasyMock.createNiceMock(Transaction.class);
+      prepare(mods, tx, true);
+
+      // NOTE(review): the result is never used
+      Set s = loadAll();
+
+      assert load("k2").getValue().equals("v2");
+      assert !cacheMap.containsKey("k1");
+
+      cacheMap.clear();
+
+      // second batch: only the store that follows the Clear is expected to survive
+      mods = new ArrayList<Modification>();
+      mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      mods.add(new Clear());
+      mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+      prepare(mods, tx, true);
+      assert !cacheMap.containsKey("k1");
+      assert !cacheMap.containsKey("k2");
+      assert cacheMap.containsKey("k3");
+   }
+
+
+   /**
+    * Prepares two batches of modifications in two-phase mode.  For each batch a reader thread is started before the
+    * commit is issued on the test thread; failures in the reader are collected and rethrown after it has been joined.
+    */
+   public void testTwoPhaseCommit() throws Throwable {
+      // filled by the reader threads, read only after join()
+      final List<Throwable> throwables = new ArrayList<Throwable>();
+      List<Modification> mods = new ArrayList<Modification>();
+      mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      mods.add(new Remove("k1"));
+      Transaction tx = EasyMock.createNiceMock(Transaction.class);
+      prepare(mods, tx, false);
+
+
+      Thread gets1 = new Thread(
+            new Runnable() {
+               public void run() {
+                  try {
+                     assert load("k2").getValue().equals("v2");
+                     assert !cacheMap.containsKey("k1");
+                  } catch (Throwable e) {
+                     throwables.add(e);
+                  }
+               }
+            }
+      );
+
+      // the reader is started before the commit
+      gets1.start();
+      commit(tx);
+
+      gets1.join();
+
+      if (!throwables.isEmpty()) throw throwables.get(0);
+
+
+      cacheMap.clear();
+
+      // second batch: only the store that follows the Clear is expected to survive
+      mods = new ArrayList<Modification>();
+      mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      mods.add(new Clear());
+      mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+      prepare(mods, tx, false);
+
+      Thread gets2 = new Thread(
+            new Runnable() {
+               public void run() {
+                  try {
+                     assert !cacheMap.containsKey("k1");
+                     assert !cacheMap.containsKey("k2");
+                     assert cacheMap.containsKey("k3");
+
+                  } catch (Throwable e) {
+                     throwables.add(e);
+                  }
+               }
+            }
+      );
+
+      gets2.start();
+
+
+      commit(tx);
+      gets2.join();
+
+      if (!throwables.isEmpty()) throw throwables.get(0);
+      // same checks again, this time on the committing thread
+      assert !cacheMap.containsKey("k1");
+      assert !cacheMap.containsKey("k2");
+      assert cacheMap.containsKey("k3");
+   }
+
+
+   /**
+    * Prepares two batches of modifications in two-phase mode, rolls each one back, and checks that only the entry
+    * stored before the first prepare remains.  The reader-thread checks between prepare and rollback are disabled
+    * (commented out).
+    */
+   public void testRollback() throws Throwable {
+
+      // stored outside of any prepared batch
+      store(new StoredEntry("old", "old", -1, -1));
+
+      List<Modification> mods = new ArrayList<Modification>();
+      mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      mods.add(new Remove("k1"));
+      mods.add(new Remove("old"));
+      Transaction tx = EasyMock.createNiceMock(Transaction.class);
+      prepare(mods, tx, false);
+
+//      final List<Throwable> throwables = new ArrayList<Throwable>();
+//
+//      Thread gets1 = new Thread(
+//            new Runnable() {
+//               public void run() {
+//                  try {
+//                     assert !cacheMap.containsKey("k1");
+//                     assert !cacheMap.containsKey("k2");
+//                     assert cacheMap.containsKey("old");
+//                  } catch (Throwable e) {
+//                     throwables.add(e);
+//                  }
+//               }
+//            }
+//      );
+//
+//      gets1.start();
+//      gets1.join();
+//
+//      if (!throwables.isEmpty()) throw throwables.get(0);
+
+      rollback(tx);
+
+      // nothing from the rolled-back batch is visible, including the removal of "old"
+      assert !cacheMap.containsKey("k1");
+      assert !cacheMap.containsKey("k2");
+      assert cacheMap.containsKey("old");
+
+      mods = new ArrayList<Modification>();
+      mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+      mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+      mods.add(new Clear());
+      mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+      prepare(mods, tx, false);
+
+//      Thread gets2 = new Thread(
+//            new Runnable() {
+//               public void run() {
+//                  try {
+//                     assert !cacheMap.containsKey("k1");
+//                     assert !cacheMap.containsKey("k2");
+//                     assert !cacheMap.containsKey("k3");
+//                  } catch (Throwable e) {
+//                     throwables.add(e);
+//                  }
+//               }
+//            }
+//      );
+//
+//      gets2.start();
+//      gets2.join();
+//
+//      if (!throwables.isEmpty()) throw throwables.get(0);
+
+
+      rollback(tx);
+
+      // the Clear in the rolled-back batch must not have removed "old"
+      assert !cacheMap.containsKey("k1");
+      assert !cacheMap.containsKey("k2");
+      assert !cacheMap.containsKey("k3");
+      assert cacheMap.containsKey("old");
+   }
+
+
+   /**
+    * Calls commit and rollback for a transaction that was never prepared and checks that the stored entry is
+    * still present afterwards.
+    */
+   public void testCommitAndRollbackWithoutPrepare() throws CacheLoaderException {
+      store(new StoredEntry("old", "old", -1, -1));
+      final Transaction neverPrepared = EasyMock.createNiceMock(Transaction.class);
+      commit(neverPrepared);
+
+      store(new StoredEntry("old", "old", -1, -1));
+      rollback(neverPrepared);
+
+      final boolean stillPresent = cacheMap.containsKey("old");
+      assert stillPresent;
+   }
+
+   public void testPreload() throws CacheLoaderException {
+      store(new StoredEntry("k1", "v1", -1, -1));
+      store(new StoredEntry("k2", "v2", -1, -1));
+      store(new StoredEntry("k3", "v3", -1, -1));
+
+      Set<StoredEntry> set = loadAll();
+
+      assert set.size() == 3;
+      Set expected = new HashSet();
+      expected.add("k1");
+      expected.add("k2");
+      expected.add("k3");
+      for (StoredEntry se : set) assert expected.remove(se.getKey());
+      assert expected.isEmpty();
+   }
+
+   /**
+    * Stores three entries built with {@code now} and {@code now + lifespan} as their
+    * last two constructor arguments, sleeps slightly longer than the lifespan, runs
+    * {@code purgeExpired()} and checks that none of the keys remain.
+    */
+   public void testPurgeExpired() throws Exception {
+      final String[] keys = {"k1", "k2", "k3"};
+      final String[] values = {"v1", "v2", "v3"};
+      final long lifespan = 1000;
+      final long start = System.currentTimeMillis();
+      final long startPlusLifespan = start + lifespan;
+
+      for (int idx = 0; idx < keys.length; idx++) {
+         store(new StoredEntry(keys[idx], values[idx], start, startPlusLifespan));
+      }
+      for (String key : keys) {
+         assert cacheMap.containsKey(key);
+      }
+
+      // wait until the lifespan has elapsed, with a small safety margin
+      Thread.sleep(lifespan + 100);
+      purgeExpired();
+
+      for (String key : keys) {
+         assert !cacheMap.containsKey(key);
+      }
+   }
+
+
+   public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
+      store(new StoredEntry("k1", "v1", -1, -1));
+      store(new StoredEntry("k2", "v2", -1, -1));
+      store(new StoredEntry("k3", "v3", -1, -1));
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      toStream(out);
+      out.close();
+      cacheMap.clear();
+      fromStream(new ByteArrayInputStream(out.toByteArray()));
+
+      Set<StoredEntry> set = loadAll();
+
+      assert set.size() == 3;
+      Set expected = new HashSet();
+      expected.add("k1");
+      expected.add("k2");
+      expected.add("k3");
+      for (StoredEntry se : set) assert expected.remove(se.getKey());
+      assert expected.isEmpty();
+   }
+
+
+   /**
+    * Same round trip as the plain streaming test, but the store's bytes are framed
+    * by eight marker bytes before and eight after. The test reads the leading
+    * markers, calls {@code fromStream} on the same input stream, and then expects to
+    * read the trailing markers intact, before checking that the three keys were
+    * loaded.
+    */
+   public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
+      store(new StoredEntry("k1", "v1", -1, -1));
+      store(new StoredEntry("k2", "v2", -1, -1));
+      store(new StoredEntry("k3", "v3", -1, -1));
+
+      ByteArrayOutputStream out = new ByteArrayOutputStream();
+      byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+      byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
+      out.write(dummyStartBytes);
+      toStream(out);
+      out.write(dummyEndBytes);
+      out.close();
+      cacheMap.clear();
+
+      // first pop the start bytes
+      byte[] dummy = new byte[8];
+      ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+      int bytesRead = in.read(dummy, 0, 8);
+      assert bytesRead == 8;
+      for (int i = 0; i < dummy.length; i++) {
+         assert dummy[i] == dummyStartBytes[i] : "Start byte stream corrupted!";
+      }
+
+      fromStream(in);
+
+      // the end bytes must still be readable from the same stream afterwards
+      bytesRead = in.read(dummy, 0, 8);
+      assert bytesRead == 8;
+      for (int i = 0; i < dummy.length; i++) {
+         // message fixed: this loop checks the trailing markers, not the leading ones
+         assert dummy[i] == dummyEndBytes[i] : "End byte stream corrupted!";
+      }
+
+      Set<StoredEntry> loaded = loadAll();
+      assert loaded.size() == 3 : "expected 3 entries but loaded " + loaded.size();
+
+      // Parameterized instead of raw: every loaded key must cancel out one expected key.
+      Set<Object> expected = new HashSet<Object>();
+      expected.add("k1");
+      expected.add("k2");
+      expected.add("k3");
+      for (StoredEntry se : loaded) {
+         // The removal is done outside the assert so the assert has no side effect.
+         boolean wasExpected = expected.remove(se.getKey());
+         assert wasExpected : "unexpected key loaded: " + se.getKey();
+      }
+      assert expected.isEmpty() : "keys that were never loaded: " + expected;
+   }
+
+   /**
+    * Runs three threads that each perform 500 rounds of store / remove / load on a
+    * shared set of ten keys. Any {@link Throwable} raised inside a worker is
+    * collected, and the first one collected is rethrown once all threads have
+    * finished.
+    *
+    * @throws Throwable the first failure recorded by any worker thread
+    */
+   public void testConcurrency() throws Throwable {
+      final int numThreads = 3;
+      final int loops = 500;
+      final int numKeys = 10;
+      final String[] keys = new String[numKeys];
+      final String[] values = new String[numKeys];
+      for (int i = 0; i < numKeys; i++) {
+         keys[i] = "k" + i;
+         values[i] = "v" + i;
+      }
+
+      final Random r = new Random();
+      // Several worker threads may append at the same time; an unsynchronized
+      // LinkedList could drop failures or be corrupted, so wrap it.
+      final List<Throwable> throwables =
+            java.util.Collections.synchronizedList(new LinkedList<Throwable>());
+
+      final Runnable store = new Runnable() {
+         public void run() {
+            try {
+               int randomInt = r.nextInt(numKeys);
+               store(new StoredEntry(keys[randomInt], values[randomInt]));
+            } catch (Throwable e) {
+               throwables.add(e);
+            }
+         }
+      };
+
+      final Runnable remove = new Runnable() {
+         public void run() {
+            try {
+               cacheMap.remove(keys[r.nextInt(numKeys)]);
+            } catch (Throwable e) {
+               throwables.add(e);
+            }
+         }
+      };
+
+      final Runnable get = new Runnable() {
+         public void run() {
+            try {
+               int randomInt = r.nextInt(numKeys);
+               StoredEntry se = load(keys[randomInt]);
+               // a key may have been removed concurrently, so null is acceptable
+               assert se == null || se.getValue().equals(values[randomInt]);
+               loadAll();
+            } catch (Throwable e) {
+               throwables.add(e);
+            }
+         }
+      };
+
+      Thread[] threads = new Thread[numThreads];
+
+      for (int i = 0; i < numThreads; i++) {
+         threads[i] = new Thread(getClass().getSimpleName() + "-" + i) {
+            public void run() {
+               // distinct name so the loop counter does not shadow the outer index
+               for (int round = 0; round < loops; round++) {
+                  store.run();
+                  remove.run();
+                  get.run();
+               }
+            }
+         };
+      }
+
+      for (Thread t : threads) {
+         t.start();
+      }
+      for (Thread t : threads) {
+         t.join();
+      }
+
+      if (!throwables.isEmpty()) {
+         throw throwables.get(0);
+      }
+   }
+
+
+}
+

Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,111 @@
+package org.horizon.loader.bdbje;
+
+import static org.easymock.classextension.EasyMock.*;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.PurgeExpired;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+
+/**
+ * Unit tests that cover {@link ModificationsTransactionWorker}.
+ * <p/>
+ * Each test hands the worker a single mocked modification together with a mocked
+ * {@link CacheStore}, calls {@code doWork()}, and verifies that the expected call
+ * was made on the store.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.ModificationsTransactionWorkerTest")
+public class ModificationsTransactionWorkerTest {
+
+   /**
+    * Placeholder: intentionally empty until there is a modification type the worker
+    * does not handle.
+    */
+   @Test
+   public void testDoWorkOnUnsupportedModification() {
+      //TODO: we currently support all modifications...
+   }
+
+   /**
+    * A {@code STORE} modification must result in {@code CacheStore.store} being
+    * called with the modification's entry.
+    */
+   @Test
+   public void testDoWorkOnStore() throws Exception {
+      CacheStore cs = createMock(CacheStore.class);
+      Store store = createMock(Store.class);
+      StoredEntry entry = new StoredEntry("1", "2");
+      expect(store.getType()).andReturn(Modification.Type.STORE);
+      expect(store.getStoredEntry()).andReturn(entry);
+      cs.store(entry);
+      replay(cs);
+      replay(store);
+
+      ModificationsTransactionWorker worker =
+            new ModificationsTransactionWorker(cs,
+                                               Collections.singletonList(store));
+      worker.doWork();
+      verify(cs);
+      verify(store);
+   }
+
+   /**
+    * A {@code REMOVE} modification must result in {@code CacheStore.remove} being
+    * called with the modification's key.
+    */
+   @Test
+   public void testDoWorkOnRemove() throws Exception {
+      CacheStore cs = createMock(CacheStore.class);
+      // named for what it is; the original reused the name "store" for a Remove mock
+      Remove remove = createMock(Remove.class);
+      expect(remove.getType()).andReturn(Modification.Type.REMOVE);
+      expect(remove.getKey()).andReturn("1");
+      expect(cs.remove("1")).andReturn(true);
+      replay(cs);
+      replay(remove);
+
+      ModificationsTransactionWorker worker =
+            new ModificationsTransactionWorker(cs,
+                                               Collections.singletonList(remove));
+      worker.doWork();
+      verify(cs);
+      verify(remove);
+   }
+
+   /**
+    * A {@code CLEAR} modification must result in {@code CacheStore.clear} being called.
+    */
+   @Test
+   public void testDoWorkOnClear() throws Exception {
+      CacheStore cs = createMock(CacheStore.class);
+      Clear clear = createMock(Clear.class);
+      expect(clear.getType()).andReturn(Modification.Type.CLEAR);
+      cs.clear();
+      replay(cs);
+      replay(clear);
+
+      ModificationsTransactionWorker worker =
+            new ModificationsTransactionWorker(cs,
+                                               Collections.singletonList(clear));
+      worker.doWork();
+      verify(cs);
+      verify(clear);
+   }
+
+   /**
+    * A {@code PURGE_EXPIRED} modification must result in
+    * {@code CacheStore.purgeExpired} being called.
+    */
+   @Test
+   public void testDoWorkOnPurgeExpired() throws Exception {
+      CacheStore cs = createMock(CacheStore.class);
+      PurgeExpired purge = createMock(PurgeExpired.class);
+      expect(purge.getType()).andReturn(Modification.Type.PURGE_EXPIRED);
+      cs.purgeExpired();
+      replay(cs);
+      replay(purge);
+
+      ModificationsTransactionWorker worker =
+            new ModificationsTransactionWorker(cs,
+                                               Collections.singletonList(purge));
+      worker.doWork();
+      verify(cs);
+      verify(purge);
+   }
+
+}

Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java	2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,239 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.je.DeadlockException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.classextension.EasyMock.*;
+import org.horizon.loader.CacheLoaderException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests that cover {@link  PreparableTransactionRunner }
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+ at Test(groups = "unit", enabled = true, testName = "loader.bdbje.PreparableTransactionRunnerTest")
+public class PreparableTransactionRunnerTest {
+   PreparableTransactionRunner runner;
+   Environment env;
+   EnvironmentConfig config;
+   TransactionWorker worker;
+   Transaction transaction;
+
+   @BeforeMethod
+   public void setUp() throws Exception {
+      config = createMock(EnvironmentConfig.class);
+      expect(config.getTransactional()).andReturn(true);
+      expect(config.getLocking()).andReturn(true);
+      transaction = createMock(Transaction.class);
+      env = createMock(Environment.class);
+      expect(env.getConfig()).andReturn(config);
+      expect(env.beginTransaction(null, null)).andReturn(transaction);
+      worker = createMock(TransactionWorker.class);
+   }
+
+   @AfterMethod
+   public void tearDown() throws CacheLoaderException {
+      runner = null;
+      env = null;
+      config = null;
+   }
+
+
+   @Test
+   public void testMoreDeadlocks() throws Exception {
+      worker.doWork();
+      expectLastCall().andThrow(new DeadlockException());
+      transaction.abort();
+      expect(env.beginTransaction(null, null)).andReturn(transaction);
+      worker.doWork();
+      expectLastCall().andThrow(new DeadlockException());
+      transaction.abort();
+      expect(env.beginTransaction(null, null)).andReturn(transaction);
+      worker.doWork();
+      expectLastCall().andThrow(new DeadlockException());
+      transaction.abort();
+      replayAll();
+      runner = new PreparableTransactionRunner(env, 2, null);
+      try {
+         runner.prepare(worker);
+         assert false : "should have gotten a deadlock exception";
+      } catch (DeadlockException e) {
+
+      }
+      verifyAll();
+   }
+
+   @Test
+   public void testPrepare() throws Exception {
+
+      worker.doWork();
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.prepare(worker);
+      verifyAll();
+   }
+
+   @Test
+   public void testRun() throws Exception {
+      transaction.commit();
+      worker.doWork();
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.run(worker);
+      verifyAll();
+   }
+
+
+   @Test
+   public void testOneArgConstructorSetsCurrentTxn() throws Exception {
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      assert CurrentTransaction.getInstance(env) == runner.currentTxn;
+   }
+
+   @Test
+   public void testSetMaxRetries() throws Exception {
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.setMaxRetries(1);
+      assert runner.getMaxRetries() == 1;
+   }
+
+   @Test
+   public void testSetAllowNestedTransactions() throws Exception {
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.setAllowNestedTransactions(false);
+      assert !runner.getAllowNestedTransactions();
+      try {
+         runner.setAllowNestedTransactions(true);
+         assert false : "should have gotten Exception";
+      } catch (UnsupportedOperationException e) {}
+   }
+
+   @Test
+   public void testGetTransactionConfig() throws Exception {
+      replayAll();
+      TransactionConfig config = new TransactionConfig();
+      runner = new PreparableTransactionRunner(env);
+      runner.setTransactionConfig(config);
+      assert runner.getTransactionConfig().equals(config);
+   }
+
+
+   @Test
+   public void testExceptionThrownInPrepare() throws Exception {
+
+      worker.doWork();
+      expectLastCall().andThrow(new RuntimeException());
+      transaction.abort();
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+
+      try {
+         runner.prepare(worker);
+         assert false : "should have gotten an exception";
+      } catch (RuntimeException e) {
+
+      }
+      verifyAll();
+   }
+
+   @Test
+   public void testErrorThrownInPrepare() throws Exception {
+
+      worker.doWork();
+      expectLastCall().andThrow(new Error());
+      transaction.abort();
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+
+      try {
+         runner.prepare(worker);
+         assert false : "should have gotten an exception";
+      } catch (Error e) {
+
+      }
+      verifyAll();
+   }
+
+
+   @Test
+   public void testExceptionThrownInRun() throws Exception {
+
+      worker.doWork();
+      expectLastCall().andThrow(new RuntimeException());
+      transaction.abort();
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+
+      try {
+         runner.prepare(worker);
+         assert false : "should have gotten an exception";
+      } catch (RuntimeException e) {
+
+      }
+      verifyAll();
+   }
+
+   @Test
+   public void testErrorThrownInRun() throws Exception {
+
+      worker.doWork();
+      expectLastCall().andThrow(new Error());
+      transaction.abort();
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+
+      try {
+         runner.run(worker);
+         assert false : "should have gotten an exception";
+      } catch (Error e) {
+
+      }
+      verifyAll();
+   }
+
+
+   public void testRethrowIfNotDeadLockDoesntThrowWhenGivenDeadlockException() throws Exception {
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      runner.rethrowIfNotDeadLock(createNiceMock(DeadlockException.class));
+   }
+
+   public void testThrowableDuringAbort() throws Exception {
+      transaction.abort();
+      expectLastCall().andThrow(new RuntimeException());
+      replayAll();
+      runner = new PreparableTransactionRunner(env);
+      CurrentTransaction.getInstance(env).beginTransaction(null);
+      int max = runner.abortOverflowingCurrentTriesOnError(transaction, 2);
+      assert max == Integer.MAX_VALUE : "should have overflowed max tries, but got " + max;
+      verifyAll();
+   }
+
+   private void replayAll() {
+      replay(config);
+      replay(env);
+      replay(transaction);
+      replay(worker);
+   }
+
+   private void verifyAll() {
+      verify(config);
+      verify(env);
+      verify(transaction);
+      verify(worker);
+   }
+}




More information about the jbosscache-commits mailing list