Author: manik.surtani(a)jboss.com
Date: 2009-02-18 12:13:31 -0500 (Wed, 18 Feb 2009)
New Revision: 7724
Added:
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderException.java
Modified:
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/horizon/interceptors/ActivationInterceptor.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JDBCCacheStore.java
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
core/branches/flat/src/test/java/org/horizon/loader/PassivationFunctionalTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/ChainingCacheLoaderTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
Log:
File cache loader is complete
Modified:
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -21,8 +21,10 @@
*/
package org.horizon.container;
+import org.horizon.CacheException;
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.Stop;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheLoaderManager;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
@@ -59,13 +61,17 @@
private void expire(Object key) {
expirableData.remove(key);
- expireOnCacheLoader(key);
+ expireOnCacheStore(key);
}
- private void expireOnCacheLoader(Object key) {
+ private void expireOnCacheStore(Object key) {
if (cacheStore == null && clm != null) cacheStore = clm.getCacheStore();
if (cacheStore != null) {
- cacheStore.remove(key);
+ try {
+ cacheStore.remove(key);
+ } catch (CacheLoaderException e) {
+ throw new CacheException("Unable to expire entry in cache store",
e);
+ }
}
}
@@ -193,7 +199,7 @@
Map.Entry<Object, ExpirableCachedValue> entry = iter.next();
ExpirableCachedValue cv = entry.getValue();
if (cv.isExpired()) {
- expireOnCacheLoader(entry.getKey());
+ expireOnCacheStore(entry.getKey());
purged.add(entry.getKey());
iter.remove();
}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/ActivationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/ActivationInterceptor.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/ActivationInterceptor.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -10,6 +10,7 @@
import org.horizon.factories.annotations.Start;
import org.horizon.jmx.annotations.ManagedAttribute;
import org.horizon.jmx.annotations.ManagedOperation;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
import java.util.concurrent.atomic.AtomicLong;
@@ -64,7 +65,7 @@
return retval;
}
- private void removeFromStore(Object... keys) {
+ private void removeFromStore(Object... keys) throws CacheLoaderException {
for (Object k : keys) store.remove(k);
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -1,10 +1,15 @@
package org.horizon.loader;
/**
- * // TODO: Manik: Document this!
+ * An abstract {@link org.horizon.loader.CacheLoader} that holds common implementations for some methods
*
* @author Manik Surtani
* @since 1.0
*/
public abstract class AbstractCacheLoader implements CacheLoader {
+
+ public boolean containsKey(Object key) throws CacheLoaderException {
+ return load(key) != null;
+ }
+
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java 2009-02-18
16:06:10 UTC (rev 7723)
+++ core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -5,13 +5,15 @@
import org.horizon.loader.modifications.Store;
import javax.transaction.Transaction;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
- * // TODO: Manik: Document this!
+ * An abstract {@link org.horizon.loader.CacheStore} that holds common implementations for some methods
*
* @author Manik Surtani
* @since 1.0
@@ -20,7 +22,7 @@
private final Map<Transaction, List<? extends Modification>> transactions
= new ConcurrentHashMap<Transaction, List<? extends Modification>>();
- protected void applyModifications(List<? extends Modification> mods) {
+ protected void applyModifications(List<? extends Modification> mods) throws
CacheLoaderException {
for (Modification m : mods) {
switch (m.getType()) {
case STORE:
@@ -40,7 +42,7 @@
}
}
- public void prepare(List<? extends Modification> mods, Transaction tx, boolean
isOnePhase) {
+ public void prepare(List<? extends Modification> mods, Transaction tx, boolean
isOnePhase) throws CacheLoaderException {
if (isOnePhase) {
applyModifications(mods);
} else {
@@ -52,18 +54,32 @@
transactions.remove(tx);
}
- public void commit(Transaction tx) {
+ public void commit(Transaction tx) throws CacheLoaderException {
List<? extends Modification> list = transactions.remove(tx);
if (list != null && !list.isEmpty()) applyModifications(list);
}
- public void removeAll(Set<Object> keys) {
+ public void removeAll(Set<Object> keys) throws CacheLoaderException {
if (keys != null && !keys.isEmpty()) {
for (Object key : keys) remove(key);
}
}
- public boolean containsKey(Object key) {
- return load(key) != null;
+ protected final void safeClose(InputStream stream) throws CacheLoaderException {
+ if (stream == null) return;
+ try {
+ stream.close();
+ } catch (Exception e) {
+ throw new CacheLoaderException("Problems closing input stream", e);
+ }
}
+
+ protected final void safeClose(OutputStream stream) throws CacheLoaderException {
+ if (stream == null) return;
+ try {
+ stream.close();
+ } catch (Exception e) {
+ throw new CacheLoaderException("Problems closing output stream", e);
+ }
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java 2009-02-18
16:06:10 UTC (rev 7723)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -31,21 +31,24 @@
*
* @param key key
* @return an entry
+ * @throws CacheLoaderException in the event of problems reading from source
*/
- StoredEntry load(Object key);
+ StoredEntry load(Object key) throws CacheLoaderException;
/**
* Loads all entries in the loader. Expired entries are not returned.
*
* @return a set of entries, or an empty set if the loader is empty.
+ * @throws CacheLoaderException in the event of problems reading from source
*/
- Set<StoredEntry> loadAll();
+ Set<StoredEntry> loadAll() throws CacheLoaderException;
/**
* @param key key to test
* @return true if the key exists, false otherwise
+ * @throws CacheLoaderException in the event of problems reading from source
*/
- boolean containsKey(Object key);
+ boolean containsKey(Object key) throws CacheLoaderException;
/**
Added: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderException.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderException.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderException.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -0,0 +1,24 @@
+package org.horizon.loader;
+
+/**
+ * An exception thrown by a {@link CacheLoader} implementation if there are problems reading from a loader.
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class CacheLoaderException extends Exception {
+ public CacheLoaderException() {
+ }
+
+ public CacheLoaderException(String message) {
+ super(message);
+ }
+
+ public CacheLoaderException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CacheLoaderException(Throwable cause) {
+ super(cause);
+ }
+}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -53,7 +53,11 @@
public void purge() {
CacheStore cs = getCacheStore();
- if (cs != null) cs.clear();
+ if (cs != null) try {
+ cs.clear();
+ } catch (CacheLoaderException e) {
+ throw new CacheException("Unable to purge cache store", e);
+ }
}
private void purgeLoaders(boolean force) throws Exception {
@@ -106,7 +110,12 @@
log.debug("Preloading transient state from cache loader {0}",
loader);
long start = 0, stop = 0, total = 0;
if (log.isDebugEnabled()) start = System.currentTimeMillis();
- Set<StoredEntry> state = loader.loadAll();
+ Set<StoredEntry> state;
+ try {
+ state = loader.loadAll();
+ } catch (CacheLoaderException e) {
+ throw new CacheException("Unable to preload!", e);
+ }
for (StoredEntry se : state)
cache.getAdvancedCache().put(se.getKey(), se.getValue(),
se.getLifespan(),
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-18
16:06:10 UTC (rev 7723)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -3,7 +3,6 @@
import org.horizon.loader.modifications.Modification;
import javax.transaction.Transaction;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
@@ -21,8 +20,9 @@
* Stores an entry
*
* @param ed entry to store
+ * @throws CacheLoaderException in the event of problems writing to the store
*/
- void store(StoredEntry ed);
+ void store(StoredEntry ed) throws CacheLoaderException;
/**
* Writes contents of the stream to the store. Implementations should expect that the
stream contains data in an
@@ -31,10 +31,9 @@
* dealing with the stream to make use of efficient marshalling.
*
* @param inputStream stream to read from
- * @throws java.io.IOException in case of IO problems
- * @throws ClassNotFoundException in case of not being able to read the stream
+ * @throws CacheLoaderException in the event of problems writing to the store
*/
- void store(InputStream inputStream) throws IOException, ClassNotFoundException;
+ void store(InputStream inputStream) throws CacheLoaderException;
/**
* Loads the entire state into a stream, using whichever format is most efficient for
the cache loader
@@ -44,34 +43,40 @@
* org.horizon.marshall.Marshaller} when dealing with the stream to make use of
efficient marshalling.
*
* @param outputStream stream to write to
- * @throws java.io.IOException in the event of problems writing to the stream
+ * @throws CacheLoaderException in the event of problems reading from the store
*/
- void load(OutputStream outputStream) throws IOException;
+ void load(OutputStream outputStream) throws CacheLoaderException;
/**
* Clears all entries in the store
+ *
+ * @throws CacheLoaderException in the event of problems writing to the store
*/
- void clear();
+ void clear() throws CacheLoaderException;
/**
* Removes an entry in the store.
*
* @param key key to remove
* @return true if the entry was removed; false if the entry wasn't found.
+ * @throws CacheLoaderException in the event of problems writing to the store
*/
- boolean remove(Object key);
+ boolean remove(Object key) throws CacheLoaderException;
/**
* Bulk remove operation
*
* @param keys to remove
+ * @throws CacheLoaderException in the event of problems writing to the store
*/
- void removeAll(Set<Object> keys);
+ void removeAll(Set<Object> keys) throws CacheLoaderException;
/**
* Purges expired entries from the store.
+ *
+ * @throws CacheLoaderException in the event of problems writing to the store
*/
- void purgeExpired();
+ void purgeExpired() throws CacheLoaderException;
/**
* Issues a prepare call with a set of modifications to be applied to the cache store
@@ -80,15 +85,17 @@
* @param tx transaction identifier
* @param isOnePhase if true, there will not be a commit or rollback phase and
changes should be flushed
* immediately
+ * @throws CacheLoaderException in the event of problems writing to the store
*/
- void prepare(List<? extends Modification> modifications, Transaction tx, boolean
isOnePhase);
+ void prepare(List<? extends Modification> modifications, Transaction tx, boolean
isOnePhase) throws CacheLoaderException;
/**
* Commits a transaction that has been previously prepared
*
* @param tx tx to commit
+ * @throws CacheLoaderException in the event of problems writing to the store
*/
- void commit(Transaction tx);
+ void commit(Transaction tx) throws CacheLoaderException;
/**
* Rolls back a transaction that has been previously prepared
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -3,13 +3,13 @@
import org.horizon.Cache;
import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
import org.horizon.loader.modifications.Modification;
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
@@ -38,39 +38,42 @@
return delegate;
}
- public void store(StoredEntry ed) {
+ public void store(StoredEntry ed) throws CacheLoaderException {
delegate.store(ed);
}
- public void store(InputStream inputStream) throws IOException, ClassNotFoundException
{
+ public void store(InputStream inputStream) throws CacheLoaderException {
delegate.store(inputStream);
}
- public void load(OutputStream outputStream) throws IOException {
+ public void load(OutputStream outputStream) throws CacheLoaderException {
delegate.load(outputStream);
}
- public void clear() {
+ public void clear() throws CacheLoaderException {
delegate.clear();
}
- public boolean remove(Object key) {
+ public boolean remove(Object key) throws CacheLoaderException {
return delegate.remove(key);
}
- public void purgeExpired() {
+ public void purgeExpired() throws CacheLoaderException {
delegate.purgeExpired();
}
- public void commit(Transaction tx) {
+ @Override
+ public void commit(Transaction tx) throws CacheLoaderException {
delegate.commit(tx);
}
+ @Override
public void rollback(Transaction tx) {
delegate.rollback(tx);
}
- public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) {
+ @Override
+ public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) throws CacheLoaderException {
delegate.prepare(list, tx, isOnePhase);
}
@@ -78,19 +81,20 @@
delegate.init(config, cache, m);
}
- public StoredEntry load(Object key) {
+ public StoredEntry load(Object key) throws CacheLoaderException {
return delegate.load(key);
}
- public Set loadAll() {
+ public Set<StoredEntry> loadAll() throws CacheLoaderException {
return delegate.loadAll();
}
- public boolean containsKey(Object key) {
+ @Override
+ public boolean containsKey(Object key) throws CacheLoaderException {
return delegate.containsKey(key);
}
- public Class getConfigurationClass() {
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
return delegate.getConfigurationClass();
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStore.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStore.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -1,6 +1,7 @@
package org.horizon.loader.decorators;
import org.horizon.CacheException;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
import org.horizon.loader.modifications.Clear;
@@ -127,7 +128,7 @@
super.stop();
}
- protected void applyModificationsSync(List<Modification> mods) {
+ protected void applyModificationsSync(List<Modification> mods) throws
CacheLoaderException {
for (Modification m : mods) {
switch (m.getType()) {
case STORE:
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -3,13 +3,13 @@
import org.horizon.Cache;
import org.horizon.loader.CacheLoader;
import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
import org.horizon.loader.modifications.Modification;
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
@@ -35,11 +35,11 @@
LinkedHashMap<CacheLoader, CacheLoaderConfig> loaders = new
LinkedHashMap<CacheLoader, CacheLoaderConfig>();
LinkedHashMap<CacheStore, CacheLoaderConfig> stores = new
LinkedHashMap<CacheStore, CacheLoaderConfig>();
- public void store(StoredEntry ed) {
+ public void store(StoredEntry ed) throws CacheLoaderException {
for (CacheStore s : stores.keySet()) s.store(ed);
}
- public void store(InputStream inputStream) throws IOException, ClassNotFoundException
{
+ public void store(InputStream inputStream) throws CacheLoaderException {
// loading and storing state via streams is *only* supported on the *first* store
that has fetchPersistentState set.
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isFetchPersistentState()) {
@@ -50,7 +50,7 @@
}
}
- public void load(OutputStream outputStream) throws IOException {
+ public void load(OutputStream outputStream) throws CacheLoaderException {
// loading and storing state via streams is *only* supported on the *first* store
that has fetchPersistentState set.
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isFetchPersistentState()) {
@@ -61,25 +61,25 @@
}
}
- public void clear() {
+ public void clear() throws CacheLoaderException {
for (CacheStore s : stores.keySet()) s.clear();
}
- public boolean remove(Object key) {
+ public boolean remove(Object key) throws CacheLoaderException {
boolean r = false;
for (CacheStore s : stores.keySet()) r = s.remove(key) || r;
return r;
}
- public void removeAll(Set<Object> keys) {
+ public void removeAll(Set<Object> keys) throws CacheLoaderException {
for (CacheStore s : stores.keySet()) s.removeAll(keys);
}
- public void purgeExpired() {
+ public void purgeExpired() throws CacheLoaderException {
for (CacheStore s : stores.keySet()) s.purgeExpired();
}
- public void commit(Transaction tx) {
+ public void commit(Transaction tx) throws CacheLoaderException {
for (CacheStore s : stores.keySet()) s.commit(tx);
}
@@ -87,7 +87,7 @@
for (CacheStore s : stores.keySet()) s.rollback(tx);
}
- public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) {
+ public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) throws CacheLoaderException {
for (CacheStore s : stores.keySet()) s.prepare(list, tx, isOnePhase);
}
@@ -97,7 +97,7 @@
}
}
- public StoredEntry load(Object key) {
+ public StoredEntry load(Object key) throws CacheLoaderException {
StoredEntry se = null;
for (CacheLoader l : loaders.keySet()) {
se = l.load(key);
@@ -106,13 +106,13 @@
return se;
}
- public Set<StoredEntry> loadAll() {
+ public Set<StoredEntry> loadAll() throws CacheLoaderException {
Set<StoredEntry> set = new HashSet<StoredEntry>();
for (CacheStore s : stores.keySet()) set.addAll(s.loadAll());
return set;
}
- public boolean containsKey(Object key) {
+ public boolean containsKey(Object key) throws CacheLoaderException {
for (CacheLoader l : loaders.keySet()) {
if (l.containsKey(key)) return true;
}
@@ -136,7 +136,7 @@
if (loader instanceof CacheStore) stores.put((CacheStore) loader, config);
}
- public void purgeIfNecessary() {
+ public void purgeIfNecessary() throws CacheLoaderException {
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isPurgeOnStartup()) e.getKey().clear();
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -2,6 +2,7 @@
import org.horizon.Cache;
import org.horizon.container.DataContainer;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
import org.horizon.loader.modifications.Modification;
@@ -16,7 +17,6 @@
import org.horizon.remoting.transport.Address;
import javax.transaction.Transaction;
-import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
@@ -104,7 +104,7 @@
// only delegate if the current instance is active
@Override
- public void store(StoredEntry ed) {
+ public void store(StoredEntry ed) throws CacheLoaderException {
if (active) {
if (trace) log.trace("Storing key {0}. Instance: {1}", ed.getKey(),
this);
super.store(ed);
@@ -112,27 +112,27 @@
}
@Override
- public void store(InputStream inputStream) throws IOException, ClassNotFoundException
{
+ public void store(InputStream inputStream) throws CacheLoaderException {
if (active) super.store(inputStream);
}
@Override
- public void clear() {
+ public void clear() throws CacheLoaderException {
if (active) super.clear();
}
@Override
- public boolean remove(Object key) {
+ public boolean remove(Object key) throws CacheLoaderException {
return active && super.remove(key);
}
@Override
- public void purgeExpired() {
+ public void purgeExpired() throws CacheLoaderException {
if (active) super.purgeExpired();
}
@Override
- public void commit(Transaction tx) {
+ public void commit(Transaction tx) throws CacheLoaderException {
if (active) super.commit(tx);
}
@@ -142,7 +142,7 @@
}
@Override
- public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) {
+ public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) throws CacheLoaderException {
if (active) super.prepare(list, tx, isOnePhase);
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -1,41 +1,68 @@
package org.horizon.loader.file;
import org.horizon.Cache;
-import org.horizon.CacheException;
+import org.horizon.config.ConfigurationException;
import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
+import org.horizon.lock.StripedLock;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
+import org.horizon.util.concurrent.WithinThreadExecutor;
import java.io.*;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A filesystem-based implementation of a {@link org.horizon.loader.CacheStore}. This
file store stores stuff in the
- * following format: <tt>/location/cache name/bucket.dat</tt>
+ * following format: <tt>/{location}/cache name/bucket_number.bucket</tt>
* <p/>
* A hashing algorithm is used to map keys to buckets, and a bucket consists of a
collection of key/value pairs.
* <p/>
* This approach, while adding an overhead of having to search buckets for keys, means
that we can use any serializable
- * object we like as keys and not just Strings or objects that translate to something
meaningful on a file system.
+ * object we like as keys and not just Strings or objects that translate to something meaningful on a file system. Also,
+ * the implementation uses up to {@link Integer#MAX_VALUE} bucket files, which makes it very unlikely that cached
+ * entries would have to share buckets provided the {@link Object#hashCode()} implementations of the keys used are well
+ * spread.
+ * <p/>
+ * Locking is based on a {@link org.horizon.lock.StripedLock}, and granularity is per-bucket to prevent file system
+ * corruption with concurrent writes. You can tune the concurrency level of the striped lock (see the Javadocs of
+ * StripedLock for details on what this is) by using the {@link org.horizon.loader.file.FileCacheStoreConfig#setLockConcurrencyLevel(int)}
+ * setter.
+ * <p/>
*
* @author Manik Surtani
* @since 1.0
*/
public class FileCacheStore extends AbstractCacheStore {
private static final Log log = LogFactory.getLog(FileCacheStore.class);
- // TODO: make bucket size fixed rather than number of buckets, and support resizes
+ // doesn't matter that we have such a large number of file buckets since they are created on disk on demand and
+ // take up no extra memory
private static final int NUM_BUCKETS = Integer.MAX_VALUE;
+ private int streamBufferSize;
+ ExecutorService purgerService;
+ StripedLock bucketLocks;
+ // This global lock guards against direct file system access via clear() and the stream APIs. These three methods
+ // would need exclusive (write) access to this lock while all others can use shared (read) access to this lock since
+ // other methods will use finer grained bucket locks.
+ final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock();
- // TODO use read and write locks on buckets!
-
FileCacheStoreConfig cfg;
Cache cache;
Marshaller m;
File root;
+ long lockTimeout;
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
this.cfg = (FileCacheStoreConfig) config;
@@ -43,41 +70,58 @@
this.m = m;
}
- public StoredEntry load(Object key) {
- String bucketName = getBucketName(key);
- File f = new File(root, bucketName);
- if (f.exists()) {
- FileInputStream is = null;
- ObjectInputStream ois = null;
- try {
- is = new FileInputStream(f);
- ois = new ObjectInputStream(is);
- Map data = (Map) ois.readObject();
- if (data.containsKey(key)) return (StoredEntry) data.get(key);
- } catch (FileNotFoundException e) {
- e.printStackTrace(); // TODO: Manik: Customise this generated block
- } catch (IOException e) {
- e.printStackTrace(); // TODO: Manik: Customise this generated block
- } catch (ClassNotFoundException e) {
- e.printStackTrace(); // TODO: Manik: Customise this generated block
- } finally {
- if (ois != null) try {
- ois.close();
- } catch (IOException e) {
- e.printStackTrace(); // TODO: Manik: Customise this generated block
- }
- if (is != null) try {
- is.close();
- } catch (IOException e) {
- e.printStackTrace(); // TODO: Manik: Customise this generated block
- }
+ public StoredEntry load(Object key) throws CacheLoaderException {
+ log.trace("Loading entry {0}", key);
+ Bucket b = null;
+ try {
+ b = lockAndGetBucket(key, false, false);
+ if (b == null) return null;
+ StoredEntry se = b.getEntry(key);
+ if (se != null && se.isExpired()) {
+ b.removeEntry(key);
+ saveBucket(b);
+ return null;
+ } else {
+ return se;
}
+ } catch (Exception e) {
+ CacheLoaderException cle = (e instanceof CacheLoaderException) ?
(CacheLoaderException) e :
+ new CacheLoaderException("Problems loading key " + key, e);
+ throw cle;
+ } finally {
+ unlockBucket(b);
}
- return null;
}
- public Set<StoredEntry> loadAll() {
- return null; // TODO: Manik: Customise this generated block
+ public Set<StoredEntry> loadAll() throws CacheLoaderException {
+ log.trace("Loading all entries");
+ Set<StoredEntry> set = new HashSet<StoredEntry>();
+ try {
+ for (File f : root.listFiles()) {
+ Bucket b = null;
+ try {
+ b = lockAndGetBucket(f, false, false);
+ if (b != null) {
+ Set<Object> expiredOnBucket = new HashSet<Object>();
+ for (StoredEntry e : b.entries.values()) {
+ if (e.isExpired())
+ expiredOnBucket.add(e.getKey());
+ else
+ set.add(e);
+ }
+ for (Object expired : expiredOnBucket) b.removeEntry(expired);
+ saveBucket(b);
+ }
+ } finally {
+ unlockBucket(b);
+ }
+ }
+ } catch (Exception e) {
+ CacheLoaderException cle = (e instanceof CacheLoaderException) ?
(CacheLoaderException) e :
+ new CacheLoaderException("Problems loading keys", e);
+ throw cle;
+ }
+ return set;
}
public Class<? extends CacheLoaderConfig> getConfigurationClass() {
@@ -91,67 +135,176 @@
root = new File(location);
if (!root.exists()) {
if (!root.mkdirs())
- throw new CacheException("Directory " + root.getAbsolutePath() +
" does not exist and cannot be created!");
+ throw new ConfigurationException("Directory " +
root.getAbsolutePath() + " does not exist and cannot be created!");
}
+
+ if (cfg.isPurgeSynchronously()) {
+ purgerService = new WithinThreadExecutor();
+ } else {
+ purgerService = Executors.newSingleThreadExecutor();
+ }
+
+ streamBufferSize = cfg.getStreamBufferSize();
+ int lockConcurrencyLevel = cfg.getLockConcurrencyLevel();
+ bucketLocks = new StripedLock(lockConcurrencyLevel);
+ lockTimeout = cfg.getLockAcquistionTimeout();
}
+ /**
+ * Stops this store by shutting down the executor used for purging expired entries.
+ * NOTE(review): purgerService is only assigned in the start-up code visible above; calling stop() before that
+ * has run would presumably throw a NullPointerException - confirm the lifecycle guarantees.
+ */
public void stop() {
+ purgerService.shutdownNow();
}
- public void store(StoredEntry ed) {
- // TODO: Manik: Customise this generated block
+ /**
+  * Writes a single entry into the bucket its key maps to, creating the bucket if necessary. Null and already
+  * expired entries are ignored.
+  *
+  * @param ed entry to store; may be null
+  * @throws CacheLoaderException on any failure; exceptions of other types are wrapped with the key in the message
+  */
+ public void store(StoredEntry ed) throws CacheLoaderException {
+    if (ed == null) return;
+
+    boolean alreadyExpired = ed.isExpired();
+    if (alreadyExpired) {
+       log.trace("Entry {0} is expired! Not doing anything.", ed);
+       return;
+    }
+
+    log.trace("Storing entry {0}", ed);
+
+    Bucket target = null;
+    try {
+       // create = true, exclusiveLock = true
+       target = lockAndGetBucket(ed.getKey(), true, true);
+       target.addEntry(ed);
+       saveBucket(target);
+    } catch (CacheLoaderException alreadyWrapped) {
+       throw alreadyWrapped;
+    } catch (Exception other) {
+       throw new CacheLoaderException("Problems storing entry with key " + ed.getKey(), other);
+    } finally {
+       unlockBucket(target);
+    }
}
- public void store(InputStream inputStream) throws IOException, ClassNotFoundException
{
- clear();
- // TODO: buffer streams!
- // TODO: close streams in a finally block, and close Object stream if an object
stream was created
- ObjectInputStream ois =
- (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream
:
- new ObjectInputStream(inputStream);
- int numFiles = ois.readInt();
- for (int i = 0; i < numFiles; i++) {
- String fName = (String) ois.readObject();
- int numBytes = ois.readInt();
- FileOutputStream fos = new FileOutputStream(root.getAbsolutePath() +
File.separator + fName);
- for (int bytes = 0; bytes < numBytes; bytes++) fos.write(ois.read());
- fos.close();
+ /**
+  * Replaces the contents of this store with the state read from the given stream. The stream is expected to hold
+  * an int file count followed, for each file, by the file name (as an object), an int byte count and that many
+  * raw bytes - the layout produced by {@link #load(java.io.OutputStream)}.
+  * <p/>
+  * All existing files are cleared first, under the exclusive global lock.
+  *
+  * @param inputStream stream to read from; if it is not already an ObjectInputStream one is created around it
+  *                    and closed when done
+  * @throws CacheLoaderException on any failure; exceptions of other types are wrapped
+  */
+ public void store(InputStream inputStream) throws CacheLoaderException {
+    ObjectInputStream ois = null;
+    try {
+       // first clear all local state
+       acquireGlobalLock(true);
+       clear();
+       ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
+             new ObjectInputStream(inputStream);
+       int numFiles = ois.readInt();
+       byte[] buffer = new byte[streamBufferSize];
+       for (int i = 0; i < numFiles; i++) {
+          String fName = (String) ois.readObject();
+          int numBytes = ois.readInt();
+          FileOutputStream fos = new FileOutputStream(root.getAbsolutePath() + File.separator + fName);
+          try {
+             BufferedOutputStream bos = new BufferedOutputStream(fos, streamBufferSize);
+             int remaining = numBytes;
+             while (remaining > 0) {
+                // never ask for more than this file still has to deliver, so we cannot eat into the next record
+                int bytesRead = ois.read(buffer, 0, Math.min(buffer.length, remaining));
+                if (bytesRead == -1) break;
+                remaining -= bytesRead;
+                bos.write(buffer, 0, bytesRead);
+             }
+             bos.flush();
+          } finally {
+             // close the file even if reading from the stream failed half way through
+             fos.close();
+          }
+       }
+    }
+    catch (Exception e) {
+       CacheLoaderException cle = (e instanceof CacheLoaderException) ? (CacheLoaderException) e :
+             new CacheLoaderException("Problems reading from stream", e);
+       throw cle;
+    }
+    finally {
+       releaseGlobalLock();
+       // we should close the stream we created!
+       if (inputStream != ois) safeClose(ois);
+    }
}
- public void load(OutputStream outputStream) throws IOException {
- // TODO: buffer streams!
- // TODO: close streams in a finally block, and close Object stream if an object
stream was created
- ObjectOutputStream oos = (outputStream instanceof ObjectOutputStream) ?
(ObjectOutputStream) outputStream :
- new ObjectOutputStream(outputStream);
- File[] files = root.listFiles();
- oos.writeInt(files.length);
- for (int i = 0; i < files.length; i++) {
- FileInputStream is = new FileInputStream(files[i]);
- int sz = is.available();
- oos.writeObject(files[i].getName());
- oos.writeInt(sz);
- for (int bytes = 0; bytes < sz; bytes++) oos.write(is.read());
- is.close();
+ /**
+  * Streams the entire contents of this store to the given stream while holding the exclusive global lock. The
+  * layout written is an int file count followed, for each file in the root directory, by the file name (as an
+  * object), an int byte count and the raw file bytes.
+  *
+  * @param outputStream stream to write to; if it is not already an ObjectOutputStream one is created around it
+  *                     and closed when done
+  * @throws CacheLoaderException on any failure; the cause is always wrapped
+  */
+ public void load(OutputStream outputStream) throws CacheLoaderException {
+    ObjectOutputStream oos = null;
+    try {
+       acquireGlobalLock(true);
+       oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
+             new ObjectOutputStream(outputStream);
+
+       File[] files = root.listFiles();
+       oos.writeInt(files.length);
+       byte[] buffer = new byte[streamBufferSize];
+       for (File file : files) {
+          FileInputStream is = new FileInputStream(file);
+          try {
+             int sz = is.available();
+             BufferedInputStream bis = new BufferedInputStream(is);
+             oos.writeObject(file.getName());
+             oos.writeInt(sz);
+
+             // the countdown is per file; a counter shared across files would cut every file after the first short
+             int remaining = sz;
+             while (remaining > 0) {
+                int bytesRead = bis.read(buffer, 0, Math.min(buffer.length, remaining));
+                if (bytesRead == -1) break;
+                remaining -= bytesRead;
+                oos.write(buffer, 0, bytesRead);
+             }
+          } finally {
+             // close the file even if writing to the stream failed half way through
+             is.close();
+          }
+       }
+       // push out anything still buffered, in case the caller owns the stream and reads from it straight away
+       oos.flush();
+    } catch (Exception ioe) {
+       throw new CacheLoaderException("Problems handling stream", ioe);
+    } finally {
+       releaseGlobalLock();
+       // we should close the stream we created!
+       if (oos != outputStream) safeClose(oos);
+    }
}
- public void clear() {
- for (File f : root.listFiles()) f.delete();
+ /**
+  * Deletes every file in the root directory while holding the exclusive global lock. A file that cannot be
+  * deleted is logged as a warning and otherwise ignored.
+  *
+  * @throws CacheLoaderException wrapping any exception raised while locking or deleting
+  */
+ public void clear() throws CacheLoaderException {
+    log.trace("Clearing store");
+    try {
+       acquireGlobalLock(true);
+       File[] bucketFiles = root.listFiles();
+       for (int i = 0; i < bucketFiles.length; i++) {
+          File f = bucketFiles[i];
+          boolean deleted = f.delete();
+          if (deleted) continue;
+          log.warn("Had problems removing file {0}", f);
+       }
+    } catch (Exception e) {
+       throw new CacheLoaderException("Problems clearing cache store", e);
+    } finally {
+       releaseGlobalLock();
+    }
}
- public boolean remove(Object key) {
- return false; // TODO: Manik: Customise this generated block
+ /**
+  * Removes the entry stored under the given key, rewriting its bucket if something was actually removed.
+  *
+  * @param key key of the entry to remove
+  * @return true if an entry was removed, false if there was no bucket or no such entry
+  * @throws CacheLoaderException on any failure; exceptions of other types are wrapped with the key in the message
+  */
+ public boolean remove(Object key) throws CacheLoaderException {
+    log.trace("Removing key {0}", key);
+    Bucket bucket = null;
+    try {
+       // create = false, exclusiveLock = true
+       bucket = lockAndGetBucket(key, false, true);
+       if (bucket == null) return false;
+       if (!bucket.removeEntry(key)) return false;
+       saveBucket(bucket);
+       return true;
+    } catch (CacheLoaderException alreadyWrapped) {
+       throw alreadyWrapped;
+    } catch (Exception other) {
+       throw new CacheLoaderException("Problems removing key " + key, other);
+    } finally {
+       unlockBucket(bucket);
+    }
}
+ /**
+ * Hands a purge task to the purger executor. The task simply calls loadAll(), which drops expired entries from
+ * every bucket it reads and saves the affected buckets; the returned set is discarded. A CacheLoaderException
+ * raised by the task is logged and not propagated.
+ * NOTE(review): whether this call blocks until the purge is done depends on the executor chosen at start-up -
+ * presumably the WithinThreadExecutor runs the task in the calling thread; confirm.
+ */
public void purgeExpired() {
- // TODO: Manik: Customise this generated block
+ purgerService.execute(new Runnable() {
+ public void run() {
+ try {
+ loadAll();
+ } catch (CacheLoaderException e) {
+ log.info("Problems encountered while purging expired", e);
+ }
+ }
+ });
}
- private String getBucketName(Object key) {
- int bucketNumber = index(hash(key));
- return bucketNumber + ".dat";
- }
+ //
------------------------------------------------------------------------------------------------------------------
+ // Buckets and bucket manipulators
+ //
------------------------------------------------------------------------------------------------------------------
private int hash(Object key) {
int h = key.hashCode();
@@ -163,24 +316,119 @@
return h & (NUM_BUCKETS - 1);
}
- private Map<Object, StoredEntry> deserialize(Object key, boolean createIfNeeded)
{
- return null;
+ /**
+  * Resolves the bucket file a key maps to - named after the key's bucket index with a ".bucket" suffix, inside
+  * the root directory - and delegates to the File-based overload.
+  *
+  * @param key           key whose bucket is wanted
+  * @param create        passed through to the File-based overload
+  * @param exclusiveLock passed through to the File-based overload
+  * @return whatever the File-based overload returns for the resolved file
+  */
+ final Bucket lockAndGetBucket(Object key, boolean create, boolean exclusiveLock) throws IOException, ClassNotFoundException, TimeoutException, InterruptedException {
+    String bucketFileName = index(hash(key)) + ".bucket";
+    return lockAndGetBucket(new File(root, bucketFileName), create, exclusiveLock);
}
- private void serialize(Map<Object, StoredEntry> map) {
+ /**
+  * Acquires the shared global lock and the lock for the given bucket file, then reads the bucket from disk.
+  * <p/>
+  * Lock ownership on exit:
+  * <ul>
+  * <li>non-null result - both locks are held; the caller must release them via unlockBucket(bucket)</li>
+  * <li>null result - both locks have already been released here</li>
+  * <li>exception while reading the bucket - the bucket lock is released here, because the caller has no bucket
+  * to pass to unlockBucket; the global lock stays held and is released by the caller's unlockBucket(null)</li>
+  * </ul>
+  *
+  * @param f             bucket file to read
+  * @param create        if true and the file does not exist, a new empty bucket is returned instead of null
+  * @param exclusiveLock whether the bucket lock is taken exclusively
+  * @return the bucket, or null if the file does not exist and create is false
+  */
+ final Bucket lockAndGetBucket(File f, boolean create, boolean exclusiveLock) throws IOException, ClassNotFoundException, TimeoutException, InterruptedException {
+    Bucket b = null;
+    String bucketName = f.getName();
+    // first get a shared lock on the global lock to make sure no state tfr is going on
+    acquireGlobalLock(false);
+    bucketLocks.acquireLock(bucketName, exclusiveLock);
+    boolean readOk = false;
+    try {
+       if (f.exists()) {
+          FileInputStream is = new FileInputStream(f);
+          try {
+             ObjectInputStream ois = new ObjectInputStream(is);
+             b = (Bucket) ois.readObject();
+          } finally {
+             // closing the file stream is enough; the object stream holds no other resource
+             is.close();
+          }
+       } else if (create) {
+          b = new Bucket();
+          b.entries = new HashMap<Object, StoredEntry>();
+       }
+       readOk = true;
+    } finally {
+       // a corrupt or unreadable file must not leave the bucket lock held forever
+       if (!readOk) bucketLocks.releaseLock(bucketName);
+    }
+    if (b == null) {
+       bucketLocks.releaseLock(bucketName); // don't bother holding locks for null buckets
+       releaseGlobalLock();
+    } else b.fileName = bucketName;
+
+    return b;
}
- private static class Bucket implements Externalizable {
+ /**
+  * Releases the lock held for the given bucket, if there is one, and then the global lock.
+  *
+  * @param b bucket to unlock; when null only the global lock is released
+  */
+ final void unlockBucket(Bucket b) {
+    boolean holdsBucketLock = b != null;
+    if (holdsBucketLock) {
+       bucketLocks.releaseLock(b.fileName);
+    }
+    releaseGlobalLock();
+ }
+
+ /**
+  * Acquires the global lock in the requested mode, waiting at most lockTimeout milliseconds.
+  *
+  * @param exclusive true for the write lock, false for the read lock
+  * @throws TimeoutException     if the lock could not be acquired in time
+  * @throws InterruptedException if the thread is interrupted while waiting
+  */
+ final void acquireGlobalLock(boolean exclusive) throws TimeoutException, InterruptedException {
+    final Lock wanted;
+    if (exclusive) {
+       wanted = globalLock.writeLock();
+    } else {
+       wanted = globalLock.readLock();
+    }
+
+    boolean acquired = wanted.tryLock(lockTimeout, TimeUnit.MILLISECONDS);
+    if (acquired) return;
+
+    String mode = exclusive ? "exclusive" : "shared";
+    throw new TimeoutException("Timed out trying to acquire " + mode + " global lock after " + lockTimeout + " millis. Lock is " + wanted);
+ }
+
+ /**
+  * Releases one hold of the global lock owned by the calling thread: the read lock if the thread holds it,
+  * otherwise the write lock. Does nothing if the thread holds neither.
+  * <p/>
+  * Unlocking only the read lock would leave a write lock taken via acquireGlobalLock(true) held for good, as
+  * the resulting IllegalMonitorStateException would be swallowed.
+  * <p/>
+  * NOTE(review): relies on the lock implementation throwing IllegalMonitorStateException when a thread that does
+  * not hold the read lock tries to unlock it - confirm for the JDK in use.
+  */
+ final void releaseGlobalLock() {
+    try {
+       globalLock.readLock().unlock();
+       return;
+    } catch (IllegalMonitorStateException notHoldingReadLock) {
+       // fall through and try the write lock
+    }
+    try {
+       globalLock.writeLock().unlock();
+    } catch (IllegalMonitorStateException notHoldingWriteLock) {
+       // no op - nothing was held, e.g. the lock could not be acquired in the first place
+    }
+ }
+
+ /**
+  * Writes a modified bucket back to its file. An unmodified bucket is left alone. Any existing file is deleted
+  * first and is only rewritten if the bucket still has entries, so an emptied bucket ends up with no file.
+  *
+  * @param b bucket to save
+  * @throws IOException if the bucket cannot be written
+  */
+ final void saveBucket(Bucket b) throws IOException, CacheLoaderException {
+    if (!b.modified) return;
+
+    File target = new File(root, b.fileName);
+    if (target.exists() && !target.delete()) {
+       log.warn("Had problems removing file {0}", target);
+    }
+
+    boolean hasEntries = !b.entries.isEmpty();
+    if (hasEntries) {
+       FileOutputStream fileOut = null;
+       ObjectOutputStream objectOut = null;
+       try {
+          fileOut = new FileOutputStream(target);
+          objectOut = new ObjectOutputStream(fileOut);
+          objectOut.writeObject(b);
+          objectOut.flush();
+          fileOut.flush();
+       } finally {
+          safeClose(objectOut);
+          safeClose(fileOut);
+       }
+    }
+
+    b.modified = false; // reset this
+ }
+
+ /**
+ * A bucket is where entries are stored.
+ */
+ public final static class Bucket implements Externalizable {
+ // entries held by this bucket, keyed by entry key
Map<Object, StoredEntry> entries;
- String bucketName;
+ // name of the file backing this bucket; not written by writeExternal, assigned when the bucket is loaded
+ transient String fileName;
+ // set by addEntry/removeEntry; saveBucket only writes buckets flagged as modified
+ transient boolean modified = false;
- public void writeExternal(ObjectOutput out) throws IOException {
- // TODO: Manik: Customise this generated block
+ // Adds or replaces the entry under its own key and flags the bucket as modified.
+ final void addEntry(StoredEntry se) {
+ entries.put(se.getKey(), se);
+ modified = true;
}
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
- // TODO: Manik: Customise this generated block
+ // Removes the entry for the key; returns true, and flags the bucket as modified, only if one was present.
+ final boolean removeEntry(Object key) {
+ if (entries.remove(key) != null) {
+ modified = true;
+ return true;
+ }
+ return false;
}
+
+ // Returns the entry stored under the key, or null if there is none.
+ final StoredEntry getEntry(Object key) {
+ return entries.get(key);
+ }
+
+ // Wire format: entry count followed by each entry; keys are not written separately.
+ public final void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(entries.size());
+ for (StoredEntry se : entries.values()) out.writeObject(se);
+ }
+
+ // Mirror of writeExternal: reads the count, then rebuilds the map using each entry's own key.
+ public final void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ int sz = in.readInt();
+ entries = new HashMap<Object, StoredEntry>(sz);
+ for (int i = 0; i < sz; i++) {
+ StoredEntry se = (StoredEntry) in.readObject();
+ entries.put(se.getKey(), se);
+ }
+ }
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -3,13 +3,29 @@
import org.horizon.loader.AbstractCacheLoaderConfig;
/**
- * Configures {@link org.horizon.loader.file.FileCacheStore}
+ * Configures {@link org.horizon.loader.file.FileCacheStore}. This allows you to tune a
number of characteristics of
+ * the {@link FileCacheStore}.
+ * <p/>
+ * <ul> <li><tt>location</tt> - a location on disk where the
store can write internal files. This defaults to
+ * <tt>Horizon-FileCacheStore</tt> in the current working
directory.</li> <li><tt>purgeSynchronously</tt> - whether
+ * {@link org.horizon.loader.CacheStore#purgeExpired()} calls happen synchronously or
not. By default, this is set to
+ * <tt>false</tt>.</li> <li><tt>streamBufferSize</tt>
- when writing state to disk, a buffered stream is used. This
+ * parameter allows you to tune the buffer size. Larger buffers are usually faster but
take up more (temporary) memory,
+ * resulting in more gc. By default, this is set to <tt>8192</tt>.</li>
<li><tt>lockConcurrencyLevel</tt> - locking
+ * granularity is per file bucket. This setting defines the number of shared locks to
use. The more locks you have,
+ * the better your concurrency will be, but more locks take up more memory. By default,
this is set to
+ * <tt>2048</tt>.</li>
<li><tt>lockAcquistionTimeout</tt> - the length of time, in
milliseconds, to wait for locks
+ * before timing out and throwing an exception. By default, this is set to
<tt>60000</tt>.</li> </ul>
*
* @author Manik Surtani
* @since 1.0
*/
public class FileCacheStoreConfig extends AbstractCacheLoaderConfig {
- String location;
+ String location = "Horizon-FileCacheStore";
+ private boolean purgeSynchronously = false;
+ private int streamBufferSize = 8192;
+ private int lockConcurrencyLevel = 2048;
+ private long lockAcquistionTimeout = 60000;
public FileCacheStoreConfig() {
setClassName(FileCacheStore.class.getName());
@@ -23,4 +39,40 @@
testImmutability("location");
this.location = location;
}
+
+ /**
+ * @return whether purging of expired entries is configured to happen synchronously; false by default
+ */
+ public boolean isPurgeSynchronously() {
+ return purgeSynchronously;
+ }
+
+ /**
+ * Sets whether purging of expired entries happens synchronously.
+ * NOTE(review): testImmutability is defined elsewhere; presumably it rejects the change once the configuration
+ * can no longer be modified - confirm.
+ *
+ * @param purgeSynchronously true to purge synchronously
+ */
+ public void setPurgeSynchronously(boolean purgeSynchronously) {
+ testImmutability("purgeSynchronously");
+ this.purgeSynchronously = purgeSynchronously;
+ }
+
+ /**
+ * @return the buffer size used for buffered streams; 8192 by default
+ */
+ public int getStreamBufferSize() {
+ return streamBufferSize;
+ }
+
+ /**
+  * Sets the buffer size used for buffered streams.
+  * <p/>
+  * The name handed to testImmutability is the exact field name, as in every other setter of this class.
+  * NOTE(review): testImmutability is defined elsewhere; presumably it looks the field up by this name - confirm.
+  *
+  * @param streamBufferSize buffer size to use
+  */
+ public void setStreamBufferSize(int streamBufferSize) {
+    testImmutability("streamBufferSize");
+    this.streamBufferSize = streamBufferSize;
+ }
+
+ /**
+ * @return the number of shared locks used for bucket locking; 2048 by default
+ */
+ public int getLockConcurrencyLevel() {
+ return lockConcurrencyLevel;
+ }
+
+ /**
+ * Sets the number of shared locks used for bucket locking.
+ * NOTE(review): testImmutability is defined elsewhere; presumably it rejects the change once the configuration
+ * can no longer be modified - confirm.
+ *
+ * @param lockConcurrencyLevel number of shared locks to use
+ */
+ public void setLockConcurrencyLevel(int lockConcurrencyLevel) {
+ testImmutability("lockConcurrencyLevel");
+ this.lockConcurrencyLevel = lockConcurrencyLevel;
+ }
+
+ /**
+ * @return the time, in milliseconds, to wait for a lock before timing out; 60000 by default
+ */
+ public long getLockAcquistionTimeout() {
+ return lockAcquistionTimeout;
+ }
+
+ /**
+ * Sets the time, in milliseconds, to wait for a lock before timing out.
+ * NOTE(review): testImmutability is defined elsewhere; presumably it rejects the change once the configuration
+ * can no longer be modified - confirm.
+ *
+ * @param lockAcquistionTimeout timeout in milliseconds
+ */
+ public void setLockAcquistionTimeout(long lockAcquistionTimeout) {
+ testImmutability("lockAcquistionTimeout");
+ this.lockAcquistionTimeout = lockAcquistionTimeout;
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JDBCCacheStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JDBCCacheStore.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JDBCCacheStore.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -1,17 +1,14 @@
package org.horizon.loader.jdbc;
import org.horizon.Cache;
+import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
-import org.horizon.loader.CacheStore;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
import org.horizon.marshall.Marshaller;
-import javax.transaction.Transaction;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Collection;
-import java.util.List;
import java.util.Set;
/**
@@ -19,20 +16,16 @@
*
* @author Manik Surtani
*/
-public class JDBCCacheStore implements CacheStore {
+public class JDBCCacheStore extends AbstractCacheStore {
public void store(StoredEntry ed) {
// TODO: Manik: Customise this generated block
}
- public void storeAll(Collection ed) {
- // TODO: Manik: Customise this generated block
- }
-
public void store(InputStream inputStream) {
// TODO: Manik: Customise this generated block
}
- public void load(OutputStream outputStream) throws IOException {
+ public void load(OutputStream outputStream) {
// TODO: Manik: Customise this generated block
}
@@ -44,26 +37,10 @@
return false; // TODO: Manik: Customise this generated block
}
- public void removeAll(Set<Object> keys) {
- // TODO: Manik: Customise this generated block
- }
-
public void purgeExpired() {
// TODO: Manik: Customise this generated block
}
- public void commit(Transaction tx) {
- // TODO: Manik: Customise this generated block
- }
-
- public void rollback(Transaction tx) {
- // TODO: Manik: Customise this generated block
- }
-
- public void prepare(List list, Transaction tx, boolean isOnePhase) {
- // TODO: Manik: Customise this generated block
- }
-
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
// TODO: Manik: Customise this generated block
}
@@ -72,18 +49,10 @@
return null; // TODO: Manik: Customise this generated block
}
- public Set loadAll(Collection keys) {
+ public Set<StoredEntry> loadAll() throws CacheLoaderException {
return null; // TODO: Manik: Customise this generated block
}
- public Set loadAll() {
- return null; // TODO: Manik: Customise this generated block
- }
-
- public boolean containsKey(Object key) {
- return false; // TODO: Manik: Customise this generated block
- }
-
public Class<? extends CacheLoaderConfig> getConfigurationClass() {
return JDBCCacheStoreConfig.class;
}
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-02-18
16:06:10 UTC (rev 7723)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -19,7 +19,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Random;
import java.util.Set;
@SuppressWarnings("unchecked")
@@ -37,7 +39,7 @@
}
@AfterMethod
- public void tearDown() {
+ public void tearDown() throws CacheLoaderException {
if (cs != null) {
cs.clear();
cs.stop();
@@ -63,7 +65,7 @@
}
- public void testLoadAndStore() throws InterruptedException {
+ public void testLoadAndStore() throws InterruptedException, CacheLoaderException {
assert !cs.containsKey("k");
StoredEntry se = new StoredEntry("k", "v", -1, -1);
cs.store(se);
@@ -93,7 +95,7 @@
assert !cs.containsKey("k");
}
- public void testOnePhaseCommit() {
+ public void testOnePhaseCommit() throws CacheLoaderException {
List<Modification> mods = new ArrayList<Modification>();
mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
@@ -118,7 +120,7 @@
assert cs.containsKey("k3");
}
- public void testTwoPhaseCommit() {
+ public void testTwoPhaseCommit() throws CacheLoaderException {
List<Modification> mods = new ArrayList<Modification>();
mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
mods.add(new Store(new StoredEntry("k2", "v2", -1, -1)));
@@ -155,7 +157,7 @@
assert cs.containsKey("k3");
}
- public void testRollback() {
+ public void testRollback() throws CacheLoaderException {
cs.store(new StoredEntry("old", "old", -1, -1));
@@ -197,7 +199,7 @@
assert cs.containsKey("old");
}
- public void testCommitAndRollbackWithoutPrepare() {
+ public void testCommitAndRollbackWithoutPrepare() throws CacheLoaderException {
cs.store(new StoredEntry("old", "old", -1, -1));
Transaction tx = EasyMock.createNiceMock(Transaction.class);
cs.commit(tx);
@@ -207,7 +209,7 @@
assert cs.containsKey("old");
}
- public void testPreload() {
+ public void testPreload() throws CacheLoaderException {
cs.store(new StoredEntry("k1", "v1", -1, -1));
cs.store(new StoredEntry("k2", "v2", -1, -1));
cs.store(new StoredEntry("k3", "v3", -1, -1));
@@ -223,12 +225,15 @@
assert expected.isEmpty();
}
- public void testPurgeExpired() throws InterruptedException {
+ public void testPurgeExpired() throws InterruptedException, Exception {
long now = System.currentTimeMillis();
long lifespan = 1000;
cs.store(new StoredEntry("k1", "v1", now, now + lifespan));
cs.store(new StoredEntry("k2", "v2", now, now + lifespan));
cs.store(new StoredEntry("k3", "v3", now, now + lifespan));
+ assert cs.containsKey("k1");
+ assert cs.containsKey("k2");
+ assert cs.containsKey("k3");
Thread.sleep(lifespan + 100);
cs.purgeExpired();
assert !cs.containsKey("k1");
@@ -236,7 +241,7 @@
assert !cs.containsKey("k3");
}
- public void testStreamingAPI() throws IOException, ClassNotFoundException {
+ public void testStreamingAPI() throws IOException, ClassNotFoundException,
CacheLoaderException {
cs.store(new StoredEntry("k1", "v1", -1, -1));
cs.store(new StoredEntry("k2", "v2", -1, -1));
cs.store(new StoredEntry("k3", "v3", -1, -1));
@@ -264,4 +269,70 @@
CacheLoaderConfig clc = Util.getInstance(cfgClass);
assert clc.getClassName().equals(cs.getClass().getName()) : "Cache loader
doesn't provide a proper configuration type that is capable of creating the
loader!";
}
+
+ /**
+  * Hammers the store from several threads with random stores, removes and loads over a small key set, and fails
+  * with the first problem any worker ran into.
+  * <p/>
+  * Workers record every Throwable, not just Exceptions: a failed assert raises an AssertionError, which would
+  * otherwise kill the worker thread silently and let the test pass. The failure list is synchronized because
+  * all workers add to it.
+  */
+ public void testConcurrency() throws Exception {
+    int numThreads = 5;
+    final int loops = 1000;
+    final String[] keys = new String[10];
+    final String[] values = new String[10];
+    for (int i = 0; i < 10; i++) keys[i] = "k" + i;
+    for (int i = 0; i < 10; i++) values[i] = "v" + i;
+
+    final Random r = new Random();
+    final List<Throwable> failures = java.util.Collections.synchronizedList(new LinkedList<Throwable>());
+
+    final Runnable store = new Runnable() {
+       public void run() {
+          try {
+             int randomInt = r.nextInt(10);
+             cs.store(new StoredEntry(keys[randomInt], values[randomInt]));
+          } catch (Throwable t) {
+             failures.add(t);
+          }
+       }
+    };
+
+    final Runnable remove = new Runnable() {
+       public void run() {
+          try {
+             cs.remove(keys[r.nextInt(10)]);
+          } catch (Throwable t) {
+             failures.add(t);
+          }
+       }
+    };
+
+    final Runnable get = new Runnable() {
+       public void run() {
+          try {
+             int randomInt = r.nextInt(10);
+             StoredEntry se = cs.load(keys[randomInt]);
+             assert se == null || se.getValue().equals(values[randomInt]);
+             cs.loadAll();
+          } catch (Throwable t) {
+             failures.add(t);
+          }
+       }
+    };
+
+    Thread[] threads = new Thread[numThreads];
+
+    for (int i = 0; i < numThreads; i++) {
+       threads[i] = new Thread() {
+          public void run() {
+             for (int loop = 0; loop < loops; loop++) {
+                store.run();
+                remove.run();
+                get.run();
+             }
+          }
+       };
+    }
+
+    for (Thread t : threads) t.start();
+    for (Thread t : threads) t.join();
+
+    if (!failures.isEmpty()) {
+       Throwable first = failures.get(0);
+       if (first instanceof Exception) throw (Exception) first;
+       if (first instanceof Error) throw (Error) first;
+       throw new RuntimeException(first);
+    }
+ }
}
Modified:
core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -53,25 +53,25 @@
}
@AfterMethod
- public void afterMethod() {
+ public void afterMethod() throws CacheLoaderException {
if (cache != null) cache.clear();
if (store != null) store.clear();
}
- private void assertInCacheAndStore(Object key, Object value) {
+ private void assertInCacheAndStore(Object key, Object value) throws
CacheLoaderException {
assertInCacheAndStore(key, value, -1);
}
- private void assertInCacheAndStore(Object key, Object value, long lifespanMillis) {
+ private void assertInCacheAndStore(Object key, Object value, long lifespanMillis)
throws CacheLoaderException {
assertInCacheAndStore(cache, store, key, value, lifespanMillis);
}
- private void assertInCacheAndStore(Cache cache, CacheStore store, Object key, Object
value) {
+ private void assertInCacheAndStore(Cache cache, CacheStore store, Object key, Object
value) throws CacheLoaderException {
assertInCacheAndStore(cache, store, key, value, -1);
}
- private void assertInCacheAndStore(Cache cache, CacheStore store, Object key, Object
value, long lifespanMillis) {
+ private void assertInCacheAndStore(Cache cache, CacheStore store, Object key, Object
value, long lifespanMillis) throws CacheLoaderException {
StoredEntry se =
cache.getAdvancedCache().getDataContainer().createEntryForStorage(key);
testStoredEntry(se, value, lifespanMillis, "Cache", key);
se = store.load(key);
@@ -84,18 +84,18 @@
assert entry.getLifespan() == expectedLifespan : src + " expected lifespan for
key " + key + " to be " + expectedLifespan + " but was " +
entry.getLifespan() + ". Entry is " + entry;
}
- private void assertNotInCacheAndStore(Cache cache, CacheStore store, Object... keys)
{
+ private void assertNotInCacheAndStore(Cache cache, CacheStore store, Object... keys)
throws CacheLoaderException {
for (Object key : keys) {
assert !cache.getAdvancedCache().getDataContainer().containsKey(key) :
"Cache should not contain key " + key;
assert !store.containsKey(key) : "Store should not contain key " +
key;
}
}
- private void assertNotInCacheAndStore(Object... keys) {
+ private void assertNotInCacheAndStore(Object... keys) throws CacheLoaderException {
assertNotInCacheAndStore(cache, store, keys);
}
- public void testStoreAndRetrieve() {
+ public void testStoreAndRetrieve() throws CacheLoaderException {
assertNotInCacheAndStore("k1", "k2", "k3",
"k4", "k5", "k6", "k7");
cache.put("k1", "v1");
@@ -144,7 +144,7 @@
assertNotInCacheAndStore("k1", "k2", "k3",
"k4", "k5", "k6", "k7");
}
- public void testReplaceMethods() {
+ public void testReplaceMethods() throws CacheLoaderException {
assertNotInCacheAndStore("k1", "k2", "k3",
"k4");
cache.replace("k1", "v1-SHOULD-NOT-STORE");
@@ -179,7 +179,7 @@
}
- public void testLoading() {
+ public void testLoading() throws CacheLoaderException {
assertNotInCacheAndStore("k1", "k2", "k3",
"k4");
store.store(new StoredEntry("k1", "v1"));
store.store(new StoredEntry("k2", "v2"));
@@ -204,7 +204,7 @@
assertNotInCacheAndStore("k1", "k2", "k3",
"k4");
}
- public void testPreloading() {
+ public void testPreloading() throws CacheLoaderException {
Configuration preloadingCfg = cfg.clone();
preloadingCfg.getCacheLoaderManagerConfig().setPreload(true);
((DummyInMemoryCacheStore.Cfg)
preloadingCfg.getCacheLoaderManagerConfig().getFirstCacheLoaderConfig()).setStore("preloadingCache");
@@ -246,7 +246,7 @@
}
}
- public void testPurgeOnStartup() {
+ public void testPurgeOnStartup() throws CacheLoaderException {
Configuration purgingCfg = cfg.clone();
purgingCfg.getCacheLoaderManagerConfig().getFirstCacheLoaderConfig().setPurgeOnStartup(true);
((DummyInMemoryCacheStore.Cfg)
purgingCfg.getCacheLoaderManagerConfig().getFirstCacheLoaderConfig()).setStore("purgingCache");
@@ -338,7 +338,7 @@
assertInCacheAndStore("k2", "v2", lifespan);
}
- public void testEvictAndRemove() {
+ public void testEvictAndRemove() throws CacheLoaderException {
assertNotInCacheAndStore("k1", "k2");
cache.put("k1", "v1");
cache.put("k2", "v2", lifespan, MILLISECONDS);
Modified:
core/branches/flat/src/test/java/org/horizon/loader/PassivationFunctionalTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/PassivationFunctionalTest.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/test/java/org/horizon/loader/PassivationFunctionalTest.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -53,26 +53,26 @@
}
@AfterMethod
- public void afterMethod() {
+ public void afterMethod() throws CacheLoaderException {
if (cache != null) cache.clear();
if (store != null) store.clear();
}
- private void assertInCacheNotInStore(Object key, Object value) {
+ private void assertInCacheNotInStore(Object key, Object value) throws
CacheLoaderException {
assertInCacheNotInStore(key, value, -1);
}
- private void assertInCacheNotInStore(Object key, Object value, long lifespanMillis) {
+ private void assertInCacheNotInStore(Object key, Object value, long lifespanMillis)
throws CacheLoaderException {
StoredEntry se =
cache.getAdvancedCache().getDataContainer().createEntryForStorage(key);
testStoredEntry(se, value, lifespanMillis, "Cache", key);
assert !store.containsKey(key) : "Key " + key + " should not be in
store!";
}
- private void assertInStoreNotInCache(Object key, Object value) {
+ private void assertInStoreNotInCache(Object key, Object value) throws
CacheLoaderException {
assertInStoreNotInCache(key, value, -1);
}
- private void assertInStoreNotInCache(Object key, Object value, long lifespanMillis) {
+ private void assertInStoreNotInCache(Object key, Object value, long lifespanMillis)
throws CacheLoaderException {
StoredEntry se = store.load(key);
testStoredEntry(se, value, lifespanMillis, "Store", key);
assert !cache.getAdvancedCache().getDataContainer().containsKey(key) : "Key
" + key + " should not be in cache!";
@@ -85,14 +85,14 @@
assert entry.getLifespan() == expectedLifespan : src + " expected lifespan for
key " + key + " to be " + expectedLifespan + " but was " +
entry.getLifespan() + ". Entry is " + entry;
}
- private void assertNotInCacheAndStore(Object... keys) {
+ private void assertNotInCacheAndStore(Object... keys) throws CacheLoaderException {
for (Object key : keys) {
assert !cache.getAdvancedCache().getDataContainer().containsKey(key) :
"Cache should not contain key " + key;
assert !store.containsKey(key) : "Store should not contain key " +
key;
}
}
- public void testPassivate() {
+ public void testPassivate() throws CacheLoaderException {
assertNotInCacheAndStore("k1", "k2");
cache.put("k1", "v1");
@@ -122,7 +122,7 @@
assertInStoreNotInCache("k2", "v2", lifespan);
}
- public void testRemoveAndReplace() {
+ public void testRemoveAndReplace() throws CacheLoaderException {
assertNotInCacheAndStore("k1", "k2");
cache.put("k1", "v1");
@@ -221,7 +221,7 @@
assertInStoreNotInCache("k2", "v2", lifespan);
}
- public void testPutMap() {
+ public void testPutMap() throws CacheLoaderException {
assertNotInCacheAndStore("k1", "k2", "k3");
cache.put("k1", "v1");
cache.put("k2", "v2");
Modified:
core/branches/flat/src/test/java/org/horizon/loader/decorators/ChainingCacheLoaderTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/decorators/ChainingCacheLoaderTest.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/test/java/org/horizon/loader/decorators/ChainingCacheLoaderTest.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -3,6 +3,7 @@
import org.easymock.EasyMock;
import org.horizon.loader.BaseCacheStoreTest;
import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
import org.horizon.loader.dummy.DummyInMemoryCacheStore;
@@ -227,7 +228,7 @@
}
- public void testPropagatingStreams() throws IOException, ClassNotFoundException {
+ public void testPropagatingStreams() throws IOException, ClassNotFoundException,
CacheLoaderException {
store2.store(new StoredEntry("k1", "v1"));
store2.store(new StoredEntry("k2", "v2", lifespan));
Modified:
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -1,6 +1,7 @@
package org.horizon.loader.decorators;
import static org.easymock.EasyMock.*;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
import org.testng.annotations.Test;
@@ -9,7 +10,7 @@
@Test(groups = "unit", testName =
"loader.decorators.ReadOnlyCacheStoreTest")
public class ReadOnlyCacheStoreTest {
- public void testWriteMethods() {
+ public void testWriteMethods() throws CacheLoaderException {
CacheStore mock = createMock(CacheStore.class);
ReadOnlyStore store = new ReadOnlyStore(mock);
StoredEntry mockEntry = new StoredEntry();
Modified:
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java 2009-02-18
16:06:10 UTC (rev 7723)
+++
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java 2009-02-18
17:13:31 UTC (rev 7724)
@@ -5,6 +5,7 @@
import org.horizon.Cache;
import org.horizon.config.CacheLoaderManagerConfig;
import org.horizon.config.Configuration;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheLoaderManager;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
@@ -98,7 +99,7 @@
return stores;
}
- private Object load(CacheStore cs, Object key) {
+ private Object load(CacheStore cs, Object key) throws CacheLoaderException {
StoredEntry se = cs.load(key);
return se == null ? null : se.getValue();
}
Modified:
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-02-18 16:06:10 UTC (rev 7723)
+++ core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-02-18 17:13:31 UTC (rev 7724)
@@ -4,13 +4,13 @@
import org.horizon.loader.AbstractCacheLoaderConfig;
import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
import org.horizon.marshall.ObjectStreamMarshaller;
-import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -36,24 +36,31 @@
}
@SuppressWarnings("unchecked")
- public void store(InputStream inputStream) throws IOException, ClassNotFoundException {
- ObjectInputStream ois = inputStream instanceof ObjectInputStream ? (ObjectInputStream) inputStream :
- new ObjectInputStream(inputStream);
+ public void store(InputStream inputStream) throws CacheLoaderException {
+ try {
+ ObjectInputStream ois = inputStream instanceof ObjectInputStream ? (ObjectInputStream) inputStream :
+ new ObjectInputStream(inputStream);
- int numEntries = (Integer) marshaller.objectFromObjectStream(ois);
- store.clear();
- for (int i = 0; i < numEntries; i++) {
- StoredEntry e = (StoredEntry) marshaller.objectFromObjectStream(ois);
- store.put(e.getKey(), e);
+ int numEntries = (Integer) marshaller.objectFromObjectStream(ois);
+ store.clear();
+ for (int i = 0; i < numEntries; i++) {
+ StoredEntry e = (StoredEntry) marshaller.objectFromObjectStream(ois);
+ store.put(e.getKey(), e);
+ }
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
}
}
- public void load(OutputStream outputStream) throws IOException {
-
- ObjectOutputStream oos = outputStream instanceof ObjectOutputStream ? (ObjectOutputStream) outputStream :
- new ObjectOutputStream(outputStream);
- marshaller.objectToObjectStream(store.size(), oos);
- for (StoredEntry se : store.values()) marshaller.objectToObjectStream(se, oos);
+ public void load(OutputStream outputStream) throws CacheLoaderException {
+ try {
+ ObjectOutputStream oos = outputStream instanceof ObjectOutputStream ? (ObjectOutputStream) outputStream :
+ new ObjectOutputStream(outputStream);
+ marshaller.objectToObjectStream(store.size(), oos);
+ for (StoredEntry se : store.values()) marshaller.objectToObjectStream(se, oos);
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ }
}
public void clear() {
Modified:
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-02-18 16:06:10 UTC (rev 7723)
+++ core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-02-18 17:13:31 UTC (rev 7724)
@@ -2,26 +2,78 @@
import org.horizon.loader.BaseCacheStoreTest;
import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
import org.horizon.test.TestingUtil;
import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-@Test(groups = "unit", enabled = false, testName = "loader.file.FileCacheStoreTest")
+import java.io.File;
+
+@Test(groups = "unit", enabled = true, testName = "loader.file.FileCacheStoreTest")
public class FileCacheStoreTest extends BaseCacheStoreTest {
- private final String tmpDirectory = "__tempDir/" + getClass().getSimpleName();
+ private final String tmpDirectory = TestingUtil.TEST_FILES + File.separator + getClass().getSimpleName();
protected CacheStore createCacheStore() {
CacheStore cs = new FileCacheStore();
FileCacheStoreConfig cfg = new FileCacheStoreConfig();
cfg.setLocation(tmpDirectory);
+ cfg.setPurgeSynchronously(true); // for more accurate unit testing
cs.init(cfg, getCache(), getMarshaller());
cs.start();
return cs;
}
@AfterTest
+ @BeforeTest
public void removeTempDirectory() {
TestingUtil.recursiveFileRemove(tmpDirectory);
}
+
+ @Override
+ public void testPurgeExpired() throws Exception {
+ long now = System.currentTimeMillis();
+ long lifespan = 1000;
+ cs.store(new StoredEntry("k1", "v1", now, now + lifespan));
+ cs.store(new StoredEntry("k2", "v2", now, now + lifespan));
+ cs.store(new StoredEntry("k3", "v3", now, now + lifespan));
+ assert cs.containsKey("k1");
+ assert cs.containsKey("k2");
+ assert cs.containsKey("k3");
+ Thread.sleep(lifespan + 100);
+ cs.purgeExpired();
+ FileCacheStore fcs = (FileCacheStore) cs;
+ assert fcs.lockAndGetBucket("k1", false, false) == null;
+ assert fcs.lockAndGetBucket("k2", false, false) == null;
+ assert fcs.lockAndGetBucket("k3", false, false) == null;
+ System.out.println("Global lock: " + fcs.globalLock.readLock().toString());
+ }
+
+ public void testBucketRemoval() throws Exception {
+ FileCacheStore fcs = (FileCacheStore) cs;
+ FileCacheStore.Bucket b = null;
+ try {
+ b = fcs.lockAndGetBucket("test", true, false);
+ assert b != null;
+ assert !b.modified;
+ b.addEntry(new StoredEntry("test", "value"));
+ assert b.modified;
+
+ fcs.saveBucket(b);
+ assert !b.modified;
+ assert !b.entries.isEmpty();
+
+ assert new File(fcs.root, b.fileName).exists();
+
+ b.removeEntry("test");
+ assert b.entries.isEmpty();
+ assert b.modified;
+
+ fcs.saveBucket(b);
+ assert !new File(fcs.root, b.fileName).exists();
+ } finally {
+ fcs.unlockBucket(b);
+ }
+ }
}