Author: manik.surtani(a)jboss.com
Date: 2009-02-11 12:13:41 -0500 (Wed, 11 Feb 2009)
New Revision: 7683
Added:
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/modifications/PurgeExpired.java
core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/config/parsing/element/LoadersElementParser.java
core/branches/flat/src/main/java/org/horizon/container/CachedValue.java
core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
core/branches/flat/src/main/java/org/horizon/container/ExpirableCachedValue.java
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/horizon/eviction/EvictionAlgorithm.java
core/branches/flat/src/main/java/org/horizon/eviction/EvictionManagerImpl.java
core/branches/flat/src/main/java/org/horizon/eviction/algorithms/BaseEvictionAlgorithm.java
core/branches/flat/src/main/java/org/horizon/eviction/algorithms/nullalgo/NullEvictionAlgorithm.java
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderConfig.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
core/branches/flat/src/main/java/org/horizon/loader/modifications/Clear.java
core/branches/flat/src/main/java/org/horizon/loader/modifications/Modification.java
core/branches/flat/src/main/java/org/horizon/loader/modifications/Remove.java
core/branches/flat/src/main/java/org/horizon/loader/modifications/Store.java
core/branches/flat/src/main/resources/config-samples/all.xml
core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java
core/branches/flat/src/test/java/org/horizon/eviction/EvictionFunctionalTest.java
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java
Log:
more loader work in progress
Modified:
core/branches/flat/src/main/java/org/horizon/config/parsing/element/LoadersElementParser.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/config/parsing/element/LoadersElementParser.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/config/parsing/element/LoadersElementParser.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -27,6 +27,7 @@
import org.horizon.config.parsing.XmlParserBase;
import org.horizon.loader.CacheLoader;
import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.decorators.AsyncStoreConfig;
import org.horizon.loader.decorators.SingletonStoreConfig;
import org.horizon.util.Util;
import org.w3c.dom.Element;
@@ -69,13 +70,11 @@
CacheLoaderConfig clc;
try {
cl = (CacheLoader) Util.getInstance(clClass);
- clc = (CacheLoaderConfig) Util.getInstance(cl.getConfigurationClass());
+ clc = Util.getInstance(cl.getConfigurationClass());
} catch (Exception e) {
throw new ConfigurationException("Unable to instantiate cache loader or
configuration", e);
}
- String async = getAttributeValue(indivElement, "async");
- if (existsAttribute(async)) clc.setAsync(getBoolean(async));
String fetchPersistentState = getAttributeValue(indivElement,
"fetchPersistentState");
if (existsAttribute(fetchPersistentState))
clc.setFetchPersistentState(getBoolean(fetchPersistentState));
String ignoreModifications = getAttributeValue(indivElement,
"ignoreModifications");
@@ -85,8 +84,8 @@
clc.setClassName(clClass);
XmlConfigHelper.setValues(clc,
XmlConfigHelper.readPropertiesContents(indivElement), false, true);
- SingletonStoreConfig ssc =
parseSingletonStoreConfig(getSingleElementInCoreNS("singletonStore",
indivElement));
- if (ssc != null) clc.setSingletonStoreConfig(ssc);
+
clc.setSingletonStoreConfig(parseSingletonStoreConfig(getSingleElementInCoreNS("singletonStore",
indivElement)));
+
clc.setAsyncStoreConfig(parseAsyncStoreConfig(getSingleElementInCoreNS("async",
indivElement)));
return clc;
}
@@ -106,4 +105,29 @@
}
return ssc;
}
+
+ public AsyncStoreConfig parseAsyncStoreConfig(Element element) {
+ AsyncStoreConfig asc = new AsyncStoreConfig();
+ if (element == null) {
+ asc.setEnabled(false);
+ } else {
+ boolean async = getBoolean(getAttributeValue(element, "enabled"));
+ asc.setEnabled(async);
+
+ if (async) {
+ String tmp = getAttributeValue(element, "batchSize");
+ if (existsAttribute(tmp)) asc.setBatchSize(getInt(tmp));
+
+ tmp = getAttributeValue(element, "pollWait");
+ if (existsAttribute(tmp)) asc.setPollWait(getLong(tmp));
+
+ tmp = getAttributeValue(element, "queueSize");
+ if (existsAttribute(tmp)) asc.setQueueSize(getInt(tmp));
+
+ tmp = getAttributeValue(element, "threadPoolSize");
+ if (existsAttribute(tmp)) asc.setThreadPoolSize(getInt(tmp));
+ }
+ }
+ return asc;
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/container/CachedValue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/CachedValue.java 2009-02-11
12:53:09 UTC (rev 7682)
+++ core/branches/flat/src/main/java/org/horizon/container/CachedValue.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -1,13 +1,13 @@
package org.horizon.container;
-public class CachedValue<V> {
- protected V value;
+public class CachedValue {
+ protected Object value;
protected long modifiedTime;
protected CachedValue() {
}
- public CachedValue(V value) {
+ public CachedValue(Object value) {
this.value = value;
touch();
}
@@ -20,11 +20,11 @@
return modifiedTime;
}
- public final V getValue() {
+ public final Object getValue() {
return value;
}
- public final void setValue(V value) {
+ public final void setValue(Object value) {
this.value = value;
}
}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/DataContainer.java 2009-02-11
12:53:09 UTC (rev 7682)
+++ core/branches/flat/src/main/java/org/horizon/container/DataContainer.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -34,29 +34,29 @@
* @since 1.0
*/
@Scope(Scopes.NAMED_CACHE)
-public interface DataContainer<K, V> {
- V get(K k);
+public interface DataContainer {
+ Object get(Object k);
- void put(K k, V v, long lifespan);
+ void put(Object k, Object v, long lifespan);
- boolean containsKey(K k);
+ boolean containsKey(Object k);
- V remove(K k);
+ Object remove(Object k);
int size();
void clear();
- Set<K> keySet();
+ Set<Object> keySet();
- long getModifiedTimestamp(K key);
+ long getModifiedTimestamp(Object key);
/**
* Purges entries that have passed their expiry time, returning a set of keys that
have been purged.
*
* @return set of keys that have been purged.
*/
- Set<K> purgeExpiredEntries();
+ Set<Object> purgeExpiredEntries();
- StoredEntry<K, V> createEntryForStorage(K key);
+ StoredEntry createEntryForStorage(Object key);
}
Modified:
core/branches/flat/src/main/java/org/horizon/container/ExpirableCachedValue.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/container/ExpirableCachedValue.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/container/ExpirableCachedValue.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -1,19 +1,19 @@
package org.horizon.container;
-public class ExpirableCachedValue<V> extends CachedValue<V> {
+public class ExpirableCachedValue extends CachedValue {
protected long createdTime;
protected long expiryTime;
protected ExpirableCachedValue() {
}
- public ExpirableCachedValue(V value, long createdTime, long expiryTime) {
+ public ExpirableCachedValue(Object value, long createdTime, long expiryTime) {
super(value);
this.createdTime = createdTime;
this.expiryTime = expiryTime;
}
- public ExpirableCachedValue(V value, long lifespan) {
+ public ExpirableCachedValue(Object value, long lifespan) {
super(value);
createdTime = getModifiedTime();
setLifespan(lifespan);
Modified:
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -42,11 +42,11 @@
* @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
* @since 1.0
*/
-public class UnsortedDataContainer<K, V> implements DataContainer<K, V> {
+public class UnsortedDataContainer implements DataContainer {
// data with expiry and without expiry are stored in different maps, for efficiency.
E.g., so that when purging expired
// stuff, we don't need to iterate through immortal data.
- final ConcurrentMap<K, CachedValue<V>> immortalData = new
ConcurrentHashMap<K, CachedValue<V>>();
- final ConcurrentMap<K, ExpirableCachedValue<V>> expirableData = new
ConcurrentHashMap<K, ExpirableCachedValue<V>>();
+ final ConcurrentMap<Object, CachedValue> immortalData = new
ConcurrentHashMap<Object, CachedValue>();
+ final ConcurrentMap<Object, ExpirableCachedValue> expirableData = new
ConcurrentHashMap<Object, ExpirableCachedValue>();
private static final Object NULL = new Object();
private CacheLoaderManager clm;
private CacheStore cacheStore;
@@ -56,36 +56,34 @@
this.clm = clm;
}
- private void expire(K key) {
+ private void expire(Object key) {
expirableData.remove(key);
expireOnCacheLoader(key);
}
- private void expireOnCacheLoader(K key) {
+ private void expireOnCacheLoader(Object key) {
if (cacheStore == null && clm != null) cacheStore = clm.getCacheStore();
if (cacheStore != null) {
cacheStore.remove(key);
}
}
- @SuppressWarnings("unchecked")
- private K maskNullKey(K o) {
- return o == null ? (K) NULL : o;
+ private Object maskNullKey(Object o) {
+ return o == null ? NULL : o;
}
- private K unmaskNullKey(K o) {
+ private Object unmaskNullKey(Object o) {
return (o == NULL) ? null : o;
}
- @SuppressWarnings("unchecked")
- public V get(K k) {
- K maskedKey = maskNullKey(k);
- CachedValue<V> cv = immortalData.get(maskedKey);
+ public Object get(Object k) {
+ Object maskedKey = maskNullKey(k);
+ CachedValue cv = immortalData.get(maskedKey);
if (cv != null) {
cv.touch();
return cv.getValue();
} else {
- ExpirableCachedValue<V> ecv = expirableData.get(maskedKey);
+ ExpirableCachedValue ecv = expirableData.get(maskedKey);
if (ecv != null) {
if (ecv.isExpired()) {
expire(maskedKey);
@@ -99,11 +97,11 @@
return null;
}
- public void put(K k, V v, long lifespan) {
- K maskedKey = maskNullKey(k);
+ public void put(Object k, Object v, long lifespan) {
+ Object maskedKey = maskNullKey(k);
- CachedValue<V> cv = immortalData.get(maskedKey);
- ExpirableCachedValue<V> ecv;
+ CachedValue cv = immortalData.get(maskedKey);
+ ExpirableCachedValue ecv;
if (cv != null) {
// do we need to move this to expirable?
if (lifespan < 0) {
@@ -111,7 +109,7 @@
cv.setValue(v);
cv.touch();
} else {
- ecv = new ExpirableCachedValue<V>(v, lifespan);
+ ecv = new ExpirableCachedValue(v, lifespan);
immortalData.remove(maskedKey);
expirableData.put(maskedKey, ecv);
}
@@ -119,7 +117,7 @@
// do we need to move this to immortal?
if (lifespan < 0) {
// yes.
- cv = new CachedValue<V>(v);
+ cv = new CachedValue(v);
expirableData.remove(maskedKey);
immortalData.put(maskedKey, cv);
} else {
@@ -129,19 +127,19 @@
} else {
// does not exist anywhere!
if (lifespan < 0) {
- cv = new CachedValue<V>(v);
+ cv = new CachedValue(v);
immortalData.put(maskedKey, cv);
} else {
- ecv = new ExpirableCachedValue<V>(v, lifespan);
+ ecv = new ExpirableCachedValue(v, lifespan);
expirableData.put(maskedKey, ecv);
}
}
}
- public boolean containsKey(K k) {
- K maskedKey = maskNullKey(k);
+ public boolean containsKey(Object k) {
+ Object maskedKey = maskNullKey(k);
if (!immortalData.containsKey(maskedKey)) {
- ExpirableCachedValue<V> ecv = expirableData.get(maskedKey);
+ ExpirableCachedValue ecv = expirableData.get(maskedKey);
if (ecv == null) return false;
if (ecv.isExpired()) {
expire(maskedKey);
@@ -151,17 +149,16 @@
return true;
}
- public long getModifiedTimestamp(K key) {
- K maskedKey = maskNullKey(key);
- CachedValue<V> cv = immortalData.get(maskedKey);
+ public long getModifiedTimestamp(Object key) {
+ Object maskedKey = maskNullKey(key);
+ CachedValue cv = immortalData.get(maskedKey);
if (cv == null) cv = expirableData.get(maskedKey);
return cv == null ? -1 : cv.getModifiedTime();
}
- @SuppressWarnings("unchecked")
- public V remove(K k) {
- K maskedKey = maskNullKey(k);
- CachedValue<V> cv = immortalData.remove(maskedKey);
+ public Object remove(Object k) {
+ Object maskedKey = maskNullKey(k);
+ CachedValue cv = immortalData.remove(maskedKey);
if (cv == null) cv = expirableData.remove(maskedKey);
if (cv == null) {
@@ -180,7 +177,7 @@
expirableData.clear();
}
- public Set<K> keySet() {
+ public Set<Object> keySet() {
return new KeySet();
}
@@ -188,11 +185,11 @@
return "Immortal Data: " + immortalData.toString() + "\n" +
"Expirable Data: " + expirableData.toString();
}
- public Set<K> purgeExpiredEntries() {
- Set<K> purged = new HashSet<K>();
- for (Iterator<Map.Entry<K, ExpirableCachedValue<V>>> iter =
expirableData.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<K, ExpirableCachedValue<V>> entry = iter.next();
- ExpirableCachedValue<V> cv = entry.getValue();
+ public Set<Object> purgeExpiredEntries() {
+ Set<Object> purged = new HashSet<Object>();
+ for (Iterator<Map.Entry<Object, ExpirableCachedValue>> iter =
expirableData.entrySet().iterator(); iter.hasNext();) {
+ Map.Entry<Object, ExpirableCachedValue> entry = iter.next();
+ ExpirableCachedValue cv = entry.getValue();
if (cv.isExpired()) {
expireOnCacheLoader(entry.getKey());
purged.add(entry.getKey());
@@ -202,25 +199,25 @@
return purged;
}
- public StoredEntry<K, V> createEntryForStorage(K key) {
- CachedValue<V> immortal = immortalData.get(key);
+ public StoredEntry createEntryForStorage(Object key) {
+ CachedValue immortal = immortalData.get(key);
if (immortal != null)
- return new StoredEntry<K, V>(key, immortal.getValue(), -1, -1);
- ExpirableCachedValue<V> ecv = expirableData.get(key);
+ return new StoredEntry(key, immortal.getValue());
+ ExpirableCachedValue ecv = expirableData.get(key);
if (ecv == null) return null;
- return new StoredEntry<K, V>(key, ecv.getValue(), ecv.getCreatedTime(),
ecv.getExpiryTime());
+ return new StoredEntry(key, ecv.getValue(), ecv.getCreatedTime(),
ecv.getExpiryTime());
}
- private class KeySet extends AbstractSet<K> {
- Set<K> immortalKeys;
- Set<K> expirableKeys;
+ private class KeySet extends AbstractSet<Object> {
+ Set<Object> immortalKeys;
+ Set<Object> expirableKeys;
public KeySet() {
immortalKeys = immortalData.keySet();
expirableKeys = expirableData.keySet();
}
- public Iterator<K> iterator() {
+ public Iterator<Object> iterator() {
return new KeyIterator(immortalKeys.iterator(), expirableKeys.iterator());
}
@@ -229,7 +226,7 @@
}
public boolean contains(Object o) {
- K maskedKey = maskNullKey((K) o);
+ Object maskedKey = maskNullKey((Object) o);
return immortalKeys.contains(maskedKey) || expirableKeys.contains(maskedKey);
}
@@ -242,14 +239,14 @@
}
}
- private class KeyIterator implements Iterator<K> {
- Iterator<Iterator<K>> metaIterator;
- Iterator<K> immortalIterator;
- Iterator<K> expirableIterator;
- Iterator<K> currentIterator;
+ private class KeyIterator implements Iterator<Object> {
+ Iterator<Iterator<Object>> metaIterator;
+ Iterator<Object> immortalIterator;
+ Iterator<Object> expirableIterator;
+ Iterator<Object> currentIterator;
- private KeyIterator(Iterator<K> immortalIterator, Iterator<K>
expirableIterator) {
- List<Iterator<K>> iterators = new
ArrayList<Iterator<K>>(2);
+ private KeyIterator(Iterator<Object> immortalIterator, Iterator<Object>
expirableIterator) {
+ List<Iterator<Object>> iterators = new
ArrayList<Iterator<Object>>(2);
iterators.add(immortalIterator);
iterators.add(expirableIterator);
metaIterator = iterators.iterator();
@@ -267,7 +264,7 @@
}
@SuppressWarnings("unchecked")
- public K next() {
+ public Object next() {
return unmaskNullKey(currentIterator.next());
}
Modified: core/branches/flat/src/main/java/org/horizon/eviction/EvictionAlgorithm.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/eviction/EvictionAlgorithm.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/eviction/EvictionAlgorithm.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -66,7 +66,7 @@
* @param dataContiner the cache's data container
* @param evictionAlgorithmConfig algorithm configuration to use
*/
- void init(Cache<?, ?> cache, DataContainer<?, ?> dataContiner,
+ void init(Cache<?, ?> cache, DataContainer dataContiner,
EvictionAlgorithmConfig evictionAlgorithmConfig);
/**
Modified: core/branches/flat/src/main/java/org/horizon/eviction/EvictionManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/eviction/EvictionManagerImpl.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/eviction/EvictionManagerImpl.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -46,7 +46,7 @@
ScheduledExecutorService executor;
Configuration configuration;
Cache cache;
- private DataContainer<?, ?> dataContainer;
+ private DataContainer dataContainer;
@Inject
public void initialize(@ComponentName(KnownComponentNames.EVICTION_SCHEDULED_EXECUTOR)
ScheduledExecutorService executor,
Modified:
core/branches/flat/src/main/java/org/horizon/eviction/algorithms/BaseEvictionAlgorithm.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/eviction/algorithms/BaseEvictionAlgorithm.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/eviction/algorithms/BaseEvictionAlgorithm.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -116,7 +116,7 @@
this.action = evictionAction;
}
- public void init(Cache<?, ?> cache, DataContainer<?, ?> dataContainer,
EvictionAlgorithmConfig evictionAlgorithmConfig) {
+ public void init(Cache<?, ?> cache, DataContainer dataContainer,
EvictionAlgorithmConfig evictionAlgorithmConfig) {
this.cache = cache;
this.dataContainer = dataContainer;
this.config = (BaseEvictionAlgorithmConfig) evictionAlgorithmConfig;
Modified:
core/branches/flat/src/main/java/org/horizon/eviction/algorithms/nullalgo/NullEvictionAlgorithm.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/eviction/algorithms/nullalgo/NullEvictionAlgorithm.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/eviction/algorithms/nullalgo/NullEvictionAlgorithm.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -66,7 +66,7 @@
// no-op
}
- public void init(Cache<?, ?> cache, DataContainer<?, ?> dataContainer,
EvictionAlgorithmConfig evictionAlgorithmConfig) {
+ public void init(Cache<?, ?> cache, DataContainer dataContainer,
EvictionAlgorithmConfig evictionAlgorithmConfig) {
// no-op
}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -56,7 +56,7 @@
protected TransactionTable txTable = null;
protected CacheLoader loader;
- protected DataContainer<Object, Object> dataContainer;
+ protected DataContainer dataContainer;
protected CacheNotifier notifier;
protected EntryFactory entryFactory;
@@ -71,7 +71,7 @@
@Inject
protected void injectDependencies(TransactionTable txTable, CacheLoaderManager clm,
- DataContainer<Object, Object> dataContainer,
EntryFactory entryFactory, CacheNotifier notifier) {
+ DataContainer dataContainer, EntryFactory
entryFactory, CacheNotifier notifier) {
this.txTable = txTable;
this.clm = clm;
this.dataContainer = dataContainer;
Modified: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -6,5 +6,5 @@
* @author Manik Surtani
* @since 1.0
*/
-public abstract class AbstractCacheLoader<K, V> implements CacheLoader<K, V>
{
+public abstract class AbstractCacheLoader implements CacheLoader {
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -1,27 +1,17 @@
package org.horizon.loader;
import org.horizon.config.PluggableConfigurationComponent;
+import org.horizon.loader.decorators.AsyncStoreConfig;
import org.horizon.loader.decorators.SingletonStoreConfig;
import org.horizon.util.Util;
public class AbstractCacheLoaderConfig extends PluggableConfigurationComponent implements
CacheLoaderConfig {
- private boolean async;
private boolean ignoreModifications;
private boolean fetchPersistentState;
private boolean purgeOnStartup;
private SingletonStoreConfig singletonStoreConfig;
+ private AsyncStoreConfig asyncStoreConfig;
-// protected void
populateFromBaseConfig(CacheLoaderManagerConfig.IndividualCacheLoaderConfig base) {
-// if (base != null) {
-// setAsync(base.isAsync());
-// setIgnoreModifications(base.isIgnoreModifications());
-// setFetchPersistentState(base.isFetchPersistentState());
-// setSingletonStoreConfig(base.getSingletonStoreConfig());
-// setPurgeOnStartup(base.isPurgeOnStartup());
-// setProperties(base.getProperties());
-// }
-// }
-
public boolean isPurgeOnStartup() {
return purgeOnStartup;
}
@@ -35,15 +25,6 @@
this.fetchPersistentState = fetchPersistentState;
}
- public void setAsync(boolean async) {
- testImmutability("async");
- this.async = async;
- }
-
- public boolean isAsync() {
- return async;
- }
-
public void setIgnoreModifications(boolean ignoreModifications) {
testImmutability("ignoreModifications");
this.ignoreModifications = ignoreModifications;
@@ -67,6 +48,15 @@
this.singletonStoreConfig = singletonStoreConfig;
}
+ public AsyncStoreConfig getAsyncStoreConfig() {
+ return asyncStoreConfig;
+ }
+
+ public void setAsyncStoreConfig(AsyncStoreConfig asyncStoreConfig) {
+ testImmutability("asyncStoreConfig");
+ this.asyncStoreConfig = asyncStoreConfig;
+ }
+
@Override
public boolean equals(Object obj) {
if (super.equals(obj)) {
@@ -81,10 +71,10 @@
AbstractCacheLoaderConfig other = (AbstractCacheLoaderConfig) obj;
return Util.safeEquals(this.className, other.className)
- && (this.async == other.async)
&& (this.ignoreModifications == other.ignoreModifications)
&& (this.fetchPersistentState == other.fetchPersistentState)
- && Util.safeEquals(this.singletonStoreConfig,
other.singletonStoreConfig);
+ && Util.safeEquals(this.singletonStoreConfig,
other.singletonStoreConfig)
+ && Util.safeEquals(this.asyncStoreConfig, other.asyncStoreConfig);
}
@Override
@@ -95,10 +85,10 @@
protected int hashCodeExcludingProperties() {
int result = 17;
result = 31 * result + (className == null ? 0 : className.hashCode());
- result = 31 * result + (async ? 0 : 1);
result = 31 * result + (ignoreModifications ? 0 : 1);
result = 31 * result + (fetchPersistentState ? 0 : 1);
result = 31 * result + (singletonStoreConfig == null ? 0 :
singletonStoreConfig.hashCode());
+ result = 31 * result + (asyncStoreConfig == null ? 0 :
asyncStoreConfig.hashCode());
result = 31 * result + (purgeOnStartup ? 0 : 1);
return result;
}
@@ -106,7 +96,6 @@
@Override
public String toString() {
return new
StringBuilder().append("IndividualCacheLoaderConfig{").append("className='").append(className).append('\'')
- .append(", async=").append(async)
.append(", ignoreModifications=").append(ignoreModifications)
.append(", fetchPersistentState=").append(fetchPersistentState)
.append(", properties=").append(properties)
Modified: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java 2009-02-11
12:53:09 UTC (rev 7682)
+++ core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -4,7 +4,10 @@
import org.horizon.loader.modifications.Remove;
import org.horizon.loader.modifications.Store;
+import javax.transaction.Transaction;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* // TODO: Manik: Document this!
@@ -12,21 +15,22 @@
* @author Manik Surtani
* @since 1.0
*/
-public abstract class AbstractCacheStore<K, V> extends AbstractCacheLoader<K,
V> implements CacheStore<K, V> {
+public abstract class AbstractCacheStore extends AbstractCacheLoader implements
CacheStore {
- @SuppressWarnings("unchecked")
+ private final Map<Transaction, List<Modification>> transactions = new
ConcurrentHashMap<Transaction, List<Modification>>();
+
protected void applyModifications(List<Modification> mods) {
for (Modification m : mods) {
switch (m.getType()) {
case STORE:
- Store<K, V> s = (Store<K, V>) m;
+ Store s = (Store) m;
store(s.getStoredEntry());
break;
case CLEAR:
clear();
break;
case REMOVE:
- Remove<K> r = (Remove<K>) m;
+ Remove r = (Remove) m;
remove(r.getKey());
break;
default:
@@ -35,4 +39,20 @@
}
}
+ public void prepare(List<Modification> mods, Transaction tx, boolean isOnePhase)
{
+ if (isOnePhase) {
+ applyModifications(mods);
+ } else {
+ transactions.put(tx, mods);
+ }
+ }
+
+ public void rollback(Transaction tx) {
+ transactions.remove(tx);
+ }
+
+ public void commit(Transaction tx) {
+ List<Modification> list = transactions.remove(tx);
+ if (list != null && !list.isEmpty()) applyModifications(list);
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java 2009-02-11
12:53:09 UTC (rev 7682)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -12,7 +12,7 @@
* @author Manik Surtani
* @since 1.0
*/
-public interface CacheLoader<K, V> extends Lifecycle {
+public interface CacheLoader extends Lifecycle {
/**
* Used to initialize a cache loader. Typically invoked by the {@link
org.horizon.loader.CacheLoaderManager} when
@@ -32,20 +32,20 @@
* @param key key
* @return an entry
*/
- StoredEntry<K, V> load(K key);
+ StoredEntry load(Object key);
/**
* Loads all entries in the loader. Expired entries are not returned.
*
* @return a set of entries, or an empty set if the loader is emptied.
*/
- Set<StoredEntry<K, V>> loadAll();
+ Set<StoredEntry> loadAll();
/**
* @param key key to test
* @return true if the key exists, false otherwise
*/
- boolean containsKey(K key);
+ boolean containsKey(Object key);
/**
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderConfig.java 2009-02-11
12:53:09 UTC (rev 7682)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderConfig.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -1,5 +1,6 @@
package org.horizon.loader;
+import org.horizon.loader.decorators.AsyncStoreConfig;
import org.horizon.loader.decorators.SingletonStoreConfig;
/**
@@ -15,10 +16,6 @@
void setFetchPersistentState(boolean fetchPersistentState);
- void setAsync(boolean async);
-
- boolean isAsync();
-
void setIgnoreModifications(boolean ignoreModifications);
boolean isIgnoreModifications();
@@ -29,6 +26,10 @@
void setSingletonStoreConfig(SingletonStoreConfig singletonStoreConfig);
+ AsyncStoreConfig getAsyncStoreConfig();
+
+ void setAsyncStoreConfig(AsyncStoreConfig asyncStoreConfig);
+
CacheLoaderConfig clone();
String getClassName();
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -167,8 +167,8 @@
if (tmpLoader instanceof CacheStore) {
CacheStore tmpStore = (CacheStore) tmpLoader;
// async?
- if (cfg.isAsync()) {
- tmpStore = new AsyncStore(tmpStore);
+ if (cfg.getAsyncStoreConfig().isEnabled()) {
+ tmpStore = new AsyncStore(tmpStore, cfg.getAsyncStoreConfig());
tmpLoader = tmpStore;
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-11
12:53:09 UTC (rev 7682)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -14,14 +14,14 @@
* @author Manik Surtani
* @since 1.0
*/
-public interface CacheStore<K, V> extends CacheLoader<K, V> {
+public interface CacheStore extends CacheLoader {
/**
* Stores an entry
*
* @param ed entry to store
*/
- void store(StoredEntry<K, V> ed);
+ void store(StoredEntry ed);
/**
* Writes contents of the stream to the store. Implementations should expect that the
stream contains data in an
@@ -30,6 +30,8 @@
* dealing with the stream to make use of efficient marshalling.
*
* @param inputStream stream to read from
+ * @throws java.io.IOException in case of IO problems
+ * @throws ClassNotFoundException in case of not being able to read the stream
*/
void store(InputStream inputStream) throws IOException, ClassNotFoundException;
@@ -56,7 +58,7 @@
* @param key key to remove
* @return true if the entry was removed; false if the entry wasn't found.
*/
- boolean remove(K key);
+ boolean remove(Object key);
/**
* Purges expired entries from the store.
Modified: core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java 2009-02-11
12:53:09 UTC (rev 7682)
+++ core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -12,18 +12,23 @@
*
* @author Manik Surtani
*/
-public class StoredEntry<K, V> extends ExpirableCachedValue<V> implements
Externalizable {
- private K key;
+public class StoredEntry extends ExpirableCachedValue implements Externalizable {
+ private Object key;
public StoredEntry() {
}
- public StoredEntry(K key, V value, long created, long expiry) {
+ public StoredEntry(Object key, Object value) {
+ super(value, -1, -1);
+ this.key = key;
+ }
+
+ public StoredEntry(Object key, Object value, long created, long expiry) {
super(value, created, expiry);
this.key = key;
}
- public final K getKey() {
+ public final Object getKey() {
return key;
}
@@ -65,8 +70,8 @@
@SuppressWarnings("unchecked")
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- key = (K) in.readObject();
- value = (V) in.readObject();
+ key = in.readObject();
+ value = in.readObject();
createdTime = in.readLong();
expiryTime = in.readLong();
modifiedTime = System.currentTimeMillis();
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -1,21 +1,103 @@
package org.horizon.loader.decorators;
+import org.horizon.Cache;
+import org.horizon.loader.AbstractCacheStore;
+import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.marshall.Marshaller;
+import javax.transaction.Transaction;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Set;
+
/**
- * // TODO: Manik: Document this!
+ * Simple delegate that delegates all calls. This is intended as a building block for
other decorators that wish to add
+ * behavior to certain calls only.
*
* @author Manik Surtani
+ * @since 1.0
*/
-public abstract class AbstractDelegatingStore implements CacheStore {
+public class AbstractDelegatingStore extends AbstractCacheStore {
CacheStore delegate;
- public void setDelegate(CacheStore cacheStore) {
- this.delegate = cacheStore;
+ public AbstractDelegatingStore(CacheStore delegate) {
+ this.delegate = delegate;
}
+ public void setDelegate(CacheStore delegate) {
+ this.delegate = delegate;
+ }
+
public CacheStore getDelegate() {
return delegate;
}
+
+ public void store(StoredEntry ed) {
+ delegate.store(ed);
+ }
+
+ public void store(InputStream inputStream) throws IOException, ClassNotFoundException
{
+ delegate.store(inputStream);
+ }
+
+ public void load(OutputStream outputStream) throws IOException {
+ delegate.load(outputStream);
+ }
+
+ public void clear() {
+ delegate.clear();
+ }
+
+ public boolean remove(Object key) {
+ return delegate.remove(key);
+ }
+
+ public void purgeExpired() {
+ delegate.purgeExpired();
+ }
+
+ public void commit(Transaction tx) {
+ delegate.commit(tx);
+ }
+
+ public void rollback(Transaction tx) {
+ delegate.rollback(tx);
+ }
+
+ public void prepare(List list, Transaction tx, boolean isOnePhase) {
+ delegate.prepare(list, tx, isOnePhase);
+ }
+
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ delegate.init(config, cache, m);
+ }
+
+ public StoredEntry load(Object key) {
+ return delegate.load(key);
+ }
+
+ public Set loadAll() {
+ return delegate.loadAll();
+ }
+
+ public boolean containsKey(Object key) {
+ return delegate.containsKey(key);
+ }
+
+ public Class getConfigurationClass() {
+ return delegate.getConfigurationClass();
+ }
+
+ public void start() {
+ delegate.start();
+ }
+
+ public void stop() {
+ delegate.stop();
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStore.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStore.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -1,107 +1,208 @@
package org.horizon.loader.decorators;
-import org.horizon.Cache;
-import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.CacheException;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
-import org.horizon.marshall.Marshaller;
+import org.horizon.loader.modifications.Clear;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.loader.modifications.PurgeExpired;
+import org.horizon.loader.modifications.Remove;
+import org.horizon.loader.modifications.Store;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
- * An asynchronous wrapper around a cache store
+ * The AsyncStore is a delegating CacheStore that extends AbstractDelegatingStore,
overriding methods that should not
+ * just delegate the operation to the underlying store.
+ * <p/>
+ * Read operations are done synchronously, while write operations are done
asynchronously. There is no provision for
+ * exception handling for problems encountered with the underlying store during a write
operation, and the exception is
+ * just logged.
+ * <p/>
+ * When configuring the loader, use the following element:
+ * <p/>
+ * <code> <async enabled="true" /> </code>
+ * <p/>
+ * to define whether cache loader operations are to be asynchronous. If not specified, a
cache loader operation is
+ * assumed synchronous and this decorator is not applied.
+ * <p/>
*
* @author Manik Surtani
* @since 1.0
*/
-public class AsyncStore implements CacheStore {
+public class AsyncStore extends AbstractDelegatingStore {
- CacheStore delegate;
+ private static final Log log = LogFactory.getLog(AsyncStore.class);
+ private static final boolean trace = log.isTraceEnabled();
- public AsyncStore(CacheStore delegate) {
- this.delegate = delegate;
- }
+ private static AtomicInteger threadId = new AtomicInteger(0);
- public void store(StoredEntry ed) {
- // TODO: Manik: Customise this generated block
- }
+ private ExecutorService executor;
+ private AtomicBoolean stopped = new AtomicBoolean(true);
+ private BlockingQueue<Modification> queue;
+ private List<Future> processorFutures;
+ private AsyncStoreConfig asyncStoreConfig;
- public void storeAll(Collection ed) {
- // TODO: Manik: Customise this generated block
+ public AsyncStore(CacheStore cacheStore, AsyncStoreConfig asyncStoreConfig) {
+ super(cacheStore);
+ this.asyncStoreConfig = asyncStoreConfig;
}
- public void store(InputStream inputStream) {
- // TODO: Manik: Customise this generated block
+ public void store(StoredEntry ed) {
+ enqueue(new Store(ed));
}
- public void load(OutputStream outputStream) throws IOException {
- // TODO: Manik: Customise this generated block
- }
-
public void clear() {
- // TODO: Manik: Customise this generated block
+ enqueue(new Clear());
}
public boolean remove(Object key) {
- return false; // TODO: Manik: Customise this generated block
+ enqueue(new Remove(key));
+ return true;
}
public void purgeExpired() {
- // TODO: Manik: Customise this generated block
+ enqueue(new PurgeExpired());
}
- public void commit(Transaction tx) {
- // TODO: Manik: Customise this generated block
+ private void enqueue(final Modification mod) {
+ try {
+ if (stopped.get()) {
+ throw new CacheException("AsyncStore stopped; no longer accepting more
entries.");
+ }
+ log.trace("Enqueuing modification {0}", mod);
+ queue.put(mod);
+ } catch (Exception e) {
+ throw new CacheException("Unable to enqueue asynchronous task", e);
+ }
}
- public void rollback(Transaction tx) {
- // TODO: Manik: Customise this generated block
+ @Override
+ public void start() {
+ queue = new
LinkedBlockingQueue<Modification>(asyncStoreConfig.getQueueSize());
+ log.info("Async cache loader starting {0}", this);
+ stopped.set(false);
+ super.start();
+ int poolSize = asyncStoreConfig.getThreadPoolSize();
+ executor = Executors.newFixedThreadPool(poolSize, new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "AsyncStore-" +
threadId.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ processorFutures = new ArrayList<Future>(poolSize);
+ for (int i = 0; i < poolSize; i++) processorFutures.add(executor.submit(new
AsyncProcessor()));
}
- public void prepare(List list, Transaction tx, boolean isOnePhase) {
- // TODO: Manik: Customise this generated block
+ @Override
+ public void stop() {
+ stopped.set(true);
+ if (executor != null) {
+ for (Future f : processorFutures) f.cancel(true);
+ executor.shutdown();
+ try {
+ boolean terminated = executor.isTerminated();
+ while (!terminated) {
+ terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ executor = null;
+ super.stop();
}
- public void init(CacheLoaderConfig config, Cache cache, TransactionManager tm,
Marshaller m) {
- // TODO: Manik: Customise this generated block
+ protected void applyModificationsSync(List<Modification> mods) {
+ for (Modification m : mods) {
+ switch (m.getType()) {
+ case STORE:
+ Store s = (Store) m;
+ super.store(s.getStoredEntry());
+ break;
+ case CLEAR:
+ super.clear();
+ break;
+ case REMOVE:
+ Remove r = (Remove) m;
+ super.remove(r.getKey());
+ break;
+ case PURGE_EXPIRED:
+ super.purgeExpired();
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown modification type "
+ m.getType());
+ }
+ }
}
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
- // TODO: Manik: Customise this generated block
- }
- public StoredEntry load(Object key) {
- return null; // TODO: Manik: Customise this generated block
- }
+ /**
+ * Processes (by batch if possible) a queue of {@link Modification}s.
+ *
+ * @author manik surtani
+ */
+ private class AsyncProcessor implements Runnable {
+ // Modifications to invoke as a single put
+ private final List<Modification> mods = new
ArrayList<Modification>(asyncStoreConfig.getBatchSize());
- public Set loadAll(Collection keys) {
- return null; // TODO: Manik: Customise this generated block
- }
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ run0();
+ }
+ catch (InterruptedException e) {
+ break;
+ }
+ }
- public Set loadAll() {
- return null; // TODO: Manik: Customise this generated block
- }
+ try {
+ if (trace) log.trace("Process remaining batch {0}", mods.size());
+ put(mods);
+ if (trace) log.trace("Process remaining queued {0}",
queue.size());
+ while (!queue.isEmpty()) run0();
+ }
+ catch (InterruptedException e) {
+ log.trace("remaining interrupted");
+ }
+ }
- public boolean containsKey(Object key) {
- return false; // TODO: Manik: Customise this generated block
- }
+ private void run0() throws InterruptedException {
+ log.trace("Checking for modifications");
+ int i = queue.drainTo(mods, asyncStoreConfig.getBatchSize());
+ if (i == 0) {
+ Modification m = queue.take();
+ mods.add(m);
+ }
- public Class getConfigurationClass() {
- return null; // TODO: Manik: Customise this generated block
- }
+ if (trace) log.trace("Calling put(List) with {0} modifications",
mods.size());
+ put(mods);
+ mods.clear();
+ }
- public void start() {
- // TODO: Manik: Customise this generated block
+ private void put(List<Modification> mods) {
+ try {
+ AsyncStore.this.applyModificationsSync(mods);
+ }
+ catch (Exception e) {
+ if (log.isWarnEnabled()) log.warn("Failed to process async
modifications: " + e);
+ if (log.isDebugEnabled()) log.debug("Exception: ", e);
+ }
+ }
}
- public void stop() {
- // TODO: Manik: Customise this generated block
- }
}
Added:
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -0,0 +1,63 @@
+package org.horizon.loader.decorators;
+
+import org.horizon.config.AbstractNamedCacheConfigurationBean;
+
+/**
+ * Configuration for the async cache loader
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class AsyncStoreConfig extends AbstractNamedCacheConfigurationBean {
+ boolean enabled;
+ int batchSize = 100;
+ long pollWait = 100;
+ int queueSize = 10000;
+ int threadPoolSize = 1;
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ testImmutability("enabled");
+ this.enabled = enabled;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ testImmutability("batchSize");
+ this.batchSize = batchSize;
+ }
+
+ public long getPollWait() {
+ return pollWait;
+ }
+
+ public void setPollWait(long pollWait) {
+ testImmutability("pollWait");
+ this.pollWait = pollWait;
+ }
+
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ public void setQueueSize(int queueSize) {
+ testImmutability("queueSize");
+ this.queueSize = queueSize;
+ }
+
+ public int getThreadPoolSize() {
+ return threadPoolSize;
+ }
+
+ public void setThreadPoolSize(int threadPoolSize) {
+ testImmutability("threadPoolSize");
+ this.threadPoolSize = threadPoolSize;
+ }
+
+}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -1,106 +1,62 @@
package org.horizon.loader.decorators;
-import org.horizon.Cache;
-import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
-import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
import java.util.List;
-import java.util.Set;
/**
- * // TODO: Manik: Document this!
+ * A decorator that makes the underlying store a {@link org.horizon.loader.CacheLoader},
i.e., suppressing all write
+ * methods.
*
* @author Manik Surtani
+ * @since 1.0
*/
-public class ReadOnlyStore implements CacheStore {
+public class ReadOnlyStore extends AbstractDelegatingStore {
- CacheStore delegate;
-
public ReadOnlyStore(CacheStore delegate) {
- this.delegate = delegate;
+ super(delegate);
}
+ @Override
public void store(StoredEntry ed) {
- // TODO: Manik: Customise this generated block
+ // no-op
}
- public void storeAll(Collection ed) {
- // TODO: Manik: Customise this generated block
- }
-
+ @Override
public void store(InputStream inputStream) {
- // TODO: Manik: Customise this generated block
+ // no-op
}
- public void load(OutputStream outputStream) throws IOException {
- // TODO: Manik: Customise this generated block
- }
-
+ @Override
public void clear() {
- // TODO: Manik: Customise this generated block
+ // no-op
}
+ @Override
public boolean remove(Object key) {
- return false; // TODO: Manik: Customise this generated block
+ return false; // no-op
}
+ @Override
public void purgeExpired() {
- // TODO: Manik: Customise this generated block
+ // no-op
}
+ @Override
public void commit(Transaction tx) {
- // TODO: Manik: Customise this generated block
+ // no-op
}
+ @Override
public void rollback(Transaction tx) {
- // TODO: Manik: Customise this generated block
+ // no-op
}
+ @Override
public void prepare(List list, Transaction tx, boolean isOnePhase) {
- // TODO: Manik: Customise this generated block
+ // no-op
}
-
- public void init(CacheLoaderConfig config, Cache cache, TransactionManager tm,
Marshaller m) {
- // TODO: Manik: Customise this generated block
- }
-
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
- // TODO: Manik: Customise this generated block
- }
-
- public StoredEntry load(Object key) {
- return null; // TODO: Manik: Customise this generated block
- }
-
- public Set loadAll(Collection keys) {
- return null; // TODO: Manik: Customise this generated block
- }
-
- public Set loadAll() {
- return null; // TODO: Manik: Customise this generated block
- }
-
- public boolean containsKey(Object key) {
- return false; // TODO: Manik: Customise this generated block
- }
-
- public Class getConfigurationClass() {
- return null; // TODO: Manik: Customise this generated block
- }
-
- public void start() {
- // TODO: Manik: Customise this generated block
- }
-
- public void stop() {
- // TODO: Manik: Customise this generated block
- }
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -20,8 +20,8 @@
* @author Manik Surtani
*/
public class SingletonStore extends AbstractDelegatingStore {
- public SingletonStore(CacheStore tmpStore) {
- setDelegate(tmpStore);
+ public SingletonStore(CacheStore delegate) {
+ super(delegate);
}
public void store(StoredEntry ed) {
Modified: core/branches/flat/src/main/java/org/horizon/loader/modifications/Clear.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/modifications/Clear.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/modifications/Clear.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -7,10 +7,8 @@
* @since 1.0
*/
public class Clear implements Modification {
- final Type type = Type.CLEAR;
-
public Type getType() {
- return type;
+ return Type.CLEAR;
}
@Override
Modified:
core/branches/flat/src/main/java/org/horizon/loader/modifications/Modification.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/modifications/Modification.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/modifications/Modification.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -8,7 +8,7 @@
*/
public interface Modification {
public static enum Type {
- STORE, REMOVE, CLEAR
+ STORE, REMOVE, CLEAR, PURGE_EXPIRED;
}
Type getType();
Added:
core/branches/flat/src/main/java/org/horizon/loader/modifications/PurgeExpired.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/modifications/PurgeExpired.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/modifications/PurgeExpired.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -0,0 +1,7 @@
+package org.horizon.loader.modifications;
+
+public class PurgeExpired implements Modification {
+ public Type getType() {
+ return Type.PURGE_EXPIRED;
+ }
+}
Modified: core/branches/flat/src/main/java/org/horizon/loader/modifications/Remove.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/modifications/Remove.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/modifications/Remove.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -6,20 +6,19 @@
* @author Manik Surtani
* @since 1.0
*/
-public class Remove<K> implements Modification {
+public class Remove implements Modification {
- final Type type = Type.REMOVE;
- final K key;
+ final Object key;
- public Remove(K key) {
+ public Remove(Object key) {
this.key = key;
}
public Type getType() {
- return type;
+ return Type.REMOVE;
}
- public K getKey() {
+ public Object getKey() {
return key;
}
@@ -31,15 +30,13 @@
Remove remove = (Remove) o;
if (key != null ? !key.equals(remove.key) : remove.key != null) return false;
- if (type != remove.type) return false;
return true;
}
@Override
public int hashCode() {
- int result = type != null ? type.hashCode() : 0;
- result = 31 * result + (key != null ? key.hashCode() : 0);
+ int result = key != null ? key.hashCode() : 0;
return result;
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/modifications/Store.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/modifications/Store.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/main/java/org/horizon/loader/modifications/Store.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -8,20 +8,19 @@
* @author Manik Surtani
* @since 1.0
*/
-public class Store<K, V> implements Modification {
+public class Store implements Modification {
- final Type type = Type.STORE;
- final StoredEntry<K, V> storedEntry;
+ final StoredEntry storedEntry;
- public Store(StoredEntry<K, V> storedEntry) {
+ public Store(StoredEntry storedEntry) {
this.storedEntry = storedEntry;
}
public Type getType() {
- return type;
+ return Type.STORE;
}
- public StoredEntry<K, V> getStoredEntry() {
+ public StoredEntry getStoredEntry() {
return storedEntry;
}
@@ -33,15 +32,13 @@
Store store = (Store) o;
if (storedEntry != null ? !storedEntry.equals(store.storedEntry) :
store.storedEntry != null) return false;
- if (type != store.type) return false;
return true;
}
@Override
public int hashCode() {
- int result = type != null ? type.hashCode() : 0;
- result = 31 * result + (storedEntry != null ? storedEntry.hashCode() : 0);
+ int result = storedEntry != null ? storedEntry.hashCode() : 0;
return result;
}
Modified: core/branches/flat/src/main/resources/config-samples/all.xml
===================================================================
--- core/branches/flat/src/main/resources/config-samples/all.xml 2009-02-11 12:53:09 UTC
(rev 7682)
+++ core/branches/flat/src/main/resources/config-samples/all.xml 2009-02-11 17:13:41 UTC
(rev 7683)
@@ -145,7 +145,7 @@
<!--
We can have multiple cache loaders, which get chained
-->
- <loader class="org.horizon.loader.JDBCCacheLoader"
async="true" fetchPersistentState="true"
+ <loader class="org.horizon.loader.JDBCCacheLoader"
fetchPersistentState="true"
ignoreModifications="true"
purgeOnStartup="true">
<!-- See the documentation for more configuration examples and options.
-->
@@ -157,6 +157,7 @@
</properties>
<singletonStore enabled="true"
pushStateWhenCoordinator="true" pushStateTimeout="20000"/>
+ <async enabled="true" batchSize="1000"
threadPoolSize="5"/>
</loader>
</loaders>
Modified: core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd
===================================================================
--- core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd 2009-02-11
12:53:09 UTC (rev 7682)
+++ core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd 2009-02-11
17:13:41 UTC (rev 7683)
@@ -177,9 +177,17 @@
<xs:attribute name="pushStateTimeout"
type="xs:positiveInteger"/>
</xs:complexType>
</xs:element>
+ <xs:element name="async" minOccurs="0"
maxOccurs="1">
+ <xs:complexType>
+ <xs:attribute name="enabled"
type="tns:booleanType"/>
+ <xs:attribute name="batchSize"
type="xs:positiveInteger"/>
+ <xs:attribute name="pollWait"
type="xs:positiveInteger"/>
+ <xs:attribute name="queueSize"
type="xs:positiveInteger"/>
+ <xs:attribute name="threadPoolSize"
type="xs:positiveInteger"/>
+ </xs:complexType>
+ </xs:element>
</xs:all>
<xs:attribute name="class" type="xs:string"/>
- <xs:attribute name="async"
type="tns:booleanType"/>
<xs:attribute name="fetchPersistentState"
type="tns:booleanType"/>
<xs:attribute name="ignoreModifications"
type="tns:booleanType"/>
<xs:attribute name="purgeOnStartup"
type="tns:booleanType"/>
Modified:
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -155,7 +155,7 @@
public void testCacheLoaders() throws Exception {
XmlConfigurationParserImpl parser = new XmlConfigurationParserImpl();
String xml = "<loaders passivation=\"true\"
shared=\"true\" preload=\"true\">\n" +
- " <loader
class=\"org.horizon.loader.jdbc.JDBCCacheLoader\" async=\"true\"
fetchPersistentState=\"true\"\n" +
+ " <loader
class=\"org.horizon.loader.jdbc.JDBCCacheLoader\"
fetchPersistentState=\"true\"\n" +
" ignoreModifications=\"false\"
purgeOnStartup=\"false\">\n" +
" <properties>\n" +
" dataSource=HorizonDS\n" +
@@ -164,6 +164,7 @@
" dropTable=false\n" +
" </properties>\n" +
" <singletonStore enabled=\"true\"
pushStateWhenCoordinator=\"true\" pushStateTimeout=\"20000\"
/>\n" +
+ " <async enabled=\"true\"
batchSize=\"15\" />\n" +
" </loader>\n" +
" </loaders>";
Element e = XmlConfigHelper.stringToElement(xml);
@@ -180,7 +181,11 @@
CacheLoaderConfig iclc = clc.getFirstCacheLoaderConfig();
assert
iclc.getClassName().equals("org.horizon.loader.jdbc.JDBCCacheLoader");
- assert iclc.isAsync();
+ assert iclc.getAsyncStoreConfig().isEnabled();
+ assert iclc.getAsyncStoreConfig().getBatchSize() == 15;
+ assert iclc.getAsyncStoreConfig().getPollWait() == 100;
+ assert iclc.getAsyncStoreConfig().getQueueSize() == 10000;
+ assert iclc.getAsyncStoreConfig().getThreadPoolSize() == 1;
assert iclc.isFetchPersistentState();
assert !iclc.isIgnoreModifications();
assert !iclc.isPurgeOnStartup();
@@ -218,7 +223,7 @@
CacheLoaderConfig iclc = clc.getFirstCacheLoaderConfig();
assert
iclc.getClassName().equals("org.horizon.loader.jdbc.JDBCCacheLoader");
- assert !iclc.isAsync();
+ assert !iclc.getAsyncStoreConfig().isEnabled();
assert !iclc.isFetchPersistentState();
assert !iclc.isIgnoreModifications();
assert !iclc.isPurgeOnStartup();
Modified:
core/branches/flat/src/test/java/org/horizon/eviction/EvictionFunctionalTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/eviction/EvictionFunctionalTest.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/test/java/org/horizon/eviction/EvictionFunctionalTest.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -99,7 +99,7 @@
delegate.setEvictionAction(evictionAction);
}
- public void init(Cache<?, ?> cache, DataContainer<?, ?> dataContiner,
EvictionAlgorithmConfig evictionAlgorithmConfig) {
+ public void init(Cache<?, ?> cache, DataContainer dataContiner,
EvictionAlgorithmConfig evictionAlgorithmConfig) {
delegate.init(cache, dataContiner, evictionAlgorithmConfig);
}
Added: core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java
(rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java 2009-02-11 17:13:41
UTC (rev 7683)
@@ -0,0 +1,62 @@
+package org.horizon.loader;
+
+import org.horizon.CacheException;
+import org.horizon.loader.decorators.AsyncStore;
+import org.horizon.loader.decorators.AsyncStoreConfig;
+import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutorService;
+
+@Test(groups = "unit", sequential = true)
+public class AsyncTest {
+
+ AsyncStore store;
+ ExecutorService asyncExecutor;
+
+
+ @BeforeTest
+ public void setUp() {
+ store = new AsyncStore(new DummyInMemoryCacheLoader(), new AsyncStoreConfig());
+ DummyInMemoryCacheLoader.Cfg cfg = new DummyInMemoryCacheLoader.Cfg();
+ cfg.setStore(AsyncTest.class.getName());
+ store.init(cfg, null, null);
+ store.start();
+ asyncExecutor = (ExecutorService) TestingUtil.extractField(store,
"executor");
+ }
+
+ @AfterTest
+ public void tearDown() {
+ if (store != null) store.stop();
+ }
+
+ @AfterMethod
+ public void clearStore() {
+ if (store != null) store.clear();
+ }
+
+ public void testRestrictionOnAddingToAsyncQueue() throws Exception {
+ store.remove("blah");
+
+ store.store(new StoredEntry("one", "value"));
+ store.store(new StoredEntry("two", "value"));
+ store.store(new StoredEntry("three", "value"));
+ store.store(new StoredEntry("four", "value"));
+
+ // stop the cache store
+ store.stop();
+ try {
+ store.remove("blah");
+ assert false : "Should have restricted this entry from being made";
+ }
+ catch (CacheException expected) {
+ }
+
+ // clean up
+ store.start();
+ }
+}
Modified:
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java 2009-02-11
12:53:09 UTC (rev 7682)
+++
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java 2009-02-11
17:13:41 UTC (rev 7683)
@@ -5,10 +5,8 @@
import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.StoredEntry;
-import org.horizon.loader.modifications.Modification;
import org.horizon.marshall.Marshaller;
-import javax.transaction.Transaction;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -16,22 +14,19 @@
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-public class DummyInMemoryCacheLoader<K, V> extends AbstractCacheStore<K, V>
{
+public class DummyInMemoryCacheLoader extends AbstractCacheStore {
static final ConcurrentMap<String, Map> stores = new
ConcurrentHashMap<String, Map>();
String storeName = "__DEFAULT_STORES__";
- Map<K, StoredEntry<K, V>> store;
+ Map<Object, StoredEntry> store;
Cfg config;
- ConcurrentMap<Transaction, List<Modification>> txs = new
ConcurrentHashMap<Transaction, List<Modification>>();
-
- public void store(StoredEntry<K, V> ed) {
+ public void store(StoredEntry ed) {
store.put(ed.getKey(), ed);
}
@@ -42,7 +37,7 @@
int numEntries = ois.readInt();
store.clear();
for (int i = 0; i < numEntries; i++) {
- StoredEntry<K, V> e = (StoredEntry<K, V>) ois.readObject();
+ StoredEntry e = (StoredEntry) ois.readObject();
store.put(e.getKey(), e);
}
}
@@ -51,47 +46,31 @@
ObjectOutputStream oos = outputStream instanceof ObjectOutputStream ?
(ObjectOutputStream) outputStream :
new ObjectOutputStream(outputStream);
oos.writeInt(store.size());
- for (StoredEntry<K, V> se : store.values()) oos.writeObject(se);
+ for (StoredEntry se : store.values()) oos.writeObject(se);
}
public void clear() {
store.clear();
}
- public boolean remove(K key) {
+ public boolean remove(Object key) {
return store.remove(key) != null;
}
public void purgeExpired() {
- for (Iterator<StoredEntry<K, V>> i = store.values().iterator();
i.hasNext();) {
- StoredEntry<K, V> se = i.next();
+ for (Iterator<StoredEntry> i = store.values().iterator(); i.hasNext();) {
+ StoredEntry se = i.next();
if (se.isExpired()) i.remove();
}
}
- public void commit(Transaction tx) {
- List<Modification> mods = txs.remove(tx);
- if (mods != null) applyModifications(mods);
- }
-
- public void rollback(Transaction tx) {
- txs.remove(tx);
- }
-
- public void prepare(List<Modification> list, Transaction tx, boolean isOnePhase)
{
- if (isOnePhase)
- applyModifications(list);
- else
- txs.put(tx, list);
- }
-
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
this.config = (Cfg) config;
}
- public StoredEntry<K, V> load(K key) {
+ public StoredEntry load(Object key) {
if (key == null) return null;
- StoredEntry<K, V> se = store.get(key);
+ StoredEntry se = store.get(key);
if (se == null) return null;
if (se.isExpired()) {
store.remove(key);
@@ -101,10 +80,10 @@
return se;
}
- public Set<StoredEntry<K, V>> loadAll() {
- Set<StoredEntry<K, V>> s = new HashSet<StoredEntry<K,
V>>();
- for (Iterator<StoredEntry<K, V>> i = store.values().iterator();
i.hasNext();) {
- StoredEntry<K, V> se = i.next();
+ public Set<StoredEntry> loadAll() {
+ Set<StoredEntry> s = new HashSet<StoredEntry>();
+ for (Iterator<StoredEntry> i = store.values().iterator(); i.hasNext();) {
+ StoredEntry se = i.next();
if (se.isExpired())
i.remove();
else
@@ -113,7 +92,7 @@
return s;
}
- public boolean containsKey(K key) {
+ public boolean containsKey(Object key) {
return load(key) != null;
}