JBoss Cache SVN: r7818 - core/branches/flat/src/test/java/org/horizon/loader.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-02 05:33:55 -0500 (Mon, 02 Mar 2009)
New Revision: 7818
Modified:
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
Log:
added removeAll testcase
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-02 10:18:44 UTC (rev 7817)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-02 10:33:55 UTC (rev 7818)
@@ -216,7 +216,7 @@
final Transaction tx = EasyMock.createNiceMock(Transaction.class);
cs.prepare(mods, tx, false);
- Thread t = new Thread(new Runnable(){
+ Thread t = new Thread(new Runnable() {
public void run() {
cs.rollback(tx);
}
@@ -237,7 +237,7 @@
cs.prepare(mods, tx, false);
- Thread t2 = new Thread(new Runnable(){
+ Thread t2 = new Thread(new Runnable() {
public void run() {
cs.rollback(tx);
}
@@ -277,6 +277,37 @@
assert expected.isEmpty();
}
+ @Test
+ public void testStoreAndRemoveAll() throws CacheLoaderException {
+ cs.store(new StoredEntry("k1", "v1", -1, -1));
+ cs.store(new StoredEntry("k2", "v2", -1, -1));
+ cs.store(new StoredEntry("k3", "v3", -1, -1));
+ cs.store(new StoredEntry("k4", "v4", -1, -1));
+
+
+ Set<StoredEntry> set = cs.loadAll();
+
+ assert set.size() == 4;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ expected.add("k4");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+
+ Set toRemove = new HashSet();
+ toRemove.add("k1");
+ toRemove.add("k2");
+ toRemove.add("k3");
+ cs.removeAll(toRemove);
+
+ set = cs.loadAll();
+ assert set.size() == 1;
+ set.remove("k4");
+ assert expected.isEmpty();
+ }
+
public void testPurgeExpired() throws Exception {
long now = System.currentTimeMillis();
long lifespan = 1000;
15 years, 10 months
JBoss Cache SVN: r7817 - in core/branches/flat/src/test/java/org/horizon/loader: bdbje and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-02 05:18:44 -0500 (Mon, 02 Mar 2009)
New Revision: 7817
Modified:
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
Log:
reverted redundant test case definition
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-02 09:46:47 UTC (rev 7816)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-02 10:18:44 UTC (rev 7817)
@@ -131,38 +131,10 @@
mods.add(new Remove("k1"));
Transaction tx = EasyMock.createNiceMock(Transaction.class);
cs.prepare(mods, tx, false);
- cs.commit(tx);
- assert cs.load("k2").getValue().equals("v2");
assert !cs.containsKey("k1");
-
- cs.clear();
-
- mods = new ArrayList<Modification>();
- mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
- mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
- mods.add(new Clear());
- mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
-
- cs.prepare(mods, tx, false);
- cs.commit(tx);
-
- assert !cs.containsKey("k1");
assert !cs.containsKey("k2");
- assert cs.containsKey("k3");
- }
- public void testTwoPhaseCommitReadCommitted() throws CacheLoaderException {
- List<Modification> mods = new ArrayList<Modification>();
- mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
- mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
- mods.add(new Remove("k1"));
- Transaction tx = EasyMock.createNiceMock(Transaction.class);
- cs.prepare(mods, tx, false);
-
- assert !cs.containsKey("k1");
- assert !cs.containsKey("k2");
-
cs.commit(tx);
assert cs.load("k2").getValue().equals("v2");
@@ -189,6 +161,7 @@
assert cs.containsKey("k3");
}
+
public void testRollback() throws CacheLoaderException {
cs.store(new StoredEntry("old", "old", -1, -1));
@@ -200,49 +173,17 @@
mods.add(new Remove("old"));
Transaction tx = EasyMock.createNiceMock(Transaction.class);
cs.prepare(mods, tx, false);
- cs.rollback(tx);
assert !cs.containsKey("k1");
assert !cs.containsKey("k2");
assert cs.containsKey("old");
- mods = new ArrayList<Modification>();
- mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
- mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
- mods.add(new Clear());
- mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
-
- cs.prepare(mods, tx, false);
cs.rollback(tx);
assert !cs.containsKey("k1");
assert !cs.containsKey("k2");
- assert !cs.containsKey("k3");
assert cs.containsKey("old");
- }
- public void testRollbackReadCommitted() throws CacheLoaderException {
-
- cs.store(new StoredEntry("old", "old", -1, -1));
-
- List<Modification> mods = new ArrayList<Modification>();
- mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
- mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
- mods.add(new Remove("k1"));
- mods.add(new Remove("old"));
- Transaction tx = EasyMock.createNiceMock(Transaction.class);
- cs.prepare(mods, tx, false);
-
- assert !cs.containsKey("k1");
- assert !cs.containsKey("k2");
- assert cs.containsKey("old");
-
- cs.rollback(tx);
-
- assert !cs.containsKey("k1");
- assert !cs.containsKey("k2");
- assert cs.containsKey("old");
-
mods = new ArrayList<Modification>();
mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
Modified: core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java 2009-03-02 09:46:47 UTC (rev 7816)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java 2009-03-02 10:18:44 UTC (rev 7817)
@@ -1,11 +1,21 @@
package org.horizon.loader.bdbje;
+import org.easymock.EasyMock;
import org.horizon.loader.BaseCacheStoreTest;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
import org.horizon.test.TestingUtil;
import org.testng.annotations.Test;
+import javax.transaction.Transaction;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* @author Adrian Cole
* @version $Id: $
@@ -27,14 +37,72 @@
return cs;
}
+ /**
+ * this is the same as the superclass, except that it doesn't attempt read-committed
+ */
@Override
- public void testTwoPhaseCommitReadCommitted() throws CacheLoaderException {
- // this depends on READ_COMMTTED, which is not supported on sleepycat
+ public void testTwoPhaseCommit() throws CacheLoaderException {
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ cs.prepare(mods, tx, false);
+ cs.commit(tx);
+
+ assert cs.load("k2").getValue().equals("v2");
+ assert !cs.containsKey("k1");
+
+ cs.clear();
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ cs.prepare(mods, tx, false);
+ cs.commit(tx);
+
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+ assert cs.containsKey("k3");
}
+ /**
+ * this is the same as the superclass, except that it doesn't attempt read-committed
+ */
@Override
- public void testRollbackReadCommitted() throws CacheLoaderException {
- // this depends on READ_COMMTTED, which is not supported on sleepycat
+ public void testRollback() throws CacheLoaderException {
+
+ cs.store(new StoredEntry("old", "old", -1, -1));
+
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ mods.add(new Remove("old"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ cs.prepare(mods, tx, false);
+ cs.rollback(tx);
+
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+ assert cs.containsKey("old");
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ cs.prepare(mods, tx, false);
+ cs.rollback(tx);
+
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+ assert !cs.containsKey("k3");
+ assert cs.containsKey("old");
}
}
15 years, 10 months
JBoss Cache SVN: r7816 - core/branches/flat/src/main/java/org/horizon/logging.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-02 04:46:47 -0500 (Mon, 02 Mar 2009)
New Revision: 7816
Modified:
core/branches/flat/src/main/java/org/horizon/logging/Log.java
Log:
Javadocs
Modified: core/branches/flat/src/main/java/org/horizon/logging/Log.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/logging/Log.java 2009-03-02 09:32:32 UTC (rev 7815)
+++ core/branches/flat/src/main/java/org/horizon/logging/Log.java 2009-03-02 09:46:47 UTC (rev 7816)
@@ -16,13 +16,11 @@
* <p/>
* ... could be replaced with ...
* <p/>
- * <code> log.trace("This is a message {0} and some other value is {1}", message, value); </code>
+ * <code> if (log.isTraceEnabled()) log.trace("This is a message {0} and some other value is {1}", message, value);
+ * </code>
* <p/>
- * with no performance penalty for when trace is not enabled. This greatly enhances code readability.
+ * This greatly enhances code readability.
* <p/>
- * However, if <tt>message</tt> or <tt>value</tt> need to be calculated especially for the log message, then the
- * <tt>isTraceEnabled()</tt> paradigm should still be used.
- * <p/>
* If you are passing a <tt>Throwable</tt>, note that this should be passed in <i>before</i> the vararg parameter list.
* <p/>
*
15 years, 10 months
JBoss Cache SVN: r7815 - core/branches/flat/src/main/java/org/horizon/loader.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-02 04:32:32 -0500 (Mon, 02 Mar 2009)
New Revision: 7815
Modified:
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
Log:
Javadocs
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-03-02 00:43:57 UTC (rev 7814)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-03-02 09:32:32 UTC (rev 7815)
@@ -37,6 +37,9 @@
* markers or some other mechanism to prevent the store from reading too much information should be employed when
* writing to the stream in {@link #fromStream(java.io.ObjectInput)} to prevent data corruption.
* <p/>
+ * It can be assumed that the stream passed in already performs buffering such that the cache store implementation
+ * doesn't have to.
+ * <p/>
*
* @param inputStream stream to read from
* @throws CacheLoaderException in the event of problems writing to the store
@@ -57,6 +60,10 @@
* markers or some other mechanism to prevent the store from reading too much information in {@link
* #fromStream(java.io.ObjectInput)} should be employed, to prevent data corruption.
* <p/>
+ * <p/>
+ * It can be assumed that the stream passed in already performs buffering such that the cache store implementation
+ * doesn't have to.
+ * <p/>
*
* @param outputStream stream to write to
* @throws CacheLoaderException in the event of problems reading from the store
15 years, 10 months
JBoss Cache SVN: r7814 - in core/branches/flat/src: main/java/org/horizon/loader/bdbje and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-01 19:43:57 -0500 (Sun, 01 Mar 2009)
New Revision: 7814
Added:
core/branches/flat/src/main/java/org/horizon/loader/bdbje/
core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java
core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java
core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java
Log:
added bdbje loader support
Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,706 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.serial.SerialBinding;
+import com.sleepycat.bind.serial.StoredClassCatalog;
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.StoredMap;
+import com.sleepycat.je.*;
+import com.sleepycat.util.ExceptionUnwrapper;
+import org.horizon.Cache;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+import org.horizon.util.ReflectionUtil;
+import org.horizon.util.concurrent.WithinThreadExecutor;
+
+import java.io.File;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * An Oracle SleepyCat JE implementation of a {@link org.horizon.loader.CacheStore}. <p/>This implementation uses two
+ * databases <ol> <li>stored entry database: <tt>/{location}/CacheInstance-{@link org.horizon.Cache#getName()
+ * name}</tt></li> {@link StoredEntry stored entries} are stored here, keyed on {@link
+ * org.horizon.loader.StoredEntry#getKey()} <li>class catalog database: <tt>/{location}/CacheInstance-{@link
+ * org.horizon.Cache#getName() name}_class_catalog</tt></li> class descriptions are stored here for efficiency reasons.
+ * </ol>
+ * <p/>
+ * <p/>
+ * All data access is transactional. Any attempted reads to locked records will block. The maximum duration of this is
+ * set in microseconds via the parameter {@link org.horizon.loader.bdbje.BdbjeCacheStoreConfig#getLockAcquistionTimeoutMicros()}.
+ * Calls to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction, boolean) prepare} will attempt
+ * to resolve deadlocks, retrying up to {@link org.horizon.loader.bdbje.BdbjeCacheStoreConfig#getMaxTxRetries()}
+ * attempts.
+ * <p/>
+ * Unlike the C version of SleepyCat, JE does not support MVCC or READ_COMMITTED isolation. In other words, readers
+ * will block on any data held by a pending transaction. As such, it is best practice to keep the duration between
+ * <code>prepare</code> and <code>commit</code> as short as possible.
+ * <p/>
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class BdbjeCacheStore implements CacheStore {
+
+ private static final Log log = LogFactory.getLog(BdbjeCacheStore.class);
+
+ private BdbjeCacheStoreConfig cfg;
+ private Cache cache;
+ private Marshaller m;
+
+ private Environment env;
+ private String cacheDbName;
+ private String catalogDbName;
+ private StoredClassCatalog catalog;
+ private Database cacheDb;
+ private StoredMap<Object, StoredEntry> cacheMap;
+
+ private PreparableTransactionRunner transactionRunner;
+ private Map<javax.transaction.Transaction, Transaction> txnMap;
+ private ExecutorService purgerService;
+ private CurrentTransaction currentTransaction;
+
+ /**
+ * {@inheritDoc} This implementation expects config to be an instance of {@link BdbjeCacheStoreConfig}
+ *
+ * @see BdbjeCacheStoreConfig
+ */
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ log.trace("initializing BdbjeCacheStore");
+ printLicense();
+ this.cfg = (BdbjeCacheStoreConfig) config;
+ this.cache = cache;
+ this.m = m;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return {@link BdbjeCacheStoreConfig}
+ */
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return BdbjeCacheStoreConfig.class;
+ }
+
+ /**
+ * {@inheritDoc} Validates configuration, configures and opens the {@link Environment}, then {@link
+ * org.horizon.loader.bdbje.BdbjeCacheStore#openDatabases() opens the databases}. When this is finished,
+ * transactional services are instantiated.
+ */
+ public void start() throws CacheLoaderException {
+ log.trace("starting BdbjeCacheStore");
+ checkNotOpen();
+
+ if (cache == null) {
+ throw new IllegalStateException(
+ "A non-null Cache property (CacheSPI object) is required");
+ }
+
+ String configStr = cfg.getLocation();
+
+ if (cfg.isPurgeSynchronously()) {
+ purgerService = new WithinThreadExecutor();
+ } else {
+ purgerService = Executors.newSingleThreadExecutor();
+ }
+
+ // JBCACHE-1448 db name parsing fix courtesy of Ciro Cavani
+ /* Parse config string. */
+ int offset = configStr.indexOf('#');
+ if (offset >= 0 && offset < configStr.length() - 1) {
+ cacheDbName = configStr.substring(offset + 1);
+ configStr = configStr.substring(0, offset);
+ } else {
+ cacheDbName = cache.getName();
+ if (cacheDbName == null) cacheDbName = "CacheInstance-" + System.identityHashCode(cache);
+ }
+
+ // datafile location
+ File location = new File(configStr);
+ if (!location.exists()) {
+ boolean created = location.mkdirs();
+ if (!created) throw new CacheLoaderException("Unable to create cache loader location " + location);
+
+ }
+ if (!location.isDirectory()) {
+ throw new CacheLoaderException("Cache loader location [" + location + "] is not a directory!");
+ }
+
+ catalogDbName = cacheDbName + "_class_catalog";
+
+ try {
+ /* Open the environment, creating it if it doesn't exist. */
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setAllowCreate(true);
+ envConfig.setTransactional(true);
+ envConfig.setLockTimeout(cfg.getLockAcquistionTimeoutMicros());
+ if (log.isDebugEnabled()) {
+ envConfig.setConfigParam("je.txn.deadlockStackTrace", "true");
+ envConfig.setConfigParam("je.txn.dumpLocks", "true");
+ }
+ log.trace("creating je environment with home dir {0}", location);
+ env = new Environment(location, envConfig);
+ log.debug("created je environment {0} for cache store {1}", env, this);
+ /* Open cache and catalog databases. */
+ openDatabases();
+ }
+ catch (DatabaseException e) {
+ throw new CacheLoaderException("could not open the sleepycat database", e);
+ }
+ txnMap = new ConcurrentHashMap<javax.transaction.Transaction, Transaction>();
+ currentTransaction = CurrentTransaction.getInstance(env);
+ transactionRunner = new PreparableTransactionRunner(env);
+ transactionRunner.setMaxRetries(cfg.getMaxTxRetries());
+ log.debug("started cache store {1}", this);
+ }
+
+
+ /**
+ * Opens all databases and initializes database related information. A {@link StoredMap} instance is {@link
+ * BdbjeCacheStore#createStoredMapViewOfDatabase(com.sleepycat.je.Database, com.sleepycat.bind.serial.StoredClassCatalog)
+ * associated} with the stored entry and class catalog databases.
+ */
+ private void openDatabases() throws DatabaseException {
+ log.trace("opening databases");
+ /* Use a generic database config, with no duplicates allowed. */
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+
+ log.trace("opening or creating stored entry database {0}", cacheDbName);
+ cacheDb = env.openDatabase(null, cacheDbName, dbConfig);
+ log.debug("opened stored entry database {0}", cacheDbName);
+
+ log.trace("opening or creating class catalog database {0}", catalogDbName);
+ Database catalogDb = env.openDatabase(null, catalogDbName, dbConfig);
+ catalog = new StoredClassCatalog(catalogDb);
+ log.debug("created stored class catalog from database {0}", catalogDbName);
+
+ cacheMap = createStoredMapViewOfDatabase(cacheDb, catalog);
+ }
+
+ /**
+ * create a {@link StoredMap} persisted by the <code>database</code>
+ *
+ * @param database where entries in the StoredMap are persisted
+ * @param classCatalog location to store class descriptions
+ * @return StoredMap backed by the database and classCatalog
+ * @throws DatabaseException if the StoredMap cannot be opened.
+ */
+ private StoredMap createStoredMapViewOfDatabase(Database database, StoredClassCatalog classCatalog) throws DatabaseException {
+ EntryBinding storedEntryKeyBinding =
+ new SerialBinding(classCatalog, Object.class);
+ EntryBinding storedEntryValueBinding =
+ new SerialBinding(classCatalog, StoredEntry.class);
+ try {
+ return new StoredMap<Object, StoredEntry>(database,
+ storedEntryKeyBinding, storedEntryValueBinding, true);
+ } catch (Exception caught) {
+ caught = ExceptionUnwrapper.unwrap(caught);
+ throw new DatabaseException("error opening stored map", caught);
+ }
+ }
+
+ /**
+ * Closes the JE databases and their associated {@link StoredMap}, and nulls references to them. The databases are
+ * not removed from the file system. Exceptions during close are ignored.
+ */
+ private void closeDatabases() {
+ log.trace("closing databases");
+ try {
+ cacheDb.close();
+ catalog.close();
+ } catch (DatabaseException e) {
+ log.error("Error closing databases", e);
+ }
+ cacheDb = null;
+ catalog = null;
+ cacheMap = null;
+ }
+
+
+ /**
+ * {@link org.horizon.loader.bdbje.BdbjeCacheStore#closeDatabases() Closes the JE databases} and the {@link
+ * Environment}. The environment and databases are not removed from the file system. Exceptions during close are
+ * ignored.
+ */
+ public void stop() throws CacheLoaderException {
+ checkOpen();
+ log.trace("stopping BdbjeCacheStore");
+ transactionRunner = null;
+ currentTransaction = null;
+ txnMap = null;
+ closeDatabases();
+ if (env != null) {
+ try {
+ env.close();
+ }
+ catch (Exception shouldNotOccur) {
+ log.warn("Unexpected exception closing cacheStore", shouldNotOccur);
+ }
+ }
+ env = null;
+ }
+
+ /**
+ * {@inheritDoc} delegates to {@link BdbjeCacheStore#onePhaseCommit(java.util.List)}, if <code>isOnePhase</code>.
+ * Otherwise, delegates to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction) prepare}.
+ */
+ public void prepare(List<? extends Modification> mods, javax.transaction.Transaction tx, boolean isOnePhase) throws CacheLoaderException {
+ if (isOnePhase) {
+ onePhaseCommit(mods);
+ } else {
+ prepare(mods, tx);
+ }
+ }
+
+ /**
+ * Perform the <code>mods</code> atomically by creating a {@link ModificationsTransactionWorker worker} and invoking
+ * them in {@link PreparableTransactionRunner#run(com.sleepycat.collections.TransactionWorker)}.
+ *
+ * @param mods actions to perform atomically
+ * @throws CacheLoaderException on problems during the transaction
+ */
+ protected void onePhaseCommit(List<? extends Modification> mods) throws CacheLoaderException {
+ checkOpen();
+ checkNonNull(mods, "modifications");
+ log.debug("performing one phase transaction");
+ try {
+ transactionRunner.run(new ModificationsTransactionWorker(this, mods));
+ } catch (Exception e) {
+ throw new CacheLoaderException("Problem committing modifications: " + mods, e);
+ }
+ }
+
+ /**
+ * Looks up the {@link Transaction SleepyCat transaction} associated with <code>tx</code>. Creates a {@link
+ * org.horizon.loader.bdbje.ModificationsTransactionWorker} instance from <code>mods</code>. Then prepares the
+ * transaction via {@link PreparableTransactionRunner#prepare(com.sleepycat.collections.TransactionWorker)}.
+ *
+ * @param mods modifications to be applied
+ * @param tx transaction identifier
+ * @throws CacheLoaderException in the event of problems writing to the store
+ */
+ protected void prepare(List<? extends Modification> mods, javax.transaction.Transaction tx) throws CacheLoaderException {
+ checkOpen();
+ checkNonNull(mods, "modifications");
+ checkNonNull(tx, "tx");
+ log.debug("preparing transaction {0}", tx);
+ try {
+ transactionRunner.prepare(new ModificationsTransactionWorker(this, mods));
+ Transaction txn = currentTransaction.getTransaction();
+ log.trace("transaction {0} == sleepycat transaction {1}", tx, txn);
+ txnMap.put(tx, txn);
+ ReflectionUtil.setValue(currentTransaction, "localTrans", new ThreadLocal());
+ } catch (Exception e) {
+ throw new CacheLoaderException("Problem preparing transaction", e);
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * This implementation calls {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean)
+ * completeTransaction} with an argument of false.
+ */
+ public void rollback(javax.transaction.Transaction tx) {
+ try {
+ completeTransaction(tx, false);
+ } catch (Exception e) {
+ log.error("Error rolling back transaction", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * This implementation calls {@link BdbjeCacheStore#completeTransaction(javax.transaction.Transaction, boolean)
+ * completeTransaction} with an argument of true.
+ */
+ public void commit(javax.transaction.Transaction tx) throws CacheLoaderException {
+ completeTransaction(tx, true);
+ }
+
+ /**
+ * Looks up the SleepyCat transaction associated with the parameter <code>tx</code>. If there is no associated
+ * sleepycat transaction, an error is logged. If this transaction is the {@link
+ * com.sleepycat.collections.CurrentTransaction#getTransaction()} current transaction}, it calls {@link
+ * BdbjeCacheStore#completeCurrentTransaction(boolean)} passing the argument <code>commit</code>. Otherwise, {@link
+ * BdbjeCacheStore#completeTransaction(com.sleepycat.je.Transaction, boolean) completeTransaction} is called, passing
+ * the SleepyCat transaction and <code>commit</code> as arguments.
+ *
+ * @param tx java transaction used to lookup a SleepyCat transaction
+ * @param commit true to commit false to abort
+ * @throws CacheLoaderException if there are problems committing or aborting the transaction
+ */
+ protected void completeTransaction(javax.transaction.Transaction tx, boolean commit) throws CacheLoaderException {
+ checkOpen();
+ checkNonNull(tx, "tx");
+ Transaction txn = txnMap.remove(tx);
+ if (txn != null) {
+ log.debug("transaction {0} == sleepycat transaction {1}", tx, txn);
+ if (currentTransaction.getTransaction() == txn) {
+ completeCurrentTransaction(commit);
+ } else {
+ completeTransaction(txn, commit);
+ }
+ } else {
+ log.error("no sleepycat transaction associated transaction {0}", tx);
+ }
+ }
+
+ /**
+ * commits or aborts the {@link Transaction}
+ *
+ * @param commit true to commit, false to abort
+ * @throws CacheLoaderException if there was a problem completing the transaction
+ */
+ private void completeTransaction(Transaction txn, boolean commit) throws CacheLoaderException {
+ try {
+ log.debug("{0} sleepycat transaction {1}", commit ? "committing" : "aborting", txn);
+ if (commit)
+ txn.commit();
+ else
+ txn.abort();
+ } catch (DatabaseException e) {
+ throw new CacheLoaderException("Problem completing transaction", e);
+ }
+ }
+
+ /**
+ * commits or aborts the {@link com.sleepycat.collections.CurrentTransaction#getTransaction() current transaction}
+ *
+ * @param commit true to commit, false to abort
+ * @throws CacheLoaderException if there was a problem completing the transaction
+ */
+ private void completeCurrentTransaction(boolean commit) throws CacheLoaderException {
+ try {
+ log.debug("{0} current sleepycat transaction {1}", commit ? "committing" : "aborting", currentTransaction.getTransaction());
+ if (commit)
+ currentTransaction.commitTransaction();
+ else
+ currentTransaction.abortTransaction();
+ } catch (DatabaseException e) {
+ throw new CacheLoaderException("Problem completing transaction", e);
+ }
+ }
+
+ /**
+ * {@inheritDoc} This implementation delegates to {@link BdbjeCacheStore#load(Object)}, to ensure that a response is
+ * returned only if the entry is not expired.
+ */
+ public boolean containsKey(Object key) throws CacheLoaderException {
+ return load(key) != null;
+ }
+
+ /**
+ * {@inheritDoc} This implementation delegates to {@link StoredMap#remove(Object)}
+ */
+ public boolean remove(Object key) throws CacheLoaderException {
+ checkOpen();
+ checkNonNull(key, "key");
+ log.trace("Removing key {0}", key);
+ try {
+ if (cacheMap.containsKey(key)) {
+ cacheMap.remove(key);
+ return true;
+ }
+ } catch (Exception caught) {
+ caught = ExceptionUnwrapper.unwrap(caught);
+ throw new CacheLoaderException("error removing key " + key, caught);
+ }
+ return false;
+ }
+
+ /**
+ * {@inheritDoc} This implementation removes the <code>keys</code> atomically by creating a list of {@link Remove}
+ * modifications and passing that to {@link BdbjeCacheStore#onePhaseCommit(java.util.List)}.
+ */
+ public void removeAll(Set<Object> keys) throws CacheLoaderException {
+ checkOpen();
+ checkNonNull(keys, "keys");
+ List<Remove> toRemove = new ArrayList<Remove>();
+ for (Object key : keys) {
+ toRemove.add(new Remove(key));
+ }
+ onePhaseCommit(toRemove);
+ }
+
+ /**
+ * {@inheritDoc} This implementation delegates to {@link StoredMap#get(Object)}. If the object is expired, it will
+ * not be returned.
+ */
+ public StoredEntry load(Object key) throws CacheLoaderException {
+ checkOpen();
+ checkNonNull(key, "key");
+ log.trace("Loading key {0}", key);
+ try {
+ StoredEntry s = cacheMap.get(key);
+ if (s == null)
+ return null;
+ if (!s.isExpired())
+ return s;
+ else
+ cacheMap.remove(key);
+ } catch (Exception caught) {
+ caught = ExceptionUnwrapper.unwrap(caught);
+ throw new CacheLoaderException("error loading key " + key, caught);
+ }
+ return null;
+ }
+
+ /**
+ * {@inheritDoc} This implementation delegates to {@link StoredMap#put(Object, Object)}
+ */
+ public void store(StoredEntry ed) throws CacheLoaderException {
+ checkOpen();
+ checkNonNull(ed, "entry");
+ log.trace("Storing entry {0}", ed);
+ try {
+ cacheMap.put(ed.getKey(), ed);
+ } catch (Exception caught) {
+ caught = ExceptionUnwrapper.unwrap(caught);
+ throw new CacheLoaderException("error storing entry " + ed, caught);
+ }
+ }
+
+ /**
+ * {@inheritDoc} This implementation delegates to {@link StoredMap#clear()}
+ */
+ public void clear() throws CacheLoaderException {
+ checkOpen();
+ log.trace("Clearing store");
+ try {cacheMap.clear(); } catch (Exception caught) {
+ caught = ExceptionUnwrapper.unwrap(caught);
+ throw new CacheLoaderException("error clearing store", caught);
+ }
+ }
+
+ /**
+ * {@inheritDoc} This implementation returns a Set from {@link StoredMap#values()}
+ */
+ public Set<StoredEntry> loadAll() throws CacheLoaderException {
+ checkOpen();
+ log.trace("Loading all entries");
+ try {
+ return new HashSet(cacheMap.values());
+ } catch (Exception caught) {
+ caught = ExceptionUnwrapper.unwrap(caught);
+ throw new CacheLoaderException("error loading all entries ", caught);
+ }
+ }
+
+ /**
+ * {@inheritDoc} This implementation reads the number of entries to load from the stream, then begins a transaction.
+ * During that tranasaction, the cachestore is cleared and replaced with entries from the stream. If there are any
+ * errors during the process, the entire transaction is rolled back. Deadlock handling is not addressed, as there is
+ * no means to rollback reads from the inputstream.
+ *
+ * @see BdbjeCacheStore#toStream(java.io.ObjectOutput)
+ */
+ public void fromStream(ObjectInput ois) throws CacheLoaderException {
+ checkOpen();
+ log.warn("Clearing all entries and loading from input");
+ try {
+ long recordCount = ois.readLong();
+ log.info("reading {0} records from stream", recordCount);
+ log.info("clearing all records");
+ currentTransaction.beginTransaction(null);
+ cacheMap.clear();
+ Cursor cursor = null;
+ try {
+ cursor = cacheDb.openCursor(currentTransaction.getTransaction(), null);
+ for (int i = 0; i < recordCount; i++) {
+ byte[] keyBytes = (byte[]) ois.readObject();
+ byte[] dataBytes = (byte[]) ois.readObject();
+
+ DatabaseEntry key = new DatabaseEntry(keyBytes);
+ DatabaseEntry data = new DatabaseEntry(dataBytes);
+ cursor.put(key, data);
+ }
+ } finally {
+ if (cursor != null) cursor.close();
+ }
+ completeCurrentTransaction(true);
+ }
+ catch (Exception caught) {
+ completeCurrentTransaction(false);
+ caught = ExceptionUnwrapper.unwrap(caught);
+ CacheLoaderException cle = (caught instanceof CacheLoaderException) ? (CacheLoaderException) caught :
+ new CacheLoaderException("Problems reading from stream", caught);
+ throw cle;
+ }
+ }
+
+ /**
+ * @{inheritDoc} Writes the current count of cachestore entries followed by a pair of byte[]s corresponding to the
+ * SleepyCat binary representation of {@link org.horizon.loader.StoredEntry#getKey() key} {@link StoredEntry value}.
+ * <p/>
+ * This implementation holds a transaction open to ensure that we see no new records added while iterating.
+ */
+ public void toStream(ObjectOutput oos) throws CacheLoaderException {
+ checkOpen();
+ log.trace("dumping current database to outputstream");
+ Cursor cursor = null;
+ try {
+ currentTransaction.beginTransaction(null);
+ long recordCount = cacheDb.count();
+ log.debug("writing {0} records to stream", recordCount);
+ oos.writeLong(recordCount);
+
+ cursor = cacheDb.openCursor(currentTransaction.getTransaction(), null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ int recordsWritten = 0;
+ while (cursor.getNext(key, data, null) ==
+ OperationStatus.SUCCESS) {
+ oos.writeObject(key.getData());
+ oos.writeObject(data.getData());
+ recordsWritten++;
+ }
+ log.debug("wrote {0} records to stream", recordsWritten);
+ if (recordsWritten != recordCount)
+ log.warn("expected to write {0} records, but wrote {1}", recordCount, recordsWritten);
+ cursor.close();
+ cursor = null;
+ currentTransaction.commitTransaction();
+ } catch (Exception caught) {
+ try {
+ currentTransaction.abortTransaction();
+ } catch (DatabaseException e) {
+ log.error("error aborting transaction", e);
+ }
+ caught = ExceptionUnwrapper.unwrap(caught);
+ CacheLoaderException cle = (caught instanceof CacheLoaderException) ? (CacheLoaderException) caught :
+ new CacheLoaderException("Problems writing to stream", caught);
+ throw cle;
+ }
+ finally {
+ if (cursor != null) try {
+ cursor.close();
+ } catch (DatabaseException e) {
+ throw new CacheLoaderException("Error closing cursor", e);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc} If there is a {@link com.sleepycat.collections.CurrentTransaction#getTransaction() transaction in
+ * progress}, this method will invoke {@link #doPurgeExpired()} Otherwise, it will purge expired entries,
+ * autocommitting each.
+ *
+ * @see org.horizon.loader.bdbje.BdbjeCacheStoreConfig#isPurgeSynchronously()
+ * @see org.horizon.loader.bdbje.BdbjeCacheStoreConfig#setMaxTxRetries(int)
+ */
+ public void purgeExpired() throws CacheLoaderException {
+ checkOpen();
+ if (currentTransaction.getTransaction() != null) {
+ doPurgeExpired();
+ } else {
+ purgerService.execute(new Runnable() {
+ public void run() {
+ try {
+ doPurgeExpired();
+ } catch (Exception e) {
+ log.error("error purging expired entries", e);
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Iterate through {@link com.sleepycat.collections.StoredMap#entrySet()} and remove, if expired.
+ */
+ private void doPurgeExpired() {
+ log.info("purging expired from database");
+ Iterator<Map.Entry<Object, StoredEntry>> i = cacheMap.entrySet().iterator();
+ while (i.hasNext()) {
+ if (i.next().getValue().isExpired())
+ i.remove();
+ }
+ }
+
+ /**
+ * @return the name of the SleepyCat database persisting this store
+ */
+ public String getCacheDbName() {
+ return cacheDbName;
+ }
+
+ /**
+ * @return the name of the SleepyCat database persisting the class information for objects in this store
+ */
+ public String getCatalogDbName() {
+ return catalogDbName;
+ }
+
+ /**
+ * prints terms of use for Berkeley DB JE
+ */
+ public void printLicense() {
+ String license = "\n*************************************************************************************\n" +
+ "Berkeley DB Java Edition version: " + JEVersion.CURRENT_VERSION.toString() + "\n" +
+ "JBoss Cache can use Berkeley DB Java Edition from Oracle \n" +
+ "(http://www.oracle.com/database/berkeley-db/je/index.html)\n" +
+ "for persistent, reliable and transaction-protected data storage.\n" +
+ "If you choose to use Berkeley DB Java Edition with JBoss Cache, you must comply with the terms\n" +
+ "of Oracle's public license, included in the file LICENSE.txt.\n" +
+ "If you prefer not to release the source code for your own application in order to comply\n" +
+ "with the Oracle public license, you may purchase a different license for use of\n" +
+ "Berkeley DB Java Edition with JBoss Cache.\n" +
+ "See http://www.oracle.com/database/berkeley-db/je/index.html for pricing and license terms\n" +
+ "*************************************************************************************";
+ System.out.println(license);
+ }
+
+ /**
+ * Throws an exception if the environment is not open.
+ */
+ private void checkNotOpen() {
+ if (env != null) {
+ throw new IllegalStateException(
+ "Operation not allowed after calling create()");
+ }
+ }
+
+ /**
+ * Throws an exception if the environment is not open.
+ */
+ private void checkOpen() {
+ if (env == null) {
+ throw new IllegalStateException(
+ "Operation not allowed before calling create()");
+ }
+ }
+
+ /**
+ * Throws an exception if the parameter is null.
+ */
+ private void checkNonNull(Object param, String paramName) {
+ if (param == null) {
+ throw new NullPointerException(
+ "Parameter must not be null: " + paramName);
+ }
+ }
+
+}
Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfig.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,66 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.AbstractCacheLoaderConfig;
+
+/**
+ * Configures {@link org.horizon.loader.bdbje.BdbjeCacheStore}. This allows you to tune a number of characteristics of
+ * the {@link BdbjeCacheStore}.
+ * <p/>
+ * <ul> <li><tt>location</tt> - a location on disk where the store can write databases. This defaults to
+ * <tt>${java.io.tmpdir}</tt></li> <li><tt>purgeSynchronously</tt> - whether {@link
+ * org.horizon.loader.CacheStore#purgeExpired()} calls happen synchronously or not. By default, this is set to
+ * <tt>false</tt>.</li> <li><tt>lockAcquistionTimeoutMicros</tt> - the length of time, in microseconds, to wait for
+ * locks before timing out and throwing an exception. By default, this is set to <tt>60000000</tt>.</li>
+ * <li><tt>maxTxRetries</tt> - the number of times transaction prepares will attempt to resolve a deadlock before
+ * throwing an exception. By default, this is set to <tt>5</tt>.</li> </ul>
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class BdbjeCacheStoreConfig extends AbstractCacheLoaderConfig {
+ private String location = System.getProperty("java.io.tmpdir");
+ private boolean purgeSynchronously;
+ private long lockAcquistionTimeoutMicros = 60 * 1000 * 1000;
+ private int maxTxRetries = 5;
+
+
+ public BdbjeCacheStoreConfig() {
+ setClassName(BdbjeCacheStore.class.getName());
+ }
+
+ public int getMaxTxRetries() {
+ return maxTxRetries;
+ }
+
+ public void setMaxTxRetries(int maxTxRetries) {
+ this.maxTxRetries = maxTxRetries;
+ }
+
+
+ public long getLockAcquistionTimeoutMicros() {
+ return lockAcquistionTimeoutMicros;
+ }
+
+ public void setLockAcquistionTimeoutMicros(long lockAcquistionTimeoutMicros) {
+ this.lockAcquistionTimeoutMicros = lockAcquistionTimeoutMicros;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public void setLocation(String location) {
+ testImmutability("location");
+ this.location = location;
+ }
+
+ public boolean isPurgeSynchronously() {
+ return purgeSynchronously;
+ }
+
+ public void setPurgeSynchronously(boolean purgeSynchronously) {
+ this.purgeSynchronously = purgeSynchronously;
+ }
+
+}
Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/ModificationsTransactionWorker.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,60 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.TransactionWorker;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+
+import java.util.List;
+
+/**
+ * Adapter that allows a list of {@link Modification}s to be performed atomically via {@link
+ * com.sleepycat.collections.TransactionRunner}.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class ModificationsTransactionWorker implements TransactionWorker {
+ private List<? extends Modification> mods;
+ private CacheStore cs;
+
+ /**
+ * Associates {@link Modification}s that will be applied to the supplied {@link CacheStore}
+ *
+ * @param store what to affect
+ * @param mods actions to take
+ */
+ public ModificationsTransactionWorker(CacheStore store, List<? extends Modification> mods) {
+ this.cs = store;
+ this.mods = mods;
+ }
+
+ /**
+ * {@inheritDoc} This implementation iterates through a list of work represented by {@link Modification} objects and
+ * executes it against the {@link CacheStore}.<p/> Current commands supported are: <ul> <li>STORE</li> <li>CLEAR</li>
+ * <li>REMOVE</li> <li>PURGE_EXPIRED</li> </ul>
+ */
+ public void doWork() throws Exception {
+ for (Modification modification : mods)
+ switch (modification.getType()) {
+ case STORE:
+ Store s = (Store) modification;
+ cs.store(s.getStoredEntry());
+ break;
+ case CLEAR:
+ cs.clear();
+ break;
+ case REMOVE:
+ Remove r = (Remove) modification;
+ cs.remove(r.getKey());
+ break;
+ case PURGE_EXPIRED:
+ cs.purgeExpired();
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown modification type " + modification.getType());
+ }
+ }
+}
Added: core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/PreparableTransactionRunner.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,95 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.TransactionRunner;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.compat.DbCompat;
+import com.sleepycat.je.DeadlockException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import com.sleepycat.util.ExceptionUnwrapper;
+
+/**
+ * Adapted version of {@link TransactionRunner}, which allows us to prepare a transaction without committing it.<p/> The
+ * transaction prepared is accessible via {@link com.sleepycat.collections.CurrentTransaction#getTransaction()}
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+public class PreparableTransactionRunner extends TransactionRunner {
+ CurrentTransaction currentTxn;
+
+ /**
+ * Delegates to the {@link TransactionRunner#TransactionRunner(com.sleepycat.je.Environment, int,
+ * com.sleepycat.je.TransactionConfig) superclass} and caches a current reference to {@link CurrentTransaction}.
+ *
+ * @see TransactionRunner#TransactionRunner(com.sleepycat.je.Environment, int, com.sleepycat.je.TransactionConfig)
+ */
+ public PreparableTransactionRunner(Environment env, int maxRetries, TransactionConfig config) {
+ super(env, maxRetries, config);
+ this.currentTxn = CurrentTransaction.getInstance(env);
+ }
+
+ /**
+ * Delegates to the {@link TransactionRunner#TransactionRunner(com.sleepycat.je.Environment) superclass} and caches
+ * a current reference to {@link CurrentTransaction}.
+ *
+ * @see TransactionRunner#TransactionRunner(com.sleepycat.je.Environment)
+ */
+ public PreparableTransactionRunner(Environment env) {
+ super(env);
+ this.currentTxn = CurrentTransaction.getInstance(env);
+ }
+
+ /**
+ * Same behaviour as {@link TransactionRunner#run(com.sleepycat.collections.TransactionWorker) run}, except that the
+ * transaction is not committed on success.
+ *
+ * @see TransactionRunner#run(com.sleepycat.collections.TransactionWorker)
+ */
+ public void prepare(TransactionWorker worker) throws Exception {
+ for (int currentTries = 0; ; currentTries++) {
+ Transaction txn = null;
+ try {
+ txn = currentTxn.beginTransaction(getTransactionConfig());
+ worker.doWork();
+ return;
+ } catch (Throwable caught) {
+ currentTries = abortOverflowingCurrentTriesOnError(txn, currentTries);
+ caught = ExceptionUnwrapper.unwrapAny(caught);
+ rethrowIfNotDeadLock(caught);
+ if (currentTries >= getMaxRetries())
+ throw (DeadlockException) caught;
+ }
+ }
+ }
+
+ int abortOverflowingCurrentTriesOnError(Transaction toAbort, int currentTries) {
+ if (toAbort != null && toAbort == currentTxn.getTransaction()) {
+ try {
+ currentTxn.abortTransaction();
+ } catch (Throwable problemAborting) {
+ /* superclass prints to stderr, so we will also */
+ if (DbCompat.TRANSACTION_RUNNER_PRINT_STACK_TRACES) {
+ problemAborting.printStackTrace();
+ }
+ /* Force the original exception to be thrown. */
+ return Integer.MAX_VALUE;
+ }
+ }
+ return currentTries;
+ }
+
+ void rethrowIfNotDeadLock(Throwable caught) throws Exception {
+ if (!(caught instanceof DeadlockException)) {
+ if (caught instanceof Exception) {
+ throw (Exception) caught;
+ } else {
+ throw (Error) caught;
+ }
+ }
+ }
+
+}
Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreConfigTest.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,74 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.CacheLoaderException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests that cover {@link BdbjeCacheStoreConfig }
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.BdbjeCacheStoreConfigTest")
+public class BdbjeCacheStoreConfigTest {
+
+ private BdbjeCacheStoreConfig config;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ config = new BdbjeCacheStoreConfig();
+ }
+
+ @AfterMethod
+ public void tearDown() throws CacheLoaderException {
+ config = null;
+ }
+
+
+ @Test
+ public void testGetClassNameDefault() {
+ assert config.getClassName().equals(BdbjeCacheStore.class.getName());
+ }
+
+ @Test
+ public void testgetMaxTxRetries() {
+ assert config.getMaxTxRetries() == 5;
+ }
+
+ @Test
+ public void testSetMaxTxRetries() {
+ config.setMaxTxRetries(1);
+ assert config.getMaxTxRetries() == 1;
+ }
+
+ @Test
+ public void testGetLockAcquistionTimeoutMicros() {
+ assert config.getLockAcquistionTimeoutMicros() == 60 * 1000 * 1000;
+ }
+
+ @Test
+ public void testSetLockAcquistionTimeoutMicros() {
+ config.setLockAcquistionTimeoutMicros(1);
+ assert config.getLockAcquistionTimeoutMicros() == 1;
+ }
+
+ @Test
+ public void testSetLocation() {
+ config.setLocation("foo");
+ assert config.getLocation().equals("foo");
+ }
+
+ @Test
+ public void testIsPurgeSynchronously() {
+ assert !config.isPurgeSynchronously();
+ }
+
+ @Test
+ public void testSetPurgeSynchronously() {
+ config.setPurgeSynchronously(true);
+ assert config.isPurgeSynchronously();
+ }
+}
Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeCacheStoreIntegrationTest.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,40 @@
+package org.horizon.loader.bdbje;
+
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheStore;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+/**
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.BdbjeCacheStoreIntegrationTest")
+public class BdbjeCacheStoreIntegrationTest extends BaseCacheStoreTest {
+ protected CacheStore createCacheStore() throws CacheLoaderException {
+ CacheStore cs = new BdbjeCacheStore();
+ String tmpDir = TestingUtil.TEST_FILES;
+ String tmpCLLoc = tmpDir + "/Horizon-BdbjeCacheStoreIntegrationTest";
+ TestingUtil.recursiveFileRemove(tmpCLLoc);
+
+ BdbjeCacheStoreConfig cfg = new BdbjeCacheStoreConfig();
+ cfg.setLocation(tmpCLLoc);
+ cfg.setPurgeSynchronously(true);
+ cs.init(cfg, getCache(), getMarshaller());
+ cs.start();
+ return cs;
+ }
+
+ @Override
+ public void testTwoPhaseCommitReadCommitted() throws CacheLoaderException {
+ // this depends on READ_COMMTTED, which is not supported on sleepycat
+ }
+
+ @Override
+ public void testRollbackReadCommitted() throws CacheLoaderException {
+ // this depends on READ_COMMTTED, which is not supported on sleepycat
+ }
+
+}
Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/BdbjeLearningTest.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,733 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.bind.EntryBinding;
+import com.sleepycat.bind.serial.SerialBinding;
+import com.sleepycat.bind.serial.StoredClassCatalog;
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.StoredMap;
+import com.sleepycat.collections.TransactionRunner;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.OperationStatus;
+import org.easymock.EasyMock;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Learning tests for SleepyCat JE. Behaviour here is used in BdbjeCacheLoader. When there are upgrades to bdbje, this
+ * test may warrant updating.
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.BdbjeLearningTest")
+public class BdbjeLearningTest {
+ String dbHome = TestingUtil.TEST_FILES + "/Horizon-BdbjeLearningTest";
+ Environment env;
+
+ private static final String CLASS_CATALOG = "java_class_catalog";
+ private StoredClassCatalog javaCatalog;
+
+ private static final String STORED_ENTRIES = "storedEntriesDb";
+ private Database storedEntriesDb;
+ private StoredMap<Object, StoredEntry> cacheMap;
+
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ new File(dbHome).mkdirs();
+ System.out.println("Opening environment in: " + dbHome);
+
+ EnvironmentConfig envConfig = new EnvironmentConfig();
+ envConfig.setTransactional(true);
+ envConfig.setAllowCreate(true);
+
+ env = new Environment(new File(dbHome), envConfig);
+ DatabaseConfig dbConfig = new DatabaseConfig();
+ dbConfig.setTransactional(true);
+ dbConfig.setAllowCreate(true);
+
+ Database catalogDb = env.openDatabase(null, CLASS_CATALOG, dbConfig);
+
+ javaCatalog = new StoredClassCatalog(catalogDb);
+
+ EntryBinding storedEntryKeyBinding =
+ new SerialBinding(javaCatalog, Object.class);
+ EntryBinding storedEntryValueBinding =
+ new SerialBinding(javaCatalog, StoredEntry.class);
+
+ storedEntriesDb = env.openDatabase(null, STORED_ENTRIES, dbConfig);
+
+ cacheMap =
+ new StoredMap<Object, StoredEntry>(storedEntriesDb,
+ storedEntryKeyBinding, storedEntryValueBinding, true);
+
+
+ }
+
+ public void testTransactionWorker() throws Exception {
+ TransactionRunner runner = new TransactionRunner(env);
+ runner.run(new PopulateDatabase());
+ runner.run(new PrintDatabase());
+
+ }
+
+
+ private class PopulateDatabase implements TransactionWorker {
+ public void doWork()
+ throws Exception {
+ }
+ }
+
+ private class PrintDatabase implements TransactionWorker {
+ public void doWork()
+ throws Exception {
+ }
+ }
+
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ storedEntriesDb.close();
+ javaCatalog.close();
+ env.close();
+
+ TestingUtil.recursiveFileRemove(dbHome);
+ }
+
+
+ private void store(StoredEntry se) {
+ cacheMap.put(se.getKey(), se);
+ }
+
+
+ private StoredEntry load(Object key) {
+ StoredEntry s = cacheMap.get(key);
+ if (s == null)
+ return null;
+ if (!s.isExpired())
+ return s;
+ else
+ cacheMap.remove(key);
+ return null;
+ }
+
+ private Set loadAll() {
+ return new HashSet(cacheMap.values());
+ }
+
+ private void purgeExpired() {
+ Iterator<Map.Entry<Object, StoredEntry>> i = cacheMap.entrySet().iterator();
+ while (i.hasNext()) {
+ if (i.next().getValue().isExpired())
+ i.remove();
+ }
+ }
+
+ private static final Log log = LogFactory.getLog(BdbjeLearningTest.class);
+
+ private void toStream(OutputStream outputStream) throws CacheLoaderException {
+ ObjectOutputStream oos = null;
+ Cursor cursor = null;
+
+ try {
+ oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
+ new ObjectOutputStream(outputStream);
+ long recordCount = storedEntriesDb.count();
+ log.trace("writing {0} records to stream", recordCount);
+ oos.writeLong(recordCount);
+
+ cursor = storedEntriesDb.openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ while (cursor.getNext(key, data, null) ==
+ OperationStatus.SUCCESS) {
+ oos.writeObject(key.getData());
+ oos.writeObject(data.getData());
+ }
+ } catch (IOException e) {
+ throw new CacheLoaderException("Error writing to object stream", e);
+ } catch (DatabaseException e) {
+ throw new CacheLoaderException("Error accessing database", e);
+ }
+ finally {
+ if (cursor != null) try {
+ cursor.close();
+ } catch (DatabaseException e) {
+ throw new CacheLoaderException("Error closing cursor", e);
+ }
+ }
+
+ }
+
+ private void fromStream(InputStream inputStream) throws CacheLoaderException {
+ ObjectInputStream ois = null;
+ try {
+ ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
+ new ObjectInputStream(inputStream);
+ long recordCount = ois.readLong();
+ log.info("reading {0} records from stream", recordCount);
+ log.info("clearing all records");
+ cacheMap.clear();
+ Cursor cursor = null;
+ com.sleepycat.je.Transaction txn = env.beginTransaction(null, null);
+ try {
+ cursor = storedEntriesDb.openCursor(txn, null);
+ for (int i = 0; i < recordCount; i++) {
+ byte[] keyBytes = (byte[]) ois.readObject();
+ byte[] dataBytes = (byte[]) ois.readObject();
+
+ DatabaseEntry key = new DatabaseEntry(keyBytes);
+ DatabaseEntry data = new DatabaseEntry(dataBytes);
+ cursor.put(key, data);
+ }
+ cursor.close();
+ cursor = null;
+ txn.commit();
+ } finally {
+ if (cursor != null) cursor.close();
+ }
+
+ }
+ catch (Exception e) {
+ CacheLoaderException cle = (e instanceof CacheLoaderException) ? (CacheLoaderException) e :
+ new CacheLoaderException("Problems reading from stream", e);
+ throw cle;
+ }
+ }
+
+ class StoreTransactionWorker implements TransactionWorker {
+ StoreTransactionWorker(StoredEntry entry) {
+ this.entry = entry;
+ }
+
+ private StoredEntry entry;
+
+ public void doWork() throws Exception {
+ store(entry);
+ }
+ }
+
+ class ClearTransactionWorker implements TransactionWorker {
+
+ public void doWork() throws Exception {
+ cacheMap.clear();
+ }
+ }
+
+ class RemoveTransactionWorker implements TransactionWorker {
+ RemoveTransactionWorker(Object key) {
+ this.key = key;
+ }
+
+ Object key;
+
+ public void doWork() throws Exception {
+ cacheMap.remove(key);
+ }
+ }
+
+ class PurgeExpiredTransactionWorker implements TransactionWorker {
+ public void doWork() throws Exception {
+ purgeExpired();
+ }
+ }
+
+ class ModificationsTransactionWorker implements TransactionWorker {
+ private List<? extends Modification> mods;
+
+ ModificationsTransactionWorker(List<? extends Modification> mods) {
+ this.mods = mods;
+ }
+
+ public void doWork() throws Exception {
+ for (Modification modification : mods)
+ switch (modification.getType()) {
+ case STORE:
+ Store s = (Store) modification;
+ store(s.getStoredEntry());
+ break;
+ case CLEAR:
+ cacheMap.clear();
+ break;
+ case REMOVE:
+ Remove r = (Remove) modification;
+ cacheMap.remove(r.getKey());
+ break;
+ case PURGE_EXPIRED:
+ purgeExpired();
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown modification type " + modification.getType());
+ }
+ }
+ }
+
+
+ private void prepare(List<Modification> mods, Transaction tx, boolean isOnePhase) throws CacheLoaderException {
+ if (isOnePhase) {
+ TransactionRunner runner = new TransactionRunner(env);
+ try {
+ runner.run(new ModificationsTransactionWorker(mods));
+ } catch (Exception e) {
+ e.printStackTrace(); // TODO: Manik: Customise this generated block
+ }
+ } else {
+ PreparableTransactionRunner runner = new PreparableTransactionRunner(env);
+ com.sleepycat.je.Transaction txn = null;
+ try {
+ runner.prepare(new ModificationsTransactionWorker(mods));
+ txn = CurrentTransaction.getInstance(env).getTransaction();
+ txnMap.put(tx, txn);
+ } catch (Exception e) {
+ e.printStackTrace(); // TODO: Manik: Customise this generated block
+ }
+ }
+
+ }
+
+ Map<Transaction, com.sleepycat.je.Transaction> txnMap = new HashMap<Transaction, com.sleepycat.je.Transaction>();
+
+ private void commit(Transaction tx) {
+ com.sleepycat.je.Transaction txn = txnMap.remove(tx);
+ CurrentTransaction currentTransaction = CurrentTransaction.getInstance(env);
+ if (txn != null) {
+ if (currentTransaction.getTransaction() == txn) {
+ try {
+ currentTransaction.commitTransaction();
+ } catch (DatabaseException e) {
+ e.printStackTrace(); // TODO: Manik: Customise this generated block
+ }
+ } else {
+ log.error("Transactions must be committed on the same thread");
+ }
+ }
+ }
+
+ private void rollback(Transaction tx) {
+ com.sleepycat.je.Transaction txn = txnMap.remove(tx);
+ CurrentTransaction currentTransaction = CurrentTransaction.getInstance(env);
+ if (txn != null) {
+ if (currentTransaction.getTransaction() == txn) {
+ try {
+ currentTransaction.abortTransaction();
+ } catch (DatabaseException e) {
+ e.printStackTrace(); // TODO: Manik: Customise this generated block
+ }
+ } else {
+ log.error("Transactions must be committed on the same thread");
+ }
+ }
+ }
+
+ public void testLoadAndStore() throws InterruptedException, CacheLoaderException {
+ assert !cacheMap.containsKey("k");
+ StoredEntry se = new StoredEntry("k", "v", -1, -1);
+ store(se);
+
+ assert load("k").getValue().equals("v");
+ assert load("k").getLifespan() == -1;
+ assert !load("k").isExpired();
+ assert cacheMap.containsKey("k");
+
+ long now = System.currentTimeMillis();
+ long lifespan = 120000;
+ se = new StoredEntry("k", "v", now, now + lifespan);
+ store(se);
+
+ assert load("k").getValue().equals("v");
+ assert load("k").getLifespan() == lifespan;
+ assert !load("k").isExpired();
+ assert cacheMap.containsKey("k");
+
+ now = System.currentTimeMillis();
+ lifespan = 1;
+ se = new StoredEntry("k", "v", now, now + lifespan);
+ store(se);
+ Thread.sleep(100);
+ assert se.isExpired();
+ assert load("k") == null;
+ assert !cacheMap.containsKey("k");
+ }
+
+
+ public void testOnePhaseCommit() throws CacheLoaderException {
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ prepare(mods, tx, true);
+
+ Set s = loadAll();
+
+ assert load("k2").getValue().equals("v2");
+ assert !cacheMap.containsKey("k1");
+
+ cacheMap.clear();
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ prepare(mods, tx, true);
+ assert !cacheMap.containsKey("k1");
+ assert !cacheMap.containsKey("k2");
+ assert cacheMap.containsKey("k3");
+ }
+
+
+ public void testTwoPhaseCommit() throws Throwable {
+ final List<Throwable> throwables = new ArrayList<Throwable>();
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ prepare(mods, tx, false);
+
+
+ Thread gets1 = new Thread(
+ new Runnable() {
+ public void run() {
+ try {
+ assert load("k2").getValue().equals("v2");
+ assert !cacheMap.containsKey("k1");
+ } catch (Throwable e) {
+ throwables.add(e);
+ }
+ }
+ }
+ );
+
+ gets1.start();
+ commit(tx);
+
+ gets1.join();
+
+ if (!throwables.isEmpty()) throw throwables.get(0);
+
+
+ cacheMap.clear();
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ prepare(mods, tx, false);
+
+ Thread gets2 = new Thread(
+ new Runnable() {
+ public void run() {
+ try {
+ assert !cacheMap.containsKey("k1");
+ assert !cacheMap.containsKey("k2");
+ assert cacheMap.containsKey("k3");
+
+ } catch (Throwable e) {
+ throwables.add(e);
+ }
+ }
+ }
+ );
+
+ gets2.start();
+
+
+ commit(tx);
+ gets2.join();
+
+ if (!throwables.isEmpty()) throw throwables.get(0);
+ assert !cacheMap.containsKey("k1");
+ assert !cacheMap.containsKey("k2");
+ assert cacheMap.containsKey("k3");
+ }
+
+
+ public void testRollback() throws Throwable {
+
+ store(new StoredEntry("old", "old", -1, -1));
+
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ mods.add(new Remove("old"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ prepare(mods, tx, false);
+
+// final List<Throwable> throwables = new ArrayList<Throwable>();
+//
+// Thread gets1 = new Thread(
+// new Runnable() {
+// public void run() {
+// try {
+// assert !cacheMap.containsKey("k1");
+// assert !cacheMap.containsKey("k2");
+// assert cacheMap.containsKey("old");
+// } catch (Throwable e) {
+// throwables.add(e);
+// }
+// }
+// }
+// );
+//
+// gets1.start();
+// gets1.join();
+//
+// if (!throwables.isEmpty()) throw throwables.get(0);
+
+ rollback(tx);
+
+ assert !cacheMap.containsKey("k1");
+ assert !cacheMap.containsKey("k2");
+ assert cacheMap.containsKey("old");
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ prepare(mods, tx, false);
+
+// Thread gets2 = new Thread(
+// new Runnable() {
+// public void run() {
+// try {
+// assert !cacheMap.containsKey("k1");
+// assert !cacheMap.containsKey("k2");
+// assert !cacheMap.containsKey("k3");
+// } catch (Throwable e) {
+// throwables.add(e);
+// }
+// }
+// }
+// );
+//
+// gets2.start();
+// gets2.join();
+//
+// if (!throwables.isEmpty()) throw throwables.get(0);
+
+
+ rollback(tx);
+
+ assert !cacheMap.containsKey("k1");
+ assert !cacheMap.containsKey("k2");
+ assert !cacheMap.containsKey("k3");
+ assert cacheMap.containsKey("old");
+ }
+
+
+ public void testCommitAndRollbackWithoutPrepare() throws CacheLoaderException {
+ store(new StoredEntry("old", "old", -1, -1));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ commit(tx);
+ store(new StoredEntry("old", "old", -1, -1));
+ rollback(tx);
+
+ assert cacheMap.containsKey("old");
+ }
+
+ public void testPreload() throws CacheLoaderException {
+ store(new StoredEntry("k1", "v1", -1, -1));
+ store(new StoredEntry("k2", "v2", -1, -1));
+ store(new StoredEntry("k3", "v3", -1, -1));
+
+ Set<StoredEntry> set = loadAll();
+
+ assert set.size() == 3;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+
+ public void testPurgeExpired() throws Exception {
+ long now = System.currentTimeMillis();
+ long lifespan = 1000;
+ store(new StoredEntry("k1", "v1", now, now + lifespan));
+ store(new StoredEntry("k2", "v2", now, now + lifespan));
+ store(new StoredEntry("k3", "v3", now, now + lifespan));
+ assert cacheMap.containsKey("k1");
+ assert cacheMap.containsKey("k2");
+ assert cacheMap.containsKey("k3");
+ Thread.sleep(lifespan + 100);
+ purgeExpired();
+ assert !cacheMap.containsKey("k1");
+ assert !cacheMap.containsKey("k2");
+ assert !cacheMap.containsKey("k3");
+ }
+
+
+ public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
+ store(new StoredEntry("k1", "v1", -1, -1));
+ store(new StoredEntry("k2", "v2", -1, -1));
+ store(new StoredEntry("k3", "v3", -1, -1));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ toStream(out);
+ out.close();
+ cacheMap.clear();
+ fromStream(new ByteArrayInputStream(out.toByteArray()));
+
+ Set<StoredEntry> set = loadAll();
+
+ assert set.size() == 3;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+
+
+ public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
+ store(new StoredEntry("k1", "v1", -1, -1));
+ store(new StoredEntry("k2", "v2", -1, -1));
+ store(new StoredEntry("k3", "v3", -1, -1));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+ byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
+ out.write(dummyStartBytes);
+ toStream(out);
+ out.write(dummyEndBytes);
+ out.close();
+ cacheMap.clear();
+
+ // first pop the start bytes
+ byte[] dummy = new byte[8];
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ int bytesRead = in.read(dummy, 0, 8);
+ assert bytesRead == 8;
+ for (int i = 1; i < 9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
+ fromStream(in);
+ bytesRead = in.read(dummy, 0, 8);
+ assert bytesRead == 8;
+ for (int i = 8; i > 0; i--) assert dummy[8 - i] == i : "Start byte stream corrupted!";
+
+ Set<StoredEntry> set = loadAll();
+
+ assert set.size() == 3;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+
+ public void testConcurrency() throws Throwable {
+ int numThreads = 3;
+ final int loops = 500;
+ final String[] keys = new String[10];
+ final String[] values = new String[10];
+ for (int i = 0; i < 10; i++) keys[i] = "k" + i;
+ for (int i = 0; i < 10; i++) values[i] = "v" + i;
+
+
+ final Random r = new Random();
+ final List<Throwable> throwables = new LinkedList<Throwable>();
+
+ final Runnable store = new Runnable() {
+ public void run() {
+ try {
+ int randomInt = r.nextInt(10);
+ store(new StoredEntry(keys[randomInt], values[randomInt]));
+ } catch (Throwable e) {
+ throwables.add(e);
+ }
+ }
+ };
+
+ final Runnable remove = new Runnable() {
+ public void run() {
+ try {
+ cacheMap.remove(keys[r.nextInt(10)]);
+ } catch (Throwable e) {
+ throwables.add(e);
+ }
+ }
+ };
+
+ final Runnable get = new Runnable() {
+ public void run() {
+ try {
+ int randomInt = r.nextInt(10);
+ StoredEntry se = load(keys[randomInt]);
+ assert se == null || se.getValue().equals(values[randomInt]);
+ loadAll();
+ } catch (Throwable e) {
+ throwables.add(e);
+ }
+ }
+ };
+
+ Thread[] threads = new Thread[numThreads];
+
+ for (int i = 0; i < numThreads; i++) {
+ threads[i] = new Thread(getClass().getSimpleName() + "-" + i) {
+ public void run() {
+ for (int i = 0; i < loops; i++) {
+ store.run();
+ remove.run();
+ get.run();
+ }
+ }
+ };
+ }
+
+ for (Thread t : threads) t.start();
+ for (Thread t : threads) t.join();
+
+ if (!throwables.isEmpty()) throw throwables.get(0);
+ }
+
+
+}
+
Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/ModificationsTransactionWorkerTest.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,111 @@
+package org.horizon.loader.bdbje;
+
+import static org.easymock.classextension.EasyMock.*;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.PurgeExpired;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+
+/**
+ * Unit tests that cover {@link ModificationsTransactionWorker }
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.ModificationsTransactionWorkerTest")
+public class ModificationsTransactionWorkerTest {
+
+ @Test
+ public void testDoWorkOnUnsupportedModification() {
+ //TODO: we currently support all modifications...
+ }
+
+ @Test
+ public void testDoWorkOnStore() throws Exception {
+ CacheStore cs = createMock(CacheStore.class);
+ Store store = createMock(Store.class);
+ StoredEntry entry = new StoredEntry("1", "2");
+ expect(store.getType()).andReturn(Modification.Type.STORE);
+ expect(store.getStoredEntry()).andReturn(entry);
+ cs.store(entry);
+ replay(cs);
+ replay(store);
+
+ ModificationsTransactionWorker worker =
+ new ModificationsTransactionWorker(cs,
+ Collections.singletonList(store));
+ worker.doWork();
+ verify(cs);
+ verify(store);
+
+ }
+
+ @Test
+ public void testDoWorkOnRemove() throws Exception {
+ CacheStore cs = createMock(CacheStore.class);
+ Remove store = createMock(Remove.class);
+ expect(store.getType()).andReturn(Modification.Type.REMOVE);
+ expect(store.getKey()).andReturn("1");
+ expect(cs.remove("1")).andReturn(true);
+ replay(cs);
+ replay(store);
+
+ ModificationsTransactionWorker worker =
+ new ModificationsTransactionWorker(cs,
+ Collections.singletonList(store));
+ worker.doWork();
+ verify(cs);
+ verify(store);
+
+ }
+
+ @Test
+ public void testDoWorkOnClear() throws Exception {
+ CacheStore cs = createMock(CacheStore.class);
+ Clear clear = createMock(Clear.class);
+ expect(clear.getType()).andReturn(Modification.Type.CLEAR);
+ cs.clear();
+ replay(cs);
+ replay(clear);
+
+ ModificationsTransactionWorker worker =
+ new ModificationsTransactionWorker(cs,
+ Collections.singletonList(clear));
+ worker.doWork();
+ verify(cs);
+ verify(clear);
+ }
+
+ @Test
+ public void testDoWorkOnPurgeExpired() throws Exception {
+ CacheStore cs = createMock(CacheStore.class);
+ PurgeExpired purge = createMock(PurgeExpired.class);
+ expect(purge.getType()).andReturn(Modification.Type.PURGE_EXPIRED);
+ cs.purgeExpired();
+ replay(cs);
+ replay(purge);
+
+ ModificationsTransactionWorker worker =
+ new ModificationsTransactionWorker(cs,
+ Collections.singletonList(purge));
+ worker.doWork();
+ verify(cs);
+ verify(purge);
+ }
+
+
+// case REMOVE:
+// Remove r = (Remove) modification;
+// cs.remove(r.getKey());
+// break;
+// default:
+// throw new IllegalArgumentException("Unknown modification type " + modification.getType());
+
+}
Added: core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/bdbje/PreparableTransactionRunnerTest.java 2009-03-02 00:43:57 UTC (rev 7814)
@@ -0,0 +1,239 @@
+package org.horizon.loader.bdbje;
+
+import com.sleepycat.collections.CurrentTransaction;
+import com.sleepycat.collections.TransactionWorker;
+import com.sleepycat.je.DeadlockException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.classextension.EasyMock.*;
+import org.horizon.loader.CacheLoaderException;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests that cover {@link PreparableTransactionRunner }
+ *
+ * @author Adrian Cole
+ * @version $Id: $
+ * @since 1.0
+ */
+@Test(groups = "unit", enabled = true, testName = "loader.bdbje.PreparableTransactionRunnerTest")
+public class PreparableTransactionRunnerTest {
+ PreparableTransactionRunner runner;
+ Environment env;
+ EnvironmentConfig config;
+ TransactionWorker worker;
+ Transaction transaction;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ config = createMock(EnvironmentConfig.class);
+ expect(config.getTransactional()).andReturn(true);
+ expect(config.getLocking()).andReturn(true);
+ transaction = createMock(Transaction.class);
+ env = createMock(Environment.class);
+ expect(env.getConfig()).andReturn(config);
+ expect(env.beginTransaction(null, null)).andReturn(transaction);
+ worker = createMock(TransactionWorker.class);
+ }
+
+ @AfterMethod
+ public void tearDown() throws CacheLoaderException {
+ runner = null;
+ env = null;
+ config = null;
+ }
+
+
+ @Test
+ public void testMoreDeadlocks() throws Exception {
+ worker.doWork();
+ expectLastCall().andThrow(new DeadlockException());
+ transaction.abort();
+ expect(env.beginTransaction(null, null)).andReturn(transaction);
+ worker.doWork();
+ expectLastCall().andThrow(new DeadlockException());
+ transaction.abort();
+ expect(env.beginTransaction(null, null)).andReturn(transaction);
+ worker.doWork();
+ expectLastCall().andThrow(new DeadlockException());
+ transaction.abort();
+ replayAll();
+ runner = new PreparableTransactionRunner(env, 2, null);
+ try {
+ runner.prepare(worker);
+ assert false : "should have gotten a deadlock exception";
+ } catch (DeadlockException e) {
+
+ }
+ verifyAll();
+ }
+
+ @Test
+ public void testPrepare() throws Exception {
+
+ worker.doWork();
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+ runner.prepare(worker);
+ verifyAll();
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ transaction.commit();
+ worker.doWork();
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+ runner.run(worker);
+ verifyAll();
+ }
+
+
+ @Test
+ public void testOneArgConstructorSetsCurrentTxn() throws Exception {
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+ assert CurrentTransaction.getInstance(env) == runner.currentTxn;
+ }
+
+ @Test
+ public void testSetMaxRetries() throws Exception {
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+ runner.setMaxRetries(1);
+ assert runner.getMaxRetries() == 1;
+ }
+
+ @Test
+ public void testSetAllowNestedTransactions() throws Exception {
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+ runner.setAllowNestedTransactions(false);
+ assert !runner.getAllowNestedTransactions();
+ try {
+ runner.setAllowNestedTransactions(true);
+ assert false : "should have gotten Exception";
+ } catch (UnsupportedOperationException e) {}
+ }
+
+ @Test
+ public void testGetTransactionConfig() throws Exception {
+ replayAll();
+ TransactionConfig config = new TransactionConfig();
+ runner = new PreparableTransactionRunner(env);
+ runner.setTransactionConfig(config);
+ assert runner.getTransactionConfig().equals(config);
+ }
+
+
+ @Test
+ public void testExceptionThrownInPrepare() throws Exception {
+
+ worker.doWork();
+ expectLastCall().andThrow(new RuntimeException());
+ transaction.abort();
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+
+ try {
+ runner.prepare(worker);
+ assert false : "should have gotten an exception";
+ } catch (RuntimeException e) {
+
+ }
+ verifyAll();
+ }
+
+ @Test
+ public void testErrorThrownInPrepare() throws Exception {
+
+ worker.doWork();
+ expectLastCall().andThrow(new Error());
+ transaction.abort();
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+
+ try {
+ runner.prepare(worker);
+ assert false : "should have gotten an exception";
+ } catch (Error e) {
+
+ }
+ verifyAll();
+ }
+
+
+ @Test
+ public void testExceptionThrownInRun() throws Exception {
+
+ worker.doWork();
+ expectLastCall().andThrow(new RuntimeException());
+ transaction.abort();
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+
+ try {
+ runner.prepare(worker);
+ assert false : "should have gotten an exception";
+ } catch (RuntimeException e) {
+
+ }
+ verifyAll();
+ }
+
+ @Test
+ public void testErrorThrownInRun() throws Exception {
+
+ worker.doWork();
+ expectLastCall().andThrow(new Error());
+ transaction.abort();
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+
+ try {
+ runner.run(worker);
+ assert false : "should have gotten an exception";
+ } catch (Error e) {
+
+ }
+ verifyAll();
+ }
+
+
+ public void testRethrowIfNotDeadLockDoesntThrowWhenGivenDeadlockException() throws Exception {
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+ runner.rethrowIfNotDeadLock(createNiceMock(DeadlockException.class));
+ }
+
+ public void testThrowableDuringAbort() throws Exception {
+ transaction.abort();
+ expectLastCall().andThrow(new RuntimeException());
+ replayAll();
+ runner = new PreparableTransactionRunner(env);
+ CurrentTransaction.getInstance(env).beginTransaction(null);
+ int max = runner.abortOverflowingCurrentTriesOnError(transaction, 2);
+ assert max == Integer.MAX_VALUE : "should have overflowed max tries, but got " + max;
+ verifyAll();
+ }
+
+ private void replayAll() {
+ replay(config);
+ replay(env);
+ replay(transaction);
+ replay(worker);
+ }
+
+ private void verifyAll() {
+ verify(config);
+ verify(env);
+ verify(transaction);
+ verify(worker);
+ }
+}
15 years, 10 months
JBoss Cache SVN: r7813 - core/branches/flat.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-01 19:35:05 -0500 (Sun, 01 Mar 2009)
New Revision: 7813
Modified:
core/branches/flat/pom.xml
Log:
changed to easymockclassextension so that we can mock classes that do not implement interfaces
Modified: core/branches/flat/pom.xml
===================================================================
--- core/branches/flat/pom.xml 2009-03-02 00:34:22 UTC (rev 7812)
+++ core/branches/flat/pom.xml 2009-03-02 00:35:05 UTC (rev 7813)
@@ -98,7 +98,7 @@
<dependency>
<groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
+ <artifactId>easymockclassextension</artifactId>
<version>2.4</version>
<scope>test</scope>
</dependency>
15 years, 10 months
JBoss Cache SVN: r7812 - core/branches/flat/src/test/java/org/horizon/loader.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-01 19:34:22 -0500 (Sun, 01 Mar 2009)
New Revision: 7812
Modified:
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
Log:
added test to ensure we can commit from a different thread, and added 2PC tests that do not rely on MVCC
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-01 12:10:33 UTC (rev 7811)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-02 00:34:22 UTC (rev 7812)
@@ -131,10 +131,38 @@
mods.add(new Remove("k1"));
Transaction tx = EasyMock.createNiceMock(Transaction.class);
cs.prepare(mods, tx, false);
+ cs.commit(tx);
+ assert cs.load("k2").getValue().equals("v2");
assert !cs.containsKey("k1");
+
+ cs.clear();
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ cs.prepare(mods, tx, false);
+ cs.commit(tx);
+
+ assert !cs.containsKey("k1");
assert !cs.containsKey("k2");
+ assert cs.containsKey("k3");
+ }
+ public void testTwoPhaseCommitReadCommitted() throws CacheLoaderException {
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ cs.prepare(mods, tx, false);
+
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+
cs.commit(tx);
assert cs.load("k2").getValue().equals("v2");
@@ -172,17 +200,49 @@
mods.add(new Remove("old"));
Transaction tx = EasyMock.createNiceMock(Transaction.class);
cs.prepare(mods, tx, false);
+ cs.rollback(tx);
assert !cs.containsKey("k1");
assert !cs.containsKey("k2");
assert cs.containsKey("old");
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ cs.prepare(mods, tx, false);
cs.rollback(tx);
assert !cs.containsKey("k1");
assert !cs.containsKey("k2");
+ assert !cs.containsKey("k3");
assert cs.containsKey("old");
+ }
+ public void testRollbackReadCommitted() throws CacheLoaderException {
+
+ cs.store(new StoredEntry("old", "old", -1, -1));
+
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ mods.add(new Remove("old"));
+ Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ cs.prepare(mods, tx, false);
+
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+ assert cs.containsKey("old");
+
+ cs.rollback(tx);
+
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+ assert cs.containsKey("old");
+
mods = new ArrayList<Modification>();
mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
@@ -203,6 +263,53 @@
assert cs.containsKey("old");
}
+ public void testRollbackFromADifferentThreadReusingTransactionKey() throws CacheLoaderException, InterruptedException {
+
+ cs.store(new StoredEntry("old", "old", -1, -1));
+
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Remove("k1"));
+ mods.add(new Remove("old"));
+ final Transaction tx = EasyMock.createNiceMock(Transaction.class);
+ cs.prepare(mods, tx, false);
+
+ Thread t = new Thread(new Runnable(){
+ public void run() {
+ cs.rollback(tx);
+ }
+ });
+
+ t.start();
+ t.join();
+
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+ assert cs.containsKey("old");
+
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
+ mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
+ mods.add(new Clear());
+ mods.add(new Store(new StoredEntry("k3", "v3", -1, -1)));
+
+ cs.prepare(mods, tx, false);
+
+ Thread t2 = new Thread(new Runnable(){
+ public void run() {
+ cs.rollback(tx);
+ }
+ });
+
+ t2.start();
+ t2.join();
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+ assert !cs.containsKey("k3");
+ assert cs.containsKey("old");
+ }
+
public void testCommitAndRollbackWithoutPrepare() throws CacheLoaderException {
cs.store(new StoredEntry("old", "old", -1, -1));
Transaction tx = EasyMock.createNiceMock(Transaction.class);
15 years, 10 months
JBoss Cache SVN: r7811 - in core/branches/flat/src/test/java/org/horizon/loader: decorators and 1 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-01 07:10:33 -0500 (Sun, 01 Mar 2009)
New Revision: 7811
Modified:
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/ChainingCacheLoaderTest.java
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
Log:
Updated tests to use unclosable streams
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-01 12:05:53 UTC (rev 7810)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-01 12:10:33 UTC (rev 7811)
@@ -2,6 +2,8 @@
import org.easymock.EasyMock;
import org.horizon.Cache;
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
import org.horizon.loader.modifications.Clear;
import org.horizon.loader.modifications.Modification;
import org.horizon.loader.modifications.Remove;
@@ -250,13 +252,13 @@
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(out);
- cs.toStream(oos);
+ cs.toStream(new UnclosableObjectOutputStream(oos));
oos.flush();
oos.close();
out.close();
cs.clear();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()));
- cs.fromStream(ois);
+ cs.fromStream(new UnclosableObjectInputStream(ois));
Set<StoredEntry> set = cs.loadAll();
@@ -279,7 +281,7 @@
byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
out.write(dummyStartBytes);
ObjectOutputStream oos = new ObjectOutputStream(out);
- cs.toStream(oos);
+ cs.toStream(new UnclosableObjectOutputStream(oos));
oos.flush();
oos.close();
out.write(dummyEndBytes);
@@ -292,7 +294,7 @@
int bytesRead = in.read(dummy, 0, 8);
assert bytesRead == 8;
for (int i = 1; i < 9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
- cs.fromStream(new ObjectInputStream(in));
+ cs.fromStream(new UnclosableObjectInputStream(new ObjectInputStream(in)));
bytesRead = in.read(dummy, 0, 8);
assert bytesRead == 8;
for (int i = 8; i > 0; i--) assert dummy[8 - i] == i : "Start byte stream corrupted!";
Modified: core/branches/flat/src/test/java/org/horizon/loader/decorators/ChainingCacheLoaderTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/ChainingCacheLoaderTest.java 2009-03-01 12:05:53 UTC (rev 7810)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/ChainingCacheLoaderTest.java 2009-03-01 12:10:33 UTC (rev 7811)
@@ -1,6 +1,8 @@
package org.horizon.loader.decorators;
import org.easymock.EasyMock;
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
import org.horizon.loader.BaseCacheStoreTest;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheLoaderException;
@@ -237,7 +239,7 @@
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(byteStream);
- cs.toStream(oos);
+ cs.toStream(new UnclosableObjectOutputStream(oos));
oos.close();
byteStream.close();
cs.clear();
@@ -249,7 +251,7 @@
assert !s.containsKey("k2");
}
- cs.fromStream(new ObjectInputStream(new ByteArrayInputStream(byteStream.toByteArray())));
+ cs.fromStream(new UnclosableObjectInputStream(new ObjectInputStream(new ByteArrayInputStream(byteStream.toByteArray()))));
assert cs.containsKey("k1");
assert cs.containsKey("k2");
Modified: core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-03-01 12:05:53 UTC (rev 7810)
+++ core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-03-01 12:10:33 UTC (rev 7811)
@@ -1,5 +1,6 @@
package org.horizon.loader.file;
+import org.horizon.io.UnclosableObjectOutputStream;
import org.horizon.loader.BaseCacheStoreTest;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
@@ -91,7 +92,7 @@
ByteArrayOutputStream out = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(out);
- cs.toStream(oos);
+ cs.toStream(new UnclosableObjectOutputStream(oos));
oos.flush();
oos.close();
out.close();
15 years, 10 months
JBoss Cache SVN: r7810 - in core/branches/flat/src: main/java/org/horizon/loader and 7 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-01 07:05:53 -0500 (Sun, 01 Mar 2009)
New Revision: 7810
Added:
core/branches/flat/src/main/java/org/horizon/io/UnclosableObjectInputStream.java
core/branches/flat/src/main/java/org/horizon/io/UnclosableObjectOutputStream.java
Modified:
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java
core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
core/branches/flat/src/test/java/org/horizon/marshall/ObjectStreamMarshaller.java
Log:
CacheStore stream API and Marshaller API to use ObjectInput and ObjectOutput interfaces rather than ObjectInputStream and ObjectOutputStream impls.
Also added UnclosableOIS/OOS impls.
Added: core/branches/flat/src/main/java/org/horizon/io/UnclosableObjectInputStream.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/io/UnclosableObjectInputStream.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/io/UnclosableObjectInputStream.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -0,0 +1,106 @@
+package org.horizon.io;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+
+/**
+ * A delegating {@link java.io.ObjectInput} that delegates all methods except {@link ObjectInput#close()}.
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class UnclosableObjectInputStream implements ObjectInput {
+ private final ObjectInput delegate;
+
+ public UnclosableObjectInputStream(ObjectInput delegate) {
+ this.delegate = delegate;
+ }
+
+ public final Object readObject() throws ClassNotFoundException, IOException {
+ return delegate.readObject();
+ }
+
+ public final int read() throws IOException {
+ return delegate.read();
+ }
+
+ public final int read(byte[] b) throws IOException {
+ return delegate.read(b);
+ }
+
+ public final int read(byte[] b, int off, int len) throws IOException {
+ return delegate.read(b, off, len);
+ }
+
+ public final long skip(long n) throws IOException {
+ return delegate.skip(n);
+ }
+
+ public final int available() throws IOException {
+ return delegate.available();
+ }
+
+ public final void close() throws IOException {
+ throw new UnsupportedOperationException("close() is not supported in an UnclosableObjectInputStream!");
+ }
+
+ public final void readFully(byte[] b) throws IOException {
+ delegate.readFully(b);
+ }
+
+ public final void readFully(byte[] b, int off, int len) throws IOException {
+ delegate.readFully(b, off, len);
+ }
+
+ public final int skipBytes(int n) throws IOException {
+ return delegate.skipBytes(n);
+ }
+
+ public final boolean readBoolean() throws IOException {
+ return delegate.readBoolean();
+ }
+
+ public final byte readByte() throws IOException {
+ return delegate.readByte();
+ }
+
+ public final int readUnsignedByte() throws IOException {
+ return delegate.readUnsignedByte();
+ }
+
+ public final short readShort() throws IOException {
+ return delegate.readShort();
+ }
+
+ public final int readUnsignedShort() throws IOException {
+ return delegate.readUnsignedShort();
+ }
+
+ public final char readChar() throws IOException {
+ return delegate.readChar();
+ }
+
+ public final int readInt() throws IOException {
+ return delegate.readInt();
+ }
+
+ public final long readLong() throws IOException {
+ return delegate.readLong();
+ }
+
+ public final float readFloat() throws IOException {
+ return delegate.readFloat();
+ }
+
+ public final double readDouble() throws IOException {
+ return delegate.readDouble();
+ }
+
+ public final String readLine() throws IOException {
+ return delegate.readLine();
+ }
+
+ public final String readUTF() throws IOException {
+ return delegate.readUTF();
+ }
+}
Added: core/branches/flat/src/main/java/org/horizon/io/UnclosableObjectOutputStream.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/io/UnclosableObjectOutputStream.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/io/UnclosableObjectOutputStream.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -0,0 +1,88 @@
+package org.horizon.io;
+
+import java.io.IOException;
+import java.io.ObjectOutput;
+
+/**
+ * An unclosable version of an {@link java.io.ObjectOutput}. This delegates all methods except {@link #flush()} and
+ * {@link #close()}.
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class UnclosableObjectOutputStream implements ObjectOutput {
+
+ private final ObjectOutput delegate;
+
+ public UnclosableObjectOutputStream(ObjectOutput delegate) {
+ this.delegate = delegate;
+ }
+
+ public final void writeObject(Object obj) throws IOException {
+ delegate.writeObject(obj);
+ }
+
+ public final void write(int b) throws IOException {
+ delegate.write(b);
+ }
+
+ public final void write(byte[] b) throws IOException {
+ delegate.write(b);
+ }
+
+ public final void write(byte[] b, int off, int len) throws IOException {
+ delegate.write(b, off, len);
+ }
+
+ public final void writeBoolean(boolean v) throws IOException {
+ delegate.writeBoolean(v);
+ }
+
+ public final void writeByte(int v) throws IOException {
+ delegate.writeByte(v);
+ }
+
+ public final void writeShort(int v) throws IOException {
+ delegate.writeShort(v);
+ }
+
+ public final void writeChar(int v) throws IOException {
+ delegate.writeChar(v);
+ }
+
+ public final void writeInt(int v) throws IOException {
+ delegate.writeInt(v);
+ }
+
+ public final void writeLong(long v) throws IOException {
+ delegate.writeLong(v);
+ }
+
+ public final void writeFloat(float v) throws IOException {
+ delegate.writeFloat(v);
+ }
+
+ public final void writeDouble(double v) throws IOException {
+ delegate.writeDouble(v);
+ }
+
+ public final void writeBytes(String s) throws IOException {
+ delegate.writeBytes(s);
+ }
+
+ public final void writeChars(String s) throws IOException {
+ delegate.writeChars(s);
+ }
+
+ public final void writeUTF(String str) throws IOException {
+ delegate.writeUTF(str);
+ }
+
+ public final void flush() throws IOException {
+ throw new UnsupportedOperationException("flush() not supported in an UnclosableObjectOutputStream!");
+ }
+
+ public final void close() throws IOException {
+ throw new UnsupportedOperationException("close() not supported in an UnclosableObjectOutputStream!");
+ }
+}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -3,8 +3,8 @@
import org.horizon.loader.modifications.Modification;
import javax.transaction.Transaction;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.List;
import java.util.Set;
@@ -26,26 +26,26 @@
/**
* Writes contents of the stream to the store. Implementations should expect that the stream contains data in an
- * implementation-specific format, typically generated using {@link #toStream(java.io.ObjectOutputStream)}. While
- * not a requirement, it is recommended that implementations make use of the {@link org.horizon.marshall.Marshaller}
- * when dealing with the stream to make use of efficient marshalling.
+ * implementation-specific format, typically generated using {@link #toStream(java.io.ObjectOutput)}. While not a
+ * requirement, it is recommended that implementations make use of the {@link org.horizon.marshall.Marshaller} when
+ * dealing with the stream to make use of efficient marshalling.
* <p/>
* It is imperative that implementations <b><i>do not</i></b> close the stream after finishing with it.
* <p/>
* It is also <b><i>recommended</b></i> that implementations use their own start and end markers on the stream since
* other processes may write additional data to the stream after the cache store has written to it. As such, either
* markers or some other mechanism to prevent the store from reading too much information should be employed when
- * writing to the stream in {@link #fromStream(java.io.ObjectInputStream)} to prevent data corruption.
+ * writing to the stream in {@link #fromStream(java.io.ObjectInput)} to prevent data corruption.
* <p/>
*
* @param inputStream stream to read from
* @throws CacheLoaderException in the event of problems writing to the store
*/
- void fromStream(ObjectInputStream inputStream) throws CacheLoaderException;
+ void fromStream(ObjectInput inputStream) throws CacheLoaderException;
/**
* Loads the entire state into a stream, using whichever format is most efficient for the cache loader
- * implementation. Typically read and parsed by {@link #fromStream(java.io.ObjectInputStream)}.
+ * implementation. Typically read and parsed by {@link #fromStream(java.io.ObjectInput)}.
* <p/>
* While not a requirement, it is recommended that implementations make use of the {@link
* org.horizon.marshall.Marshaller} when dealing with the stream to make use of efficient marshalling.
@@ -55,13 +55,13 @@
* It is also <b><i>recommended</b></i> that implementations use their own start and end markers on the stream since
* other processes may write additional data to the stream after the cache store has written to it. As such, either
* markers or some other mechanism to prevent the store from reading too much information in {@link
- * #fromStream(java.io.ObjectInputStream)} should be employed, to prevent data corruption.
+ * #fromStream(java.io.ObjectInput)} should be employed, to prevent data corruption.
* <p/>
*
* @param outputStream stream to write to
* @throws CacheLoaderException in the event of problems reading from the store
*/
- void toStream(ObjectOutputStream outputStream) throws CacheLoaderException;
+ void toStream(ObjectOutput outputStream) throws CacheLoaderException;
/**
* Clears all entries in the store
Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -11,8 +11,8 @@
import org.horizon.marshall.Marshaller;
import org.horizon.util.concurrent.WithinThreadExecutor;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -126,7 +126,7 @@
}
}
- public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInput inputStream) throws CacheLoaderException {
try {
// first clear all local state
acquireGlobalLock(true);
@@ -137,7 +137,7 @@
}
}
- public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutput outputStream) throws CacheLoaderException {
try {
acquireGlobalLock(true);
toStreamInternal(outputStream);
@@ -237,9 +237,9 @@
protected abstract Bucket loadBucket(String keyHashCode) throws CacheLoaderException;
- protected abstract void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException;
+ protected abstract void toStreamInternal(ObjectOutput oos) throws CacheLoaderException;
- protected abstract void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException;
+ protected abstract void fromStreamInternal(ObjectInput ois) throws CacheLoaderException;
protected abstract void clearInternal() throws CacheLoaderException;
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -10,8 +10,8 @@
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.List;
import java.util.Set;
@@ -42,11 +42,11 @@
delegate.store(ed);
}
- public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInput inputStream) throws CacheLoaderException {
delegate.fromStream(inputStream);
}
- public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutput outputStream) throws CacheLoaderException {
delegate.toStream(outputStream);
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -10,8 +10,8 @@
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -39,7 +39,7 @@
for (CacheStore s : stores.keySet()) s.store(ed);
}
- public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInput inputStream) throws CacheLoaderException {
// loading and storing state via streams is *only* supported on the *first* store that has fetchPersistentState set.
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isFetchPersistentState()) {
@@ -50,7 +50,7 @@
}
}
- public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutput outputStream) throws CacheLoaderException {
// loading and storing state via streams is *only* supported on the *first* store that has fetchPersistentState set.
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isFetchPersistentState()) {
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -5,7 +5,7 @@
import org.horizon.loader.modifications.Modification;
import javax.transaction.Transaction;
-import java.io.ObjectInputStream;
+import java.io.ObjectInput;
import java.util.List;
/**
@@ -27,7 +27,7 @@
}
@Override
- public void fromStream(ObjectInputStream inputStream) {
+ public void fromStream(ObjectInput inputStream) {
// no-op
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -17,7 +17,7 @@
import org.horizon.remoting.transport.Address;
import javax.transaction.Transaction;
-import java.io.ObjectInputStream;
+import java.io.ObjectInput;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -112,7 +112,7 @@
}
@Override
- public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInput inputStream) throws CacheLoaderException {
if (active) super.fromStream(inputStream);
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -11,14 +11,7 @@
import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.*;
import java.util.HashSet;
import java.util.Set;
@@ -81,7 +74,7 @@
return result;
}
- protected void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException {
+ protected void fromStreamInternal(ObjectInput ois) throws CacheLoaderException {
try {
int numFiles = ois.readInt();
byte[] buffer = new byte[streamBufferSize];
@@ -111,7 +104,7 @@
}
}
- protected void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException {
+ protected void toStreamInternal(ObjectOutput oos) throws CacheLoaderException {
try {
File[] files = root.listFiles();
oos.writeInt(files.length);
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -13,8 +13,8 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -190,7 +190,7 @@
}
}
- protected void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException {
+ protected void fromStreamInternal(ObjectInput ois) throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
try {
@@ -233,7 +233,7 @@
}
}
- protected void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException {
+ protected void toStreamInternal(ObjectOutput oos) throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
@@ -375,13 +375,15 @@
ps.addBatch();
deletionCount++;
if (deletionCount % batchSize == 0) {
- if (log.isTraceEnabled()) log.trace("Flushing deletion batch, total deletion count so far is " + deletionCount);
+ if (log.isTraceEnabled())
+ log.trace("Flushing deletion batch, total deletion count so far is " + deletionCount);
ps.executeBatch();
}
}
if (deletionCount % batchSize != 0) {
int[] batchResult = ps.executeBatch();
- if (log.isTraceEnabled()) log.trace("Flushed the batch and received following results: " + Arrays.toString(batchResult));
+ if (log.isTraceEnabled())
+ log.trace("Flushed the batch and received following results: " + Arrays.toString(batchResult));
}
} catch (SQLException ex) {
//if something happens make sure buckets locks are being release
Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -40,7 +40,8 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Array;
@@ -111,7 +112,7 @@
return bytes;
}
- protected void marshallObject(Object o, ObjectOutputStream out, Map<Object, Integer> refMap) throws IOException {
+ protected void marshallObject(Object o, ObjectOutput out, Map<Object, Integer> refMap) throws IOException {
if (o != null && o.getClass().isArray() && isKnownType(o.getClass().getComponentType())) {
marshallArray(o, out, refMap);
} else {
@@ -202,12 +203,12 @@
}
- protected void marshallString(String s, ObjectOutputStream out) throws IOException {
+ protected void marshallString(String s, ObjectOutput out) throws IOException {
//StringUtil.saveString(out, s);
out.writeObject(s);
}
- private void marshallCommand(ReplicableCommand command, ObjectOutputStream out, Map<Object, Integer> refMap) throws IOException {
+ private void marshallCommand(ReplicableCommand command, ObjectOutput out, Map<Object, Integer> refMap) throws IOException {
out.writeShort(command.getCommandId());
Object[] args = command.getParameters();
byte numArgs = (byte) (args == null ? 0 : args.length);
@@ -224,17 +225,17 @@
return reference;
}
- private void marshallGlobalTransaction(GlobalTransaction globalTransaction, ObjectOutputStream out, Map<Object, Integer> refMap) throws IOException {
+ private void marshallGlobalTransaction(GlobalTransaction globalTransaction, ObjectOutput out, Map<Object, Integer> refMap) throws IOException {
out.writeLong(globalTransaction.getId());
marshallObject(globalTransaction.getAddress(), out, refMap);
}
- private void marshallJGroupsAddress(JGroupsAddress address, ObjectOutputStream out) throws IOException {
+ private void marshallJGroupsAddress(JGroupsAddress address, ObjectOutput out) throws IOException {
address.writeExternal(out);
}
@SuppressWarnings("unchecked")
- private void marshallCollection(Collection c, ObjectOutputStream out, Map refMap) throws IOException {
+ private void marshallCollection(Collection c, ObjectOutput out, Map refMap) throws IOException {
writeUnsignedInt(out, c.size());
for (Object o : c) {
marshallObject(o, out, refMap);
@@ -242,7 +243,7 @@
}
@SuppressWarnings("unchecked")
- private void marshallMap(Map map, ObjectOutputStream out, Map<Object, Integer> refMap) throws IOException {
+ private void marshallMap(Map map, ObjectOutput out, Map<Object, Integer> refMap) throws IOException {
int mapSize = map.size();
writeUnsignedInt(out, mapSize);
if (mapSize == 0) return;
@@ -255,7 +256,7 @@
// --------- Unmarshalling methods
- protected Object unmarshallObject(ObjectInputStream in, ClassLoader loader, UnmarshalledReferences refMap, boolean overrideContextClassloaderOnThread) throws IOException, ClassNotFoundException {
+ protected Object unmarshallObject(ObjectInput in, ClassLoader loader, UnmarshalledReferences refMap, boolean overrideContextClassloaderOnThread) throws IOException, ClassNotFoundException {
if (loader == null) {
return unmarshallObject(in, refMap);
} else {
@@ -272,7 +273,7 @@
}
}
- protected Object unmarshallObject(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ protected Object unmarshallObject(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
byte magicNumber = in.readByte();
int reference = 0;
Object retVal;
@@ -346,17 +347,17 @@
throw new IOException("Unknown magic number " + magicNumber);
}
- private FastCopyHashMap unmarshallFastCopyHashMap(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private FastCopyHashMap unmarshallFastCopyHashMap(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
FastCopyHashMap map = new FastCopyHashMap();
populateFromStream(in, refMap, map);
return map;
}
- protected String unmarshallString(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ protected String unmarshallString(ObjectInput in) throws IOException, ClassNotFoundException {
return (String) in.readObject();
}
- private ReplicableCommand unmarshallCommand(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private ReplicableCommand unmarshallCommand(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
short methodId = in.readShort();
byte numArgs = in.readByte();
Object[] args = null;
@@ -370,7 +371,7 @@
}
- private GlobalTransaction unmarshallGlobalTransaction(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private GlobalTransaction unmarshallGlobalTransaction(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
GlobalTransaction gtx = new GlobalTransaction();
long id = in.readLong();
Object address = unmarshallObject(in, refMap);
@@ -379,74 +380,74 @@
return gtx;
}
- private JGroupsAddress unmarshallJGroupsAddress(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ private JGroupsAddress unmarshallJGroupsAddress(ObjectInput in) throws IOException, ClassNotFoundException {
JGroupsAddress address = new JGroupsAddress();
address.readExternal(in);
return address;
}
- private List unmarshallArrayList(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private List unmarshallArrayList(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
int listSize = readUnsignedInt(in);
List list = new ArrayList(listSize);
populateFromStream(in, refMap, list, listSize);
return list;
}
- private List unmarshallLinkedList(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private List unmarshallLinkedList(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
List list = new LinkedList();
populateFromStream(in, refMap, list, readUnsignedInt(in));
return list;
}
- private List unmarshallSingletonList(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private List unmarshallSingletonList(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
return Collections.singletonList(unmarshallObject(in, refMap));
}
- private Map unmarshallHashMap(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private Map unmarshallHashMap(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
Map map = new HashMap();
populateFromStream(in, refMap, map);
return map;
}
@SuppressWarnings("unchecked")
- private Map unmarshallMapCopy(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private Map unmarshallMapCopy(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
// read in as a HashMap first
Map m = unmarshallHashMap(in, refMap);
return Immutables.immutableMapWrap(m);
}
- private Map unmarshallTreeMap(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private Map unmarshallTreeMap(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
Map map = new TreeMap();
populateFromStream(in, refMap, map);
return map;
}
- private Set unmarshallHashSet(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private Set unmarshallHashSet(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
Set set = new HashSet();
populateFromStream(in, refMap, set);
return set;
}
- private Set unmarshallTreeSet(ObjectInputStream in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
+ private Set unmarshallTreeSet(ObjectInput in, UnmarshalledReferences refMap) throws IOException, ClassNotFoundException {
Set set = new TreeSet();
populateFromStream(in, refMap, set);
return set;
}
@SuppressWarnings("unchecked")
- private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap, Map mapToPopulate) throws IOException, ClassNotFoundException {
+ private void populateFromStream(ObjectInput in, UnmarshalledReferences refMap, Map mapToPopulate) throws IOException, ClassNotFoundException {
int size = readUnsignedInt(in);
for (int i = 0; i < size; i++) mapToPopulate.put(unmarshallObject(in, refMap), unmarshallObject(in, refMap));
}
@SuppressWarnings("unchecked")
- private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap, Set setToPopulate) throws IOException, ClassNotFoundException {
+ private void populateFromStream(ObjectInput in, UnmarshalledReferences refMap, Set setToPopulate) throws IOException, ClassNotFoundException {
int size = readUnsignedInt(in);
for (int i = 0; i < size; i++) setToPopulate.add(unmarshallObject(in, refMap));
}
@SuppressWarnings("unchecked")
- private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap, List listToPopulate, int listSize) throws IOException, ClassNotFoundException {
+ private void populateFromStream(ObjectInput in, UnmarshalledReferences refMap, List listToPopulate, int listSize) throws IOException, ClassNotFoundException {
for (int i = 0; i < listSize; i++) listToPopulate.add(unmarshallObject(in, refMap));
}
@@ -459,7 +460,7 @@
* @throws java.io.IOException propagated from OOS
* @see <a href="http://jira.jboss.org/jira/browse/JBCACHE-1211">JBCACHE-1211</a>
*/
- protected void writeReference(ObjectOutputStream out, int reference) throws IOException {
+ protected void writeReference(ObjectOutput out, int reference) throws IOException {
writeUnsignedInt(out, reference);
}
@@ -472,7 +473,7 @@
* @throws java.io.IOException propagated from OUS
* @see <a href="http://jira.jboss.org/jira/browse/JBCACHE-1211">JBCACHE-1211</a>
*/
- protected int readReference(ObjectInputStream in) throws IOException {
+ protected int readReference(ObjectInput in) throws IOException {
return readUnsignedInt(in);
}
@@ -480,7 +481,7 @@
* Reads an int stored in variable-length format. Reads between one and five bytes. Smaller values take fewer
* bytes. Negative numbers are not supported.
*/
- protected int readUnsignedInt(ObjectInputStream in) throws IOException {
+ protected int readUnsignedInt(ObjectInput in) throws IOException {
byte b = in.readByte();
int i = b & 0x7F;
for (int shift = 7; (b & 0x80) != 0; shift += 7) {
@@ -496,7 +497,7 @@
*
* @param i int to write
*/
- protected void writeUnsignedInt(ObjectOutputStream out, int i) throws IOException {
+ protected void writeUnsignedInt(ObjectOutput out, int i) throws IOException {
while ((i & ~0x7F) != 0) {
out.writeByte((byte) ((i & 0x7f) | 0x80));
i >>>= 7;
@@ -509,7 +510,7 @@
* Reads an int stored in variable-length format. Reads between one and nine bytes. Smaller values take fewer
* bytes. Negative numbers are not supported.
*/
- protected long readUnsignedLong(ObjectInputStream in) throws IOException {
+ protected long readUnsignedLong(ObjectInput in) throws IOException {
byte b = in.readByte();
long i = b & 0x7F;
for (int shift = 7; (b & 0x80) != 0; shift += 7) {
@@ -525,7 +526,7 @@
*
* @param i int to write
*/
- protected void writeUnsignedLong(ObjectOutputStream out, long i) throws IOException {
+ protected void writeUnsignedLong(ObjectOutput out, long i) throws IOException {
while ((i & ~0x7F) != 0) {
out.writeByte((byte) ((i & 0x7f) | 0x80));
i >>>= 7;
@@ -533,7 +534,7 @@
out.writeByte((byte) i);
}
- protected Object unmarshallArray(ObjectInputStream in, UnmarshalledReferences refs) throws IOException, ClassNotFoundException {
+ protected Object unmarshallArray(ObjectInput in, UnmarshalledReferences refs) throws IOException, ClassNotFoundException {
int sz = readUnsignedInt(in);
byte type = in.readByte();
switch (type) {
@@ -650,7 +651,7 @@
}
}
- protected void marshallArray(Object o, ObjectOutputStream out, Map<Object, Integer> refMap) throws IOException {
+ protected void marshallArray(Object o, ObjectOutput out, Map<Object, Integer> refMap) throws IOException {
out.writeByte(MAGICNUMBER_ARRAY);
Class arrayTypeClass = o.getClass().getComponentType();
int sz = Array.getLength(o);
@@ -726,7 +727,7 @@
c.equals(Double.class));
}
- public void objectToObjectStream(Object o, ObjectOutputStream out) throws IOException {
+ public void objectToObjectStream(Object o, ObjectOutput out) throws IOException {
Map<Object, Integer> refMap = useRefs ? new IdentityHashMap<Object, Integer>() : null;
ClassLoader toUse = defaultClassLoader;
Thread current = Thread.currentThread();
@@ -742,7 +743,7 @@
}
}
- public Object objectFromObjectStream(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ public Object objectFromObjectStream(ObjectInput in) throws IOException, ClassNotFoundException {
UnmarshalledReferences refMap = useRefs ? new UnmarshalledReferences() : null;
Object retValue = unmarshallObject(in, defaultClassLoader, refMap, false);
if (trace) log.trace("Unmarshalled object " + retValue);
@@ -755,7 +756,7 @@
public ByteBuffer objectToBuffer(Object o) throws IOException {
ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(128);
- ObjectOutputStream out = new ObjectOutputStream(baos);
+ ObjectOutput out = new ObjectOutputStream(baos);
//now marshall the contents of the object
objectToObjectStream(o, out);
@@ -765,7 +766,7 @@
}
public Object objectFromByteBuffer(byte[] buf, int offset, int length) throws IOException, ClassNotFoundException {
- ObjectInputStream in = new MarshalledValueInputStream(new ByteArrayInputStream(buf, offset, length));
+ ObjectInput in = new MarshalledValueInputStream(new ByteArrayInputStream(buf, offset, length));
return objectFromObjectStream(in);
}
Modified: core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/marshall/Marshaller.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -27,17 +27,16 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
/**
* A marshaller is a class that is able to marshall and unmarshall objects efficiently.
* <p/>
- * The reason why this is implemented specially in Horizon rather than resorting to Java serialization or even the
- * more efficient JBoss serialization is that a lot of efficiency can be gained when a majority of the serialization
- * that occurs has to do with a small set of known types such as {@link org.horizon.transaction.GlobalTransaction} or
- * {@link org.horizon.commands.ReplicableCommand}, and class type information can be replaced with simple magic
- * numbers.
+ * The reason why this is implemented specially in Horizon rather than resorting to Java serialization or even the more
+ * efficient JBoss serialization is that a lot of efficiency can be gained when a majority of the serialization that
+ * occurs has to do with a small set of known types such as {@link org.horizon.transaction.GlobalTransaction} or {@link
+ * org.horizon.commands.ReplicableCommand}, and class type information can be replaced with simple magic numbers.
* <p/>
* Unknown types (typically user data) falls back to JBoss serialization.
* <p/>
@@ -58,19 +57,19 @@
@Scope(Scopes.GLOBAL)
public interface Marshaller {
/**
- * Marshalls an object to a given {@link java.io.ObjectOutputStream}
+ * Marshalls an object to a given {@link java.io.ObjectOutput}
*
* @param obj object to marshall
* @param out stream to marshall to
*/
- void objectToObjectStream(Object obj, ObjectOutputStream out) throws IOException;
+ void objectToObjectStream(Object obj, ObjectOutput out) throws IOException;
/**
- * Unmarshalls an object from an {@link java.io.ObjectInputStream}
+ * Unmarshalls an object from an {@link java.io.ObjectInput}
*
* @param in stream to unmarshall from
*/
- Object objectFromObjectStream(ObjectInputStream in) throws IOException, ClassNotFoundException;
+ Object objectFromObjectStream(ObjectInput in) throws IOException, ClassNotFoundException;
/**
* Unmarshalls an object from an {@link java.io.InputStream}
Modified: core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/main/java/org/horizon/marshall/VersionAwareMarshaller.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -32,7 +32,9 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInput;
import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
/**
@@ -108,13 +110,13 @@
return defaultMarshaller.objectFromObjectStream(in);
}
- public void objectToObjectStream(Object obj, ObjectOutputStream out) throws IOException {
+ public void objectToObjectStream(Object obj, ObjectOutput out) throws IOException {
out.writeShort(VERSION_100);
log.trace("Wrote version {0}", VERSION_100);
defaultMarshaller.objectToObjectStream(obj, out);
}
- public Object objectFromObjectStream(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ public Object objectFromObjectStream(ObjectInput in) throws IOException, ClassNotFoundException {
int versionId;
try {
versionId = in.readShort();
Modified: core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -11,8 +11,8 @@
import org.horizon.marshall.Marshaller;
import org.horizon.marshall.ObjectStreamMarshaller;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -34,7 +34,7 @@
}
@SuppressWarnings("unchecked")
- public void fromStream(ObjectInputStream ois) throws CacheLoaderException {
+ public void fromStream(ObjectInput ois) throws CacheLoaderException {
try {
int numEntries = (Integer) marshaller.objectFromObjectStream(ois);
store.clear();
@@ -47,7 +47,7 @@
}
}
- public void toStream(ObjectOutputStream oos) throws CacheLoaderException {
+ public void toStream(ObjectOutput oos) throws CacheLoaderException {
try {
marshaller.objectToObjectStream(store.size(), oos);
for (StoredEntry se : store.values()) marshaller.objectToObjectStream(se, oos);
Modified: core/branches/flat/src/test/java/org/horizon/marshall/ObjectStreamMarshaller.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/marshall/ObjectStreamMarshaller.java 2009-03-01 11:33:40 UTC (rev 7809)
+++ core/branches/flat/src/test/java/org/horizon/marshall/ObjectStreamMarshaller.java 2009-03-01 12:05:53 UTC (rev 7810)
@@ -6,7 +6,9 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInput;
import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
/**
@@ -15,11 +17,11 @@
* @author Manik Surtani
*/
public class ObjectStreamMarshaller implements Marshaller {
- public void objectToObjectStream(Object obj, ObjectOutputStream out) throws IOException {
+ public void objectToObjectStream(Object obj, ObjectOutput out) throws IOException {
out.writeObject(obj);
}
- public Object objectFromObjectStream(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ public Object objectFromObjectStream(ObjectInput in) throws IOException, ClassNotFoundException {
return in.readObject();
}
15 years, 10 months
JBoss Cache SVN: r7809 - in core/branches/flat/src: main/java/org/horizon/loader/bucket and 5 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-01 06:33:40 -0500 (Sun, 01 Mar 2009)
New Revision: 7809
Modified:
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
Log:
Changed CacheStore stream API, added javadocs
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -3,8 +3,8 @@
import org.horizon.loader.modifications.Modification;
import javax.transaction.Transaction;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Set;
@@ -26,40 +26,42 @@
/**
* Writes contents of the stream to the store. Implementations should expect that the stream contains data in an
- * implementation-specific format, typically generated using {@link #toStream(java.io.OutputStream)}. While not a
- * requirement, it is recommended that implementations make use of the {@link org.horizon.marshall.Marshaller} when
- * dealing with the stream to make use of efficient marshalling.
- * <p />
+ * implementation-specific format, typically generated using {@link #toStream(java.io.ObjectOutputStream)}. While
+ * not a requirement, it is recommended that implementations make use of the {@link org.horizon.marshall.Marshaller}
+ * when dealing with the stream to make use of efficient marshalling.
+ * <p/>
* It is imperative that implementations <b><i>do not</i></b> close the stream after finishing with it.
- * <p />
- * It is also <b><i>recommended</b></i> that implementations use their own start and end markers on the stream
- * since other processes may write additional data to the stream after the cache store has written to it. As such,
- * either markers or some other mechanism to prevent the store from reading too much information should be employed
- * when writing to the stream in {@link #fromStream(java.io.InputStream)} to prevent data corruption.
- * <p />
+ * <p/>
+ * It is also <b><i>recommended</b></i> that implementations use their own start and end markers on the stream since
+ * other processes may write additional data to the stream after the cache store has written to it. As such, either
+ * markers or some other mechanism to prevent the store from reading too much information should be employed when
+ * writing to the stream in {@link #fromStream(java.io.ObjectInputStream)} to prevent data corruption.
+ * <p/>
+ *
* @param inputStream stream to read from
* @throws CacheLoaderException in the event of problems writing to the store
*/
- void fromStream(InputStream inputStream) throws CacheLoaderException;
+ void fromStream(ObjectInputStream inputStream) throws CacheLoaderException;
/**
* Loads the entire state into a stream, using whichever format is most efficient for the cache loader
- * implementation. Typically read and parsed by {@link #fromStream(java.io.InputStream)}.
+ * implementation. Typically read and parsed by {@link #fromStream(java.io.ObjectInputStream)}.
* <p/>
* While not a requirement, it is recommended that implementations make use of the {@link
* org.horizon.marshall.Marshaller} when dealing with the stream to make use of efficient marshalling.
- * <p />
- * It is imperative that implementations <b><i>do not</i></b> close the stream after finishing with it.
- * <p />
- * It is also <b><i>recommended</b></i> that implementations use their own start and end markers on the stream
- * since other processes may write additional data to the stream after the cache store has written to it. As such,
- * either markers or some other mechanism to prevent the store from reading too much information in {@link #fromStream(java.io.InputStream)}
- * should be employed, to prevent data corruption.
- * <p />
+ * <p/>
+ * It is imperative that implementations <b><i>do not</i></b> flush or close the stream after finishing with it.
+ * <p/>
+ * It is also <b><i>recommended</b></i> that implementations use their own start and end markers on the stream since
+ * other processes may write additional data to the stream after the cache store has written to it. As such, either
+ * markers or some other mechanism to prevent the store from reading too much information in {@link
+ * #fromStream(java.io.ObjectInputStream)} should be employed, to prevent data corruption.
+ * <p/>
+ *
* @param outputStream stream to write to
* @throws CacheLoaderException in the event of problems reading from the store
*/
- void toStream(OutputStream outputStream) throws CacheLoaderException;
+ void toStream(ObjectOutputStream outputStream) throws CacheLoaderException;
/**
* Clears all entries in the store
@@ -104,7 +106,17 @@
void prepare(List<? extends Modification> modifications, Transaction tx, boolean isOnePhase) throws CacheLoaderException;
/**
- * Commits a transaction that has been previously prepared
+ * Commits a transaction that has been previously prepared.
+ * <p/>
+ * This method <i>may</b> be invoked on a transaction for which there is <i>no</i> prior {@link
+ * #prepare(java.util.List, javax.transaction.Transaction, boolean)}. The implementation would need to deal with
+ * this case acordingly. Typically, this would be a no-op, after ensuring any resources attached to the transaction
+ * are cleared up.
+ * <p/>
+ * Also note that this method <i>may</i> be invoked on a thread which is different from the {@link
+ * #prepare(java.util.List, javax.transaction.Transaction, boolean)} invocation. As such, {@link ThreadLocal}s
+ * should not be relied upon to maintain transaction context.
+ * <p/>
*
* @param tx tx to commit
* @throws CacheLoaderException in the event of problems writing to the store
@@ -113,6 +125,16 @@
/**
* Rolls back a transaction that has been previously prepared
+ * <p/>
+ * This method <i>may</b> be invoked on a transaction for which there is <i>no</i> prior {@link
+ * #prepare(java.util.List, javax.transaction.Transaction, boolean)}. The implementation would need to deal with
+ * this case acordingly. Typically, this would be a no-op, after ensuring any resources attached to the transaction
+ * are cleared up.
+ * <p/>
+ * Also note that this method <i>may</i> be invoked on a thread which is different from the {@link
+ * #prepare(java.util.List, javax.transaction.Transaction, boolean)} invocation. As such, {@link ThreadLocal}s
+ * should not be relied upon to maintain transaction context.
+ * <p/>
*
* @param tx tx to roll back
*/
Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -3,24 +3,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.horizon.Cache;
-import org.horizon.util.concurrent.WithinThreadExecutor;
import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
import org.horizon.lock.StripedLock;
import org.horizon.marshall.Marshaller;
+import org.horizon.util.concurrent.WithinThreadExecutor;
-import java.util.concurrent.TimeUnit;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.ObjectOutputStream;
/**
* //TODO comment this
@@ -129,40 +126,23 @@
}
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
- ObjectInputStream ois = null;
+ public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
try {
// first clear all local state
acquireGlobalLock(true);
clear();
- ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
- new ObjectInputStream(inputStream);
- fromStreamInternal(ois);
- }
- catch (IOException e) {
- throw new CacheLoaderException("Cannot convert to ObjectInputSream", e);
+ fromStreamInternal(inputStream);
} finally {
releaseGlobalLock(true);
- // we should close the stream we created!
- if (inputStream != ois) safeClose(ois);
}
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
- ObjectOutputStream oos = null;
+ public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
try {
acquireGlobalLock(true);
- try {
- oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
- new ObjectOutputStream(outputStream);
- } catch (IOException e) {
- throw new CacheLoaderException(e);
- }
- toStreamInternal(oos);
+ toStreamInternal(outputStream);
} finally {
releaseGlobalLock(true);
- // we should close the stream we created!
- if (oos != outputStream) safeClose(oos);
}
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -10,8 +10,8 @@
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Set;
@@ -42,11 +42,11 @@
delegate.store(ed);
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
delegate.fromStream(inputStream);
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
delegate.toStream(outputStream);
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -10,8 +10,8 @@
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -39,7 +39,7 @@
for (CacheStore s : stores.keySet()) s.store(ed);
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
// loading and storing state via streams is *only* supported on the *first* store that has fetchPersistentState set.
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isFetchPersistentState()) {
@@ -50,7 +50,7 @@
}
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
// loading and storing state via streams is *only* supported on the *first* store that has fetchPersistentState set.
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isFetchPersistentState()) {
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -5,7 +5,7 @@
import org.horizon.loader.modifications.Modification;
import javax.transaction.Transaction;
-import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.util.List;
/**
@@ -27,7 +27,7 @@
}
@Override
- public void fromStream(InputStream inputStream) {
+ public void fromStream(ObjectInputStream inputStream) {
// no-op
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -17,7 +17,7 @@
import org.horizon.remoting.transport.Address;
import javax.transaction.Transaction;
-import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -112,7 +112,7 @@
}
@Override
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
if (active) super.fromStream(inputStream);
}
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -17,6 +17,8 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
@@ -247,10 +249,14 @@
cs.store(new StoredEntry("k3", "v3", -1, -1));
ByteArrayOutputStream out = new ByteArrayOutputStream();
- cs.toStream(out);
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ cs.toStream(oos);
+ oos.flush();
+ oos.close();
out.close();
cs.clear();
- cs.fromStream(new ByteArrayInputStream(out.toByteArray()));
+ ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()));
+ cs.fromStream(ois);
Set<StoredEntry> set = cs.loadAll();
@@ -269,10 +275,13 @@
cs.store(new StoredEntry("k3", "v3", -1, -1));
ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] dummyStartBytes = {1,2,3,4,5,6,7,8};
- byte[] dummyEndBytes = {8,7,6,5,4,3,2,1};
+ byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+ byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
out.write(dummyStartBytes);
- cs.toStream(out);
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ cs.toStream(oos);
+ oos.flush();
+ oos.close();
out.write(dummyEndBytes);
out.close();
cs.clear();
@@ -282,11 +291,11 @@
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
int bytesRead = in.read(dummy, 0, 8);
assert bytesRead == 8;
- for (int i=1; i<9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
- cs.fromStream(in);
+ for (int i = 1; i < 9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
+ cs.fromStream(new ObjectInputStream(in));
bytesRead = in.read(dummy, 0, 8);
assert bytesRead == 8;
- for (int i=8; i>0; i--) assert dummy[8 - i] == i : "Start byte stream corrupted!";
+ for (int i = 8; i > 0; i--) assert dummy[8 - i] == i : "Start byte stream corrupted!";
Set<StoredEntry> set = cs.loadAll();
Modified: core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -6,8 +6,6 @@
import org.horizon.loader.StoredEntry;
import org.testng.annotations.Test;
-import java.io.InputStream;
-
@Test(groups = "unit", testName = "loader.decorators.ReadOnlyCacheStoreTest")
public class ReadOnlyCacheStoreTest {
public void testWriteMethods() throws CacheLoaderException {
@@ -21,8 +19,8 @@
store.clear();
store.purgeExpired();
store.remove("key");
- store.store((StoredEntry) null);
- store.fromStream((InputStream) null);
+ store.store(null);
+ store.fromStream(null);
store.prepare(null, null, true);
store.commit(null);
store.rollback(null);
Modified: core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -11,10 +11,8 @@
import org.horizon.marshall.Marshaller;
import org.horizon.marshall.ObjectStreamMarshaller;
-import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -36,11 +34,8 @@
}
@SuppressWarnings("unchecked")
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInputStream ois) throws CacheLoaderException {
try {
- ObjectInputStream ois = inputStream instanceof ObjectInputStream ? (ObjectInputStream) inputStream :
- new ObjectInputStream(inputStream);
-
int numEntries = (Integer) marshaller.objectFromObjectStream(ois);
store.clear();
for (int i = 0; i < numEntries; i++) {
@@ -52,10 +47,8 @@
}
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutputStream oos) throws CacheLoaderException {
try {
- ObjectOutputStream oos = outputStream instanceof ObjectOutputStream ? (ObjectOutputStream) outputStream :
- new ObjectOutputStream(outputStream);
marshaller.objectToObjectStream(store.size(), oos);
for (StoredEntry se : store.values()) marshaller.objectToObjectStream(se, oos);
} catch (Exception e) {
Modified: core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-02-28 00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-03-01 11:33:40 UTC (rev 7809)
@@ -15,6 +15,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
@Test(groups = "unit", testName = "loader.file.FileCacheStoreTest")
public class FileCacheStoreTest extends BaseCacheStoreTest {
@@ -89,8 +90,10 @@
cs.store(new StoredEntry("k1", "v1", -1, -1));
ByteArrayOutputStream out = new ByteArrayOutputStream();
- cs.toStream(out);
- out.flush();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ cs.toStream(oos);
+ oos.flush();
+ oos.close();
out.close();
ObjectInputStream ois = null;
15 years, 10 months