Author: manik.surtani(a)jboss.com
Date: 2009-02-16 20:07:15 -0500 (Mon, 16 Feb 2009)
New Revision: 7697
Added:
core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/
core/branches/flat/src/test/java/org/horizon/loader/decorators/AsyncTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
core/branches/flat/src/test/java/org/horizon/util/internals/ViewChangeListener.java
Removed:
core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/AdvancedCache.java
core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java
core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/horizon/factories/EntryFactory.java
core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStoreConfig.java
core/branches/flat/src/test/java/org/horizon/expiry/ExpiryTest.java
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java
Log:
Loader tests and interceptor code
Modified: core/branches/flat/src/main/java/org/horizon/AdvancedCache.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/AdvancedCache.java 2009-02-16 14:25:32
UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/AdvancedCache.java 2009-02-17 01:07:15
UTC (rev 7697)
@@ -1,6 +1,7 @@
package org.horizon;
import org.horizon.batch.BatchContainer;
+import org.horizon.container.DataContainer;
import org.horizon.eviction.EvictionManager;
import org.horizon.factories.ComponentRegistry;
import org.horizon.interceptors.base.CommandInterceptor;
@@ -82,6 +83,8 @@
InvocationContextContainer getInvocationContextContainer();
+ DataContainer getDataContainer();
+
void putForExternalRead(K key, V value, Options... options);
V put(K key, V value, Options... options);
Modified: core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/CacheDelegate.java 2009-02-16 14:25:32
UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/CacheDelegate.java 2009-02-17 01:07:15
UTC (rev 7697)
@@ -397,6 +397,10 @@
return invocationContextContainer;
}
+ public DataContainer getDataContainer() {
+ return dataContainer;
+ }
+
public CacheManager getCacheManager() {
return cacheManager;
}
@@ -435,19 +439,16 @@
}
public void compact() {
- for (Object key : dataContainer.keySet())
- {
+ for (Object key : dataContainer.keySet()) {
// get the key first, before attempting to serialize stuff since data.get() may
deserialize the key if doing
// a hashcode() or equals().
Object value = dataContainer.get(key);
- if (key instanceof MarshalledValue)
- {
+ if (key instanceof MarshalledValue) {
((MarshalledValue) key).compact(true, true);
}
- if (value instanceof MarshalledValue)
- {
+ if (value instanceof MarshalledValue) {
((MarshalledValue) value).compact(true, true);
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -95,8 +95,10 @@
if (existing instanceof DeltaAware) toMergeWith = (DeltaAware) existing;
e.setValue(dv.merge(toMergeWith));
o = existing;
+ e.setLifespan(lifespanMillis);
} else {
o = e.setValue(value);
+ e.setLifespan(lifespanMillis);
}
notifier.notifyCacheEntryModified(key, e.getValue(), false, ctx);
}
@@ -161,9 +163,10 @@
@Override
public String toString() {
return "PutKeyValueCommand{" +
- "lifespanMillis=" + lifespanMillis +
+ "key=" + key +
+ ", value=" + value +
+ ", lifespanMillis=" + lifespanMillis +
", putIfAbsent=" + putIfAbsent +
- ", value=" + value +
'}';
}
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -68,6 +68,7 @@
MVCCEntry me = ctx.lookupEntry(key);
notifier.notifyCacheEntryModified(key, me.getValue(), true, ctx);
me.setValue(e.getValue());
+ me.setLifespan(lifespanMillis);
notifier.notifyCacheEntryModified(key, me.getValue(), false, ctx);
}
return null;
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -25,6 +25,8 @@
import org.horizon.commands.read.AbstractDataCommand;
import org.horizon.container.MVCCEntry;
import org.horizon.context.InvocationContext;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
import org.horizon.notifications.cachelistener.CacheNotifier;
@@ -33,6 +35,8 @@
* @since 1.0
*/
public class RemoveCommand extends AbstractDataCommand implements DataWriteCommand {
+ private static final Log log = LogFactory.getLog(RemoveCommand.class);
+ private static final boolean trace = log.isTraceEnabled();
public static final byte METHOD_ID = 6;
protected CacheNotifier notifier;
boolean successful = true;
@@ -59,6 +63,7 @@
public Object perform(InvocationContext ctx) throws Throwable {
MVCCEntry e = ctx.lookupEntry(key);
if (e == null || e.isNullEntry()) {
+ log.trace("Nothing to remove since the entry is null or we have a null
entry");
if (value == null) {
return null;
} else {
Modified: core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -67,12 +67,14 @@
if (oldValue == null || oldValue.equals(e.getValue())) {
Object old = e.setValue(newValue);
+ e.setLifespan(lifespanMillis);
return returnValue(old, true);
}
return returnValue(null, false);
} else {
// for remotely originating calls, this doesn't check the status of what
is under the key at the moment
Object old = e.setValue(newValue);
+ e.setLifespan(lifespanMillis);
return returnValue(old, true);
}
}
Modified: core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java 2009-02-16
14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -51,4 +51,8 @@
boolean isValid();
void setValid(boolean valid);
+
+ long getLifespan();
+
+ void setLifespan(long lifespan);
}
Modified: core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -39,7 +39,6 @@
protected byte flags = 0;
private long lifespan;
-
protected ReadCommittedEntry() {
setValid(true);
}
@@ -51,6 +50,14 @@
this.lifespan = lifespan;
}
+ public long getLifespan() {
+ return lifespan;
+ }
+
+ public void setLifespan(long lifespan) {
+ this.lifespan = lifespan;
+ }
+
public Object getKey() {
return key;
}
Modified:
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -22,6 +22,7 @@
package org.horizon.container;
import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.Stop;
import org.horizon.loader.CacheLoaderManager;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
@@ -172,6 +173,7 @@
return immortalData.size() + expirableData.size();
}
+ @Stop(priority = 999)
public void clear() {
immortalData.clear();
expirableData.clear();
@@ -205,6 +207,10 @@
return new StoredEntry(key, immortal.getValue());
ExpirableCachedValue ecv = expirableData.get(key);
if (ecv == null) return null;
+ if (ecv.isExpired()) {
+ expirableData.remove(key);
+ return null;
+ }
return new StoredEntry(key, ecv.getValue(), ecv.getCreatedTime(),
ecv.getExpiryTime());
}
Modified: core/branches/flat/src/main/java/org/horizon/factories/EntryFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EntryFactory.java 2009-02-16
14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/factories/EntryFactory.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -32,7 +32,7 @@
* @since 1.0
*/
public interface EntryFactory {
- void releaseLock(Object key);
+ void releaseLock(InvocationContext ctx, Object key);
/**
* Attempts to lock a node if the lock isn't already held in the current scope,
and records the lock in the context.
@@ -47,11 +47,9 @@
*/
boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException,
TimeoutException;
- MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean
createIfAbsent, boolean forceLockIfAbsent, long lifespan) throws InterruptedException;
+ MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean
createIfAbsent, boolean forceLockIfAbsent) throws InterruptedException;
MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext,
boolean forceWriteLock) throws InterruptedException;
MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext)
throws InterruptedException;
-
- MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert, long
lifespan);
}
Modified: core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -31,6 +31,7 @@
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.Start;
import org.horizon.invocation.Options;
+import org.horizon.loader.StoredEntry;
import org.horizon.lock.IsolationLevel;
import org.horizon.lock.LockManager;
import org.horizon.lock.TimeoutException;
@@ -67,8 +68,7 @@
public MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert,
long lifespan) {
if (value == null && !isForInsert) return useRepeatableRead ? NULL_MARKER :
null;
- MVCCEntry mvccEntry = useRepeatableRead ? new RepeatableReadEntry(key, value,
lifespan) : new ReadCommittedEntry(key, value, lifespan);
- return mvccEntry;
+ return useRepeatableRead ? new RepeatableReadEntry(key, value, lifespan) : new
ReadCommittedEntry(key, value, lifespan);
}
public MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean
putInContext) throws InterruptedException {
@@ -82,12 +82,14 @@
MVCCEntry mvccEntry;
if (forceWriteLock) {
if (trace) log.trace("Forcing lock on reading");
- return wrapEntryForWriting(ctx, key, false, false, -1);
+ return wrapEntryForWriting(ctx, key, false, false);
} else if ((mvccEntry = ctx.lookupEntry(key)) == null) {
if (trace) log.trace("Key " + key + " is not in context, fetching
from container.");
// simple implementation. Peek the node, wrap it, put wrapped node in the
context.
- Object value = container.get(key);
- mvccEntry = createWrappedEntry(key, value, false, -1);
+ StoredEntry se = container.createEntryForStorage(key);
+ mvccEntry = se == null ?
+ createWrappedEntry(key, null, false, -1) :
+ createWrappedEntry(key, se.getValue(), false, se.getLifespan());
if (mvccEntry != null && putInContext) ctx.putLookedUpEntry(key,
mvccEntry);
return mvccEntry;
} else {
@@ -96,7 +98,7 @@
}
}
- public MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean
createIfAbsent, boolean forceLockIfAbsent, long lifespan) throws InterruptedException {
+ public MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean
createIfAbsent, boolean forceLockIfAbsent) throws InterruptedException {
MVCCEntry mvccEntry = ctx.lookupEntry(key);
if (createIfAbsent && mvccEntry != null && mvccEntry.isNullEntry())
mvccEntry = null;
if (mvccEntry != null) // exists in context! Just acquire lock if needed, and
wrap.
@@ -114,14 +116,14 @@
}
} else {
// else, fetch from dataContainer.
- Object value = container.get(key);
- if (value != null) {
+ StoredEntry storedEntry = container.createEntryForStorage(key);
+ if (storedEntry != null) {
if (trace) log.trace("Retrieved from container.");
// exists in cache! Just acquire lock if needed, and wrap.
// do we need a lock?
boolean needToCopy = false;
if (acquireLock(ctx, key)) needToCopy = true;
- mvccEntry = createWrappedEntry(key, value, false, lifespan);
+ mvccEntry = createWrappedEntry(key, storedEntry.getValue(), false,
storedEntry.getLifespan());
ctx.putLookedUpEntry(key, mvccEntry);
if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
} else if (createIfAbsent) {
@@ -130,7 +132,7 @@
// now to lock and create the node. Lock first to prevent concurrent
creation!
acquireLock(ctx, key);
notifier.notifyCacheEntryCreated(key, true, ctx);
- mvccEntry = createWrappedEntry(key, value, true, lifespan);
+ mvccEntry = createWrappedEntry(key, null, true, -1);
mvccEntry.setCreated(true);
ctx.putLookedUpEntry(key, mvccEntry);
mvccEntry.copyForUpdate(container, writeSkewCheck);
@@ -166,7 +168,7 @@
ctx.addKeyLocked(key);
} else if (!lockManager.lockAndRecord(key, ctx)) {
Object owner = lockManager.getOwner(key);
- throw new TimeoutException("Unable to acquire lock on key [" + key
+ "] after [" + getLockAcquisitionTImeout(ctx)
+ throw new TimeoutException("Unable to acquire lock on key [" + key
+ "] after [" + getLockAcquisitionTimeout(ctx)
+ "] milliseconds for requestor [" +
lockManager.getLockOwner(ctx) + "]! Lock held by [" + owner + "]");
}
return true;
@@ -174,12 +176,13 @@
return false;
}
- private long getLockAcquisitionTImeout(InvocationContext ctx) {
+ private long getLockAcquisitionTimeout(InvocationContext ctx) {
return ctx.hasOption(Options.ZERO_LOCK_ACQUISITION_TIMEOUT) ?
0 : configuration.getLockAcquisitionTimeout();
}
- public void releaseLock(Object key) {
+ public void releaseLock(InvocationContext ctx, Object key) {
lockManager.unlock(key, lockManager.getOwner(key));
+ ctx.removeKeyLocked(key);
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -37,6 +37,7 @@
import org.horizon.interceptors.base.JmxStatsCommandInterceptor;
import org.horizon.loader.CacheLoader;
import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.StoredEntry;
import org.horizon.notifications.cachelistener.CacheNotifier;
import org.horizon.transaction.TransactionTable;
@@ -86,97 +87,82 @@
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable {
- if (command.getKey() != null) {
- loadIfNeeded(ctx, command.getKey());
- }
+ if (command.getKey() != null) loadIfNeeded(ctx, command.getKey());
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand
command) throws Throwable {
- if (command.getKey() != null) {
- loadIfNeeded(ctx, command.getKey());
- }
+ if (command.getKey() != null) loadIfNeeded(ctx, command.getKey());
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable {
- if (command.getKeys() != null) {
+ if (command.getKeys() != null)
for (Object key : command.getKeys()) loadIfNeeded(ctx, key);
- }
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws
Throwable {
- if (command.getKey() != null) {
- loadIfNeeded(ctx, command.getKey());
- }
+ if (command.getKey() != null) loadIfNeeded(ctx, command.getKey());
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
throws Throwable {
- if (command.getKey() != null) {
- loadIfNeeded(ctx, command.getKey());
- }
+ if (command.getKey() != null) loadIfNeeded(ctx, command.getKey());
return invokeNextInterceptor(ctx, command);
}
private void loadIfNeeded(InvocationContext ctx, Object key) throws Throwable {
- if (dataContainer.containsKey(key) || !loader.containsKey(key))
+ if (!loader.containsKey(key)) {
+ log.trace("No need to load. Key doesn't exist in the loader.");
return;
+ }
// Obtain a temporary lock to verify the key is not being concurrently added
boolean release = entryFactory.acquireLock(ctx, key);
if (dataContainer.containsKey(key)) {
- if (release)
- entryFactory.releaseLock(key);
+ if (release) entryFactory.releaseLock(ctx, key);
+ log.trace("No need to load. Key exists in the data container.");
return;
}
// Reuse the lock and create a new entry for loading
- MVCCEntry n = entryFactory.wrapEntryForWriting(ctx, key, true, false, -1); // TODO:
handle expiry information from loaded data
+ MVCCEntry n = entryFactory.wrapEntryForWriting(ctx, key, true, false);
n = loadEntry(ctx, key, n);
}
/**
- * Loads a node from disk; if it exists creates parent TreeNodes. If it doesn't
exist on disk but in memory, clears
- * the uninitialized flag, otherwise returns null.
+ * Loads a node from loader
*/
private MVCCEntry loadEntry(InvocationContext ctx, Object key, MVCCEntry entry) throws
Exception {
- if (trace) log.trace("loading entry " + key + " entry is " +
entry);
+ log.trace("Loading key {0}", key);
- Object value = loader.load(key);
- boolean nodeExists = (value != null);
- if (trace) log.trace("nodeExists " + nodeExists);
+ StoredEntry storedEntry = loader.load(key);
+ boolean entryExists = (storedEntry != null);
+ log.trace("Entry exists in loader? " + entryExists);
if (getStatisticsEnabled()) {
- if (nodeExists) {
+ if (entryExists) {
cacheLoads++;
} else {
cacheMisses++;
}
}
- if (value != null) {
- if (trace) log.trace("Entry is not null, loading");
-// notifier.notifyNodeLoaded(fqn, true, Collections.emptyMap(), ctx);
-// if (isActivation)
-// {
-// notifier.notifyNodeActivated(fqn, true, Collections.emptyMap(), ctx);
-// }
-
- entry.setValue(value);
+ if (entryExists) {
+ notifier.notifyCacheEntryLoaded(key, true, ctx);
+ if (isActivation) notifier.notifyCacheEntryActivated(key, true, ctx);
+ entry.setValue(storedEntry.getValue());
+ entry.setLifespan(storedEntry.getLifespan());
entry.setValid(true);
-// notifier.notifyNodeLoaded(fqn, false, nodeData, ctx);
-// if (isActivation)
-// {
-// notifier.notifyNodeActivated(fqn, false, nodeData, ctx);
-// }
+ notifier.notifyCacheEntryLoaded(key, false, ctx);
+ if (isActivation) notifier.notifyCacheEntryActivated(key, false, ctx);
}
return entry;
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -30,8 +30,9 @@
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.PutMapCommand;
import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.ReplaceCommand;
import org.horizon.config.CacheLoaderManagerConfig;
-import org.horizon.container.DataContainer;
+import org.horizon.container.MVCCEntry;
import org.horizon.context.InvocationContext;
import org.horizon.context.TransactionContext;
import org.horizon.factories.annotations.Inject;
@@ -51,6 +52,7 @@
import org.horizon.transaction.GlobalTransaction;
import javax.transaction.SystemException;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.HashMap;
@@ -70,13 +72,12 @@
public class CacheStoreInterceptor extends JmxStatsCommandInterceptor {
private CacheLoaderManagerConfig loaderConfig = null;
private TransactionManager txMgr = null;
- private HashMap<GlobalTransaction, Integer> txStores = new
HashMap<GlobalTransaction, Integer>();
- private Map<GlobalTransaction, Set<Object>> preparingTxs = new
ConcurrentHashMap<GlobalTransaction, Set<Object>>();
+ private HashMap<Transaction, Integer> txStores = new HashMap<Transaction,
Integer>();
+ private Map<Transaction, Set<Object>> preparingTxs = new
ConcurrentHashMap<Transaction, Set<Object>>();
private long cacheStores = 0;
CacheStore store;
private CacheLoaderManager loaderManager;
private boolean statsEnabled;
- private DataContainer dataContainer;
public CacheStoreInterceptor() {
log = LogFactory.getLog(getClass());
@@ -84,26 +85,23 @@
}
@Inject
- protected void init(CacheLoaderManager loaderManager, TransactionManager txManager,
CacheLoaderManagerConfig clConfig, DataContainer dataContainer) {
- // never inject a CacheLoaderOld at this stage - only a CacheLoaderManager, since
the CacheLoaderManager only creates a CacheLoaderOld instance when it @Starts.
+ protected void init(CacheLoaderManager loaderManager, TransactionManager txManager) {
this.loaderManager = loaderManager;
- this.loaderConfig = clConfig;
txMgr = txManager;
- this.dataContainer = dataContainer;
}
@Start
protected void start() {
- // this should only happen after the CacheLoaderManager has started, since the
CacheLoaderManager only creates the CacheLoaderOld instance in its @Start method.
store = loaderManager.getCacheStore();
this.setStatisticsEnabled(configuration.isExposeManagementStatistics());
+ loaderConfig = configuration.getCacheLoaderConfig();
}
/**
* if this is a shared cache loader and the call is of remote origin, pass up the
chain
*/
public final boolean skip(InvocationContext ctx, VisitableCommand command) {
- if (store == null) return true;
+ if (store == null) return true; // could be because the cache loader does not
implement cache store
if ((!ctx.isOriginLocal() && loaderConfig.isShared()) ||
ctx.hasOption(Options.SKIP_CACHE_STORE)) {
if (trace)
log.trace("Passing up method call and bypassing this interceptor since
the cache loader is shared and this call originated remotely.");
@@ -117,24 +115,21 @@
if (!skip(ctx, command) && inTransaction()) {
if (ctx.getTransactionContext().hasAnyModifications()) {
// this is a commit call.
- GlobalTransaction gtx = command.getGlobalTransaction();
- if (trace) log.trace("Calling loader.commit() for gtx " + gtx);
- // sync call (a write) on the loaderold
- // ignore modified FQNs
- // List fqnsModified =
getFqnsFromModificationList(txTable.get(globalTransaction).getCacheLoaderModifications());
+ Transaction tx = ctx.getTransaction();
+ log.trace("Calling loader.commit() for transaction {0}", tx);
try {
-// store.commit(gtx);
+ store.commit(tx);
}
catch (Throwable t) {
- preparingTxs.remove(gtx);
+ preparingTxs.remove(tx);
throw t;
}
if (getStatisticsEnabled()) {
- Integer puts = (Integer) txStores.get(gtx);
+ Integer puts = txStores.get(tx);
if (puts != null) {
cacheStores = cacheStores + puts;
}
- txStores.remove(gtx);
+ txStores.remove(tx);
}
return invokeNextInterceptor(ctx, command);
} else {
@@ -149,13 +144,13 @@
if (!skip(ctx, command) && inTransaction()) {
if (trace) log.trace("transactional so don't put stuff in the cloader
yet.");
if (ctx.getTransactionContext().hasAnyModifications()) {
- GlobalTransaction gtx = command.getGlobalTransaction();
+ Transaction tx = ctx.getTransaction();
// this is a rollback method
- if (preparingTxs.containsKey(gtx)) {
- preparingTxs.remove(gtx);
-// store.rollback(gtx);
+ if (preparingTxs.containsKey(tx)) {
+ preparingTxs.remove(tx);
+ store.rollback(tx);
}
- if (getStatisticsEnabled()) txStores.remove(gtx);
+ if (getStatisticsEnabled()) txStores.remove(tx);
} else {
if (trace) log.trace("Rollback called with no modifications;
ignoring.");
}
@@ -167,17 +162,27 @@
public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable {
if (!skip(ctx, command) && inTransaction()) {
if (trace) log.trace("transactional so don't put stuff in the cloader
yet.");
- prepareCacheLoader(command.getGlobalTransaction(), ctx.getTransactionContext(),
command.isOnePhaseCommit());
+ prepareCacheLoader(ctx, command.getGlobalTransaction(),
ctx.getTransactionContext(), command.isOnePhaseCommit());
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws
Throwable {
+ Object retval = invokeNextInterceptor(ctx, command);
+ if (!skip(ctx, command) && !inTransaction() &&
command.isSuccessful()) {
+ Object key = command.getKey();
+ boolean resp = store.remove(key);
+ log.trace("Removed entry under key {0} and got response {1} from
CacheStore", key, resp);
+ }
+ return retval;
+ }
+
+ @Override
+ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws
Throwable {
if (!skip(ctx, command) && !inTransaction()) {
- Object returnValue = store.remove(command.getKey());
- invokeNextInterceptor(ctx, command);
- return returnValue;
+ store.clear();
+ log.trace("Cleared cache store");
}
return invokeNextInterceptor(ctx, command);
}
@@ -185,66 +190,72 @@
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable {
Object returnValue = invokeNextInterceptor(ctx, command);
- if (skip(ctx, command) || inTransaction())
- return returnValue;
+ if (skip(ctx, command) || inTransaction() || !command.isSuccessful()) return
returnValue;
- store.store((StoredEntry) null);//command.getKey(), command.getValue());
+ Object key = command.getKey();
+ StoredEntry se = getStoredEntry(key, ctx);
+ store.store(se);
+ log.trace("Stored entry {0} under key {1}", se, key);
if (getStatisticsEnabled()) cacheStores++;
return returnValue;
}
@Override
- public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws
Throwable {
+ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
throws Throwable {
Object returnValue = invokeNextInterceptor(ctx, command);
- if (skip(ctx, command) || inTransaction())
- return returnValue;
+ if (skip(ctx, command) || inTransaction() || !command.isSuccessful()) return
returnValue;
- // Perhaps this should be optimized
- Map<Object, Object> map = command.getMap();
- List<Modification> modifications = toModifications(map);
-// store.put(modifications);
-
+ Object key = command.getKey();
+ StoredEntry se = getStoredEntry(key, ctx);
+ store.store(se);
+ log.trace("Stored entry {0} under key {1}", se, key);
if (getStatisticsEnabled()) cacheStores++;
return returnValue;
}
- private static List<Modification> toModifications(Map<Object, Object> map)
{
- List<Modification> modifications = new
ArrayList<Modification>(map.size());
- // TODO fix me
-// for (Map.Entry<Object, Object> entry : map.entrySet())
-// modifications.add(new Modification(ModificationType.PUT, entry.getKey(),
entry.getValue()));
- return modifications;
+ @Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws
Throwable {
+ Object returnValue = invokeNextInterceptor(ctx, command);
+ if (skip(ctx, command) || inTransaction()) return returnValue;
+
+ Map<Object, Object> map = command.getMap();
+ for (Object key : map.keySet()) {
+ StoredEntry se = getStoredEntry(key, ctx);
+ store.store(se);
+ log.trace("Stored entry {0} under key {1}", se, key);
+ }
+ if (getStatisticsEnabled()) cacheStores += map.size();
+ return returnValue;
}
private boolean inTransaction() throws SystemException {
return txMgr != null && txMgr.getTransaction() != null;
}
- private void prepareCacheLoader(GlobalTransaction gtx, TransactionContext
transactionContext, boolean onePhase) throws Throwable {
+ private void prepareCacheLoader(InvocationContext ctx, GlobalTransaction gtx,
TransactionContext transactionContext, boolean onePhase) throws Throwable {
if (transactionContext == null) {
throw new Exception("transactionContext for transaction " + gtx +
" not found in transaction table");
}
List<VisitableCommand> modifications =
transactionContext.getModifications();
if (modifications.size() == 0) {
- if (trace) log.trace("Transaction has not logged any
modifications!");
+ log.trace("Transaction has not logged any modifications!");
return;
}
- if (trace) log.trace("Cache loader modification list: " +
modifications);
+ log.trace("Cache loader modification list: {0}", modifications);
StoreModificationsBuilder modsBuilder = new
StoreModificationsBuilder(getStatisticsEnabled());
- for (VisitableCommand cacheCommand : modifications) {
- cacheCommand.acceptVisitor(null, modsBuilder);
- }
- if (trace) {
- log.trace("Converted method calls to cache loader modifications. List
size: " + modsBuilder.modifications.size());
- }
- if (modsBuilder.modifications.size() > 0) {
-// loader.prepare(gtx, modsBuilder.modifications, onePhase);
+ for (VisitableCommand cacheCommand : modifications) cacheCommand.acceptVisitor(ctx,
modsBuilder);
+ int numMods = modsBuilder.modifications.size();
+ log.trace("Converted method calls to cache loader modifications. List size:
{0}", numMods);
- preparingTxs.put(gtx, modsBuilder.affectedKeys);
+ if (numMods > 0) {
+ Transaction tx = transactionContext.getTransaction();
+ store.prepare(modsBuilder.modifications, tx, onePhase);
+
+ preparingTxs.put(tx, modsBuilder.affectedKeys);
if (getStatisticsEnabled() && modsBuilder.putCount > 0) {
- txStores.put(gtx, modsBuilder.putCount);
+ txStores.put(tx, modsBuilder.putCount);
}
}
}
@@ -267,7 +278,7 @@
@SuppressWarnings("unchecked")
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable {
if (generateStatistics) putCount++;
- modifications.add(new
Store(dataContainer.createEntryForStorage(command.getKey())));
+ modifications.add(new Store(getStoredEntry(command.getKey(), ctx)));
affectedKeys.add(command.getKey());
return null;
}
@@ -278,8 +289,7 @@
Map<Object, Object> map = command.getMap();
if (generateStatistics) putCount += map.size();
affectedKeys.addAll(map.keySet());
- Set<StoredEntry> entries = new HashSet<StoredEntry>();
- for (Object key : map.keySet()) modifications.add(new
Store(dataContainer.createEntryForStorage(key)));
+ for (Object key : map.keySet()) modifications.add(new Store(getStoredEntry(key,
ctx)));
return null;
}
@@ -326,4 +336,10 @@
return cacheStores;
}
+ private StoredEntry getStoredEntry(Object key, InvocationContext ctx) {
+ MVCCEntry entry = ctx.lookupEntry(key);
+ StoredEntry se = new StoredEntry(key, entry.getValue());
+ se.setLifespan(entry.getLifespan());
+ return se;
+ }
}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -133,7 +133,7 @@
public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws
Throwable {
try {
// get a snapshot of all keys in the data container
- for (Object key : dataContainer.keySet()) entryFactory.wrapEntryForWriting(ctx,
key, false, false, -1);
+ for (Object key : dataContainer.keySet()) entryFactory.wrapEntryForWriting(ctx,
key, false, false);
return invokeNextInterceptor(ctx, command);
}
@@ -152,7 +152,7 @@
public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable {
try {
if (command.getKeys() != null) {
- for (Object key : command.getKeys()) entryFactory.wrapEntryForWriting(ctx,
key, false, true, -1);
+ for (Object key : command.getKeys()) entryFactory.wrapEntryForWriting(ctx,
key, false, true);
}
return invokeNextInterceptor(ctx, command);
}
@@ -164,7 +164,7 @@
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable {
try {
- entryFactory.wrapEntryForWriting(ctx, command.getKey(), true, false,
command.getLifespanMillis());
+ entryFactory.wrapEntryForWriting(ctx, command.getKey(), true, false);
return invokeNextInterceptor(ctx, command);
}
finally {
@@ -176,7 +176,7 @@
public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws
Throwable {
try {
for (Object key : command.getMap().keySet()) {
- entryFactory.wrapEntryForWriting(ctx, key, true, false,
command.getLifespanMillis());
+ entryFactory.wrapEntryForWriting(ctx, key, true, false);
}
return invokeNextInterceptor(ctx, command);
}
@@ -188,7 +188,7 @@
@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws
Throwable {
try {
- entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true, -1);
+ entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true);
return invokeNextInterceptor(ctx, command);
}
finally {
@@ -199,7 +199,7 @@
@Override
public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
throws Throwable {
try {
- entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true,
command.getLifespanMillis());
+ entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true);
return invokeNextInterceptor(ctx, command);
}
finally {
Modified:
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -9,8 +9,8 @@
private boolean ignoreModifications;
private boolean fetchPersistentState;
private boolean purgeOnStartup;
- private SingletonStoreConfig singletonStoreConfig;
- private AsyncStoreConfig asyncStoreConfig;
+ private SingletonStoreConfig singletonStoreConfig = new SingletonStoreConfig();
+ private AsyncStoreConfig asyncStoreConfig = new AsyncStoreConfig();
public boolean isPurgeOnStartup() {
return purgeOnStartup;
@@ -113,6 +113,7 @@
throw new RuntimeException("Should not happen!", e);
}
if (singletonStoreConfig != null)
clone.setSingletonStoreConfig(singletonStoreConfig.clone());
+ if (asyncStoreConfig != null) clone.setAsyncStoreConfig((AsyncStoreConfig)
asyncStoreConfig.clone());
return clone;
}
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java 2009-02-16
14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -17,9 +17,9 @@
*/
public abstract class AbstractCacheStore extends AbstractCacheLoader implements
CacheStore {
- private final Map<Transaction, List<Modification>> transactions = new
ConcurrentHashMap<Transaction, List<Modification>>();
+ private final Map<Transaction, List<? extends Modification>> transactions
= new ConcurrentHashMap<Transaction, List<? extends Modification>>();
- protected void applyModifications(List<Modification> mods) {
+ protected void applyModifications(List<? extends Modification> mods) {
for (Modification m : mods) {
switch (m.getType()) {
case STORE:
@@ -39,7 +39,7 @@
}
}
- public void prepare(List<Modification> mods, Transaction tx, boolean isOnePhase)
{
+ public void prepare(List<? extends Modification> mods, Transaction tx, boolean
isOnePhase) {
if (isOnePhase) {
applyModifications(mods);
} else {
@@ -52,7 +52,7 @@
}
public void commit(Transaction tx) {
- List<Modification> list = transactions.remove(tx);
+ List<? extends Modification> list = transactions.remove(tx);
if (list != null && !list.isEmpty()) applyModifications(list);
}
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -181,7 +181,7 @@
// singleton?
SingletonStoreConfig ssc = cfg.getSingletonStoreConfig();
if (ssc != null && ssc.isSingletonStoreEnabled()) {
- tmpStore = new SingletonStore(tmpStore);
+ tmpStore = new SingletonStore(tmpStore, cache, ssc);
tmpLoader = tmpStore;
}
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-16
14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -73,7 +73,7 @@
* @param isOnePhase if true, there will not be a commit or rollback phase and
changes should be flushed
* immediately
*/
- void prepare(List<Modification> modifications, Transaction tx, boolean
isOnePhase);
+ void prepare(List<? extends Modification> modifications, Transaction tx, boolean
isOnePhase);
/**
* Commits a transaction that has been previously prepared
Modified: core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java 2009-02-16
14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -19,7 +19,7 @@
}
public StoredEntry(Object key, Object value) {
- super(value, -1, -1);
+ super(value, System.currentTimeMillis(), -1);
this.key = key;
}
@@ -58,6 +58,9 @@
public String toString() {
return "StoredEntry{" +
"key=" + key +
+ ", value=" + value +
+ ", createdTime=" + createdTime +
+ ", expiryTime=" + expiryTime +
'}';
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -5,6 +5,7 @@
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Modification;
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
@@ -69,7 +70,7 @@
delegate.rollback(tx);
}
- public void prepare(List list, Transaction tx, boolean isOnePhase) {
+ public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) {
delegate.prepare(list, tx, isOnePhase);
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -60,4 +60,13 @@
this.threadPoolSize = threadPoolSize;
}
+ @Override
+ public AsyncStoreConfig clone() {
+ try {
+ return (AsyncStoreConfig) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Should not happen!", e);
+ }
+ }
+
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -2,6 +2,7 @@
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Modification;
import javax.transaction.Transaction;
import java.io.InputStream;
@@ -56,7 +57,7 @@
}
@Override
- public void prepare(List list, Transaction tx, boolean isOnePhase) {
+ public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) {
// no-op
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -1,98 +1,356 @@
package org.horizon.loader.decorators;
import org.horizon.Cache;
-import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.container.DataContainer;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
-import org.horizon.marshall.Marshaller;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.notifications.Listener;
+import org.horizon.notifications.cachemanagerlistener.annotation.CacheStarted;
+import org.horizon.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.horizon.notifications.cachemanagerlistener.event.Event;
+import org.horizon.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.horizon.remoting.transport.Address;
import javax.transaction.Transaction;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
- * // TODO: Manik: Document this!
+ * SingletonStore is a delegating cache store used for situations when only one instance
should interact with the
+ * underlying store. The coordinator of the cluster will be responsible for the
underlying CacheStore.
+ * <p/>
+ * SingletonStore is a simple facade to a real CacheStore implementation. It always
delegates reads to the real
+ * CacheStore.
+ * <p/>
+ * Writes are delegated <i>only if</i> this SingletonStore is currently the
coordinator. This avoids having all stores in
+ * a cluster writing the same data to the same underlying store. Although not incorrect
(e.g. a DB will just discard
+ * additional INSERTs for the same key, and throw an exception), this will avoid a lot of
redundant work.
+ * <p/>
+ * Whenever the current coordinator dies (or leaves), the second in line will take over.
That SingletonStore will then
+ * pass writes through to its underlying CacheStore. Optionally, when a new coordinator
takes over the Singleton, it can
+ * push the in-memory state to the cache store, within a time constraint.
*
+ * @author Bela Ban
+ * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
* @author Manik Surtani
+ * @since 1.0
*/
public class SingletonStore extends AbstractDelegatingStore {
- public SingletonStore(CacheStore delegate) {
+ private static final Log log = LogFactory.getLog(SingletonStore.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ CacheManager cacheManager;
+ Cache cache;
+ SingletonStoreConfig config;
+
+ /**
+ * Name of the thread that pushes in-memory state to the cache loader.
+ */
+ private static final String THREAD_NAME = "SingletonStorePusherThread";
+
+ /**
+ * Executor service used to submit tasks to push in-memory state.
+ */
+ private final ExecutorService executor;
+
+ /**
+ * Future result of the in-memory push state task. This allows SingletonStore to check
whether there's any push task
+ * ongoing.
+ */
+ Future<?> pushStateFuture; /* FutureTask guarantess a safe publication of the
result */
+
+ /**
+ * Address instance that allows SingletonStore to find out whether it became the
coordinator of the cluster, or
+ * whether it stopped being it. This dictates whether the SingletonStore is active or
not.
+ */
+ private Address localAddress;
+
+ /**
+ * Whether the current node is the coordinator and therefore SingletonStore is
active. Being active means
+ * delegating calls to the underlying cache loader.
+ */
+ private volatile boolean active;
+
+
+ public SingletonStore(CacheStore delegate, Cache cache, SingletonStoreConfig config)
{
super(delegate);
- }
+ this.cacheManager = cache == null ? null : cache.getCacheManager();
+ this.cache = cache;
+ this.config = config;
- public void store(StoredEntry ed) {
- // TODO: Manik: Customise this generated block
- }
+ executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r, THREAD_NAME);
+ }
+ });
- public void storeAll(Collection ed) {
- // TODO: Manik: Customise this generated block
}
- public void store(InputStream inputStream) {
- // TODO: Manik: Customise this generated block
+ // -------------- Basic write methods
+ // only delegate if the current instance is active
+
+ @Override
+ public void store(StoredEntry ed) {
+ if (active) {
+ if (trace) log.trace("Storing key {0}. Instance: {1}", ed.getKey(),
this);
+ super.store(ed);
+ } else if (trace) log.trace("Not storing key {0}. Instance: {1}",
ed.getKey(), this);
}
- public void load(OutputStream outputStream) throws IOException {
- // TODO: Manik: Customise this generated block
+ @Override
+ public void store(InputStream inputStream) throws IOException, ClassNotFoundException
{
+ if (active) super.store(inputStream);
}
+ @Override
public void clear() {
- // TODO: Manik: Customise this generated block
+ if (active) super.clear();
}
+ @Override
public boolean remove(Object key) {
- return false; // TODO: Manik: Customise this generated block
+ return active && super.remove(key);
}
+ @Override
public void purgeExpired() {
- // TODO: Manik: Customise this generated block
+ if (active) super.purgeExpired();
}
+ @Override
public void commit(Transaction tx) {
- // TODO: Manik: Customise this generated block
+ if (active) super.commit(tx);
}
+ @Override
public void rollback(Transaction tx) {
- // TODO: Manik: Customise this generated block
+ if (active) super.rollback(tx);
}
- public void prepare(List list, Transaction tx, boolean isOnePhase) {
- // TODO: Manik: Customise this generated block
+ @Override
+ public void prepare(List<? extends Modification> list, Transaction tx, boolean
isOnePhase) {
+ if (active) super.prepare(list, tx, isOnePhase);
}
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
- // TODO: Manik: Customise this generated block
+ @Override
+ public void start() {
+ cacheManager.addListener(new SingletonStoreListener());
+ super.start();
}
- public StoredEntry load(Object key) {
- return null; // TODO: Manik: Customise this generated block
+ /**
+ * Factory method for the creation of a Callable task in charge of pushing in-memory
state to cache loader.
+ *
+ * @return new instance of Callable<?> whose call() method either throws an
exception or returns null if the task was
+ * successful.
+ */
+ protected Callable<?> createPushStateTask() {
+ return new Callable() {
+ public Object call() throws Exception {
+ final boolean debugEnabled = log.isDebugEnabled();
+
+ if (debugEnabled) log.debug("start pushing in-memory state to cache
cacheLoader");
+ pushState(cache);
+ if (debugEnabled) log.debug("in-memory state passed to cache cacheLoader
successfully");
+
+ return null;
+ }
+ };
}
- public Set loadAll(Collection keys) {
- return null; // TODO: Manik: Customise this generated block
+ /**
+ * Pushes the in-memory state to the cache store by reading each entry from the cache's
data container and storing it via
+ * this store. This method iterates over every key in the data container so that it covers the
whole cache.
+ *
+ * @throws Exception if there's any issues reading the data from the cache or
pushing the node's data to the cache
+ * loader.
+ */
+ protected void pushState(final Cache cache) throws Exception {
+ DataContainer dc = cache.getAdvancedCache().getDataContainer();
+ Set keys = dc.keySet();
+ StoredEntry entry;
+ for (Object k : keys) if ((entry = dc.createEntryForStorage(k)) != null)
store(entry);
}
- public Set loadAll() {
- return null; // TODO: Manik: Customise this generated block
+
+ /**
+ * Method that waits for the in-memory to cache loader state push to finish. This
method's called in case a push state is
+ * already in progress and we need to wait for it to finish.
+ *
+ * @param future instance of Future representing the on going push task
+ * @param timeout time to wait for the push task to finish
+ * @param unit instance of TimeUnit representing the unit of timeout
+ */
+ protected void awaitForPushToFinish(Future future, long timeout, TimeUnit unit) {
+ final boolean debugEnabled = log.isDebugEnabled();
+ try {
+ if (debugEnabled) log.debug("wait for state push to cache loader to
finish");
+ future.get(timeout, unit);
+ }
+ catch (TimeoutException e) {
+ if (debugEnabled) log.debug("timed out waiting for state push to cache
loader to finish");
+ }
+ catch (ExecutionException e) {
+ if (debugEnabled) log.debug("exception reported waiting for state push to
cache loader to finish");
+ }
+ catch (InterruptedException ie) {
+ /* Re-assert the thread's interrupted status */
+ Thread.currentThread().interrupt();
+ if (trace) log.trace("wait for state push to cache loader to finish was
interrupted");
+ }
}
- public boolean containsKey(Object key) {
- return false; // TODO: Manik: Customise this generated block
+
+ /**
+ * Method called when the node either becomes the coordinator or stops being the
coordinator. If it becomes the
+ * coordinator, it can optionally start the in-memory state transfer to the underlying
cache store.
+ *
+ * @param newActiveState true if the node just became the coordinator, false if the
node stopped being the
+ * coordinator.
+ */
+ protected void activeStatusChanged(boolean newActiveState) throws PushStateException
{
+ active = newActiveState;
+ log.debug("changed mode {0}", this);
+ if (active && config.isPushStateWhenCoordinator()) doPushState();
}
- public Class<? extends CacheLoaderConfig> getConfigurationClass() {
- return null;
+ /**
+ * Indicates whether the current node is the coordinator of the cluster. This
implementation assumes that the
+ * coordinator is the first member in the list.
+ *
+ * @param newView View instance containing the new view of the cluster
+ * @return whether the current node is the coordinator or not.
+ */
+ private boolean isCoordinator(List<Address> newView, Address currentAddress) {
+ if (!currentAddress.equals(localAddress)) localAddress = currentAddress;
+ if (localAddress != null) {
+ return newView.size() > 0 && localAddress.equals(newView.get(0));
+ } else {
+ /* Invalid new view, so previous value returned */
+ return active;
+ }
}
- public void start() {
- // TODO: Manik: Customise this generated block
+ /**
+ * Called when the SingletonStore discovers that the node has become the coordinator
and push in memory state has
+ * been enabled. It might not actually push the state if there's an ongoing push
task running, in which case will
+ * wait for the push task to finish.
+ *
+ * @throws PushStateException when the push state task reports an issue.
+ */
+ private void doPushState() throws PushStateException {
+ if (pushStateFuture == null || pushStateFuture.isDone()) {
+ Callable<?> task = createPushStateTask();
+ pushStateFuture = executor.submit(task);
+ try {
+ waitForTaskToFinish(pushStateFuture, config.getPushStateTimeout(),
TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ throw new PushStateException("unable to complete in memory state push to
cache loader", e);
+ }
+ } else {
+ /* at the most, we wait for push state timeout value. if it push task finishes
earlier, this call
+* will stop when the push task finishes, otherwise a timeout exception will be reported
*/
+ awaitForPushToFinish(pushStateFuture, config.getPushStateTimeout(),
TimeUnit.MILLISECONDS);
+ }
}
- public void stop() {
- // TODO: Manik: Customise this generated block
+
+ /**
+ * Waits, within a time constraint, for a task to finish.
+ *
+ * @param future represents the task waiting to finish.
+ * @param timeout maximum time to wait for the task to finish.
+ * @param unit instance of TimeUnit representing the unit of timeout
+ * @throws Exception if any issues are reported while waiting for the task to finish
+ */
+ private void waitForTaskToFinish(Future future, long timeout, TimeUnit unit) throws
Exception {
+ try {
+ future.get(timeout, unit);
+ }
+ catch (TimeoutException e) {
+ throw new Exception("task timed out", e);
+ }
+ catch (InterruptedException e) {
+ /* Re-assert the thread's interrupted status */
+ Thread.currentThread().interrupt();
+ if (trace) log.trace("task was interrupted");
+ }
+ finally {
+ /* no-op if task is completed */
+ future.cancel(true); /* interrupt if running */
+ }
}
+
+
+ /**
+ * Cache listener that reacts to cluster topology changes to find out whether a new
coordinator is elected.
+ * SingletonStore reacts to these changes in order to decide which node should
interact with the underlying cache
+ * store.
+ */
+ @Listener
+ public class SingletonStoreListener {
+ /**
+ * Cache started, check whether the node is the coordinator and set the singleton
store's active status.
+ */
+ @CacheStarted
+ public void cacheStarted(Event e) {
+ localAddress = cacheManager.getAddress();
+ active = cacheManager.isCoordinator();
+ }
+
+ /**
+ * The cluster formation changed, so determine whether the current node stopped
being the coordinator or became
+ * the coordinator. This method can lead to an optional in memory to cache loader
state push, if the current node
+ * became the coordinator. This method will report any issues that could
potentially arise from this push.
+ */
+ @ViewChanged
+ public void viewChange(ViewChangedEvent event) {
+ boolean tmp = isCoordinator(event.getNewMemberList(), event.getLocalAddress());
+
+ if (active != tmp) {
+ try {
+ activeStatusChanged(tmp);
+ }
+ catch (PushStateException e) {
+ log.error("exception reported changing nodes active status",
e);
+ }
+
+ }
+ }
+ }
+
+ /**
+ * Exception representing any issues that arise from pushing the in-memory state to
the cache loader.
+ */
+ public static class PushStateException extends Exception {
+ private static final long serialVersionUID = 5542893943730200886L;
+
+ public PushStateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PushStateException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SingletonStore: localAddress=" + localAddress + ",
active=" + active;
+ }
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStoreConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStoreConfig.java 2009-02-16
14:25:32 UTC (rev 7696)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStoreConfig.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -13,7 +13,7 @@
private static final long serialVersionUID = 824251894176131850L;
boolean singletonStoreEnabled;
- boolean pushStateWhenCoordinator;
+ boolean pushStateWhenCoordinator = true;
long pushStateTimeout = 10000;
public boolean isSingletonStoreEnabled() {
Modified: core/branches/flat/src/test/java/org/horizon/expiry/ExpiryTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/expiry/ExpiryTest.java 2009-02-16
14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/test/java/org/horizon/expiry/ExpiryTest.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -1,6 +1,8 @@
package org.horizon.expiry;
import org.horizon.Cache;
+import org.horizon.container.DataContainer;
+import org.horizon.loader.StoredEntry;
import org.horizon.manager.CacheManager;
import org.horizon.manager.DefaultCacheManager;
import org.horizon.util.TestingUtil;
@@ -29,17 +31,19 @@
public void testExpiryInPut() throws InterruptedException {
Cache cache = cm.getCache();
- long startTime = System.currentTimeMillis();
long lifespan = 1000;
cache.put("k", "v", lifespan, TimeUnit.MILLISECONDS);
- while (System.currentTimeMillis() < startTime + lifespan + 100) {
- if (System.currentTimeMillis() < startTime + lifespan) {
- assert cache.get("k").equals("v");
- } else {
- assert cache.get("k") == null;
- }
- Thread.sleep(50);
- }
+
+ DataContainer dc = cache.getAdvancedCache().getDataContainer();
+ StoredEntry se = dc.createEntryForStorage("k");
+ assert se.getKey().equals("k");
+ assert se.getValue().equals("v");
+ assert se.getLifespan() == lifespan;
+ assert !se.isExpired();
+ assert cache.get("k").equals("v");
+ Thread.sleep(1100);
+ assert se.isExpired();
+ assert cache.get("k") == null;
}
public void testExpiryInPutAll() throws InterruptedException {
Deleted: core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java 2009-02-16 14:25:32
UTC (rev 7696)
+++ core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java 2009-02-17 01:07:15
UTC (rev 7697)
@@ -1,62 +0,0 @@
-package org.horizon.loader;
-
-import org.horizon.CacheException;
-import org.horizon.loader.decorators.AsyncStore;
-import org.horizon.loader.decorators.AsyncStoreConfig;
-import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
-import org.horizon.util.TestingUtil;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.Test;
-
-import java.util.concurrent.ExecutorService;
-
-@Test(groups = "unit", sequential = true)
-public class AsyncTest {
-
- AsyncStore store;
- ExecutorService asyncExecutor;
-
-
- @BeforeTest
- public void setUp() {
- store = new AsyncStore(new DummyInMemoryCacheLoader(), new AsyncStoreConfig());
- DummyInMemoryCacheLoader.Cfg cfg = new DummyInMemoryCacheLoader.Cfg();
- cfg.setStore(AsyncTest.class.getName());
- store.init(cfg, null, null);
- store.start();
- asyncExecutor = (ExecutorService) TestingUtil.extractField(store,
"executor");
- }
-
- @AfterTest
- public void tearDown() {
- if (store != null) store.stop();
- }
-
- @AfterMethod
- public void clearStore() {
- if (store != null) store.clear();
- }
-
- public void testRestrictionOnAddingToAsyncQueue() throws Exception {
- store.remove("blah");
-
- store.store(new StoredEntry("one", "value"));
- store.store(new StoredEntry("two", "value"));
- store.store(new StoredEntry("three", "value"));
- store.store(new StoredEntry("four", "value"));
-
- // stop the cache store
- store.stop();
- try {
- store.remove("blah");
- assert false : "Should have restricted this entry from being made";
- }
- catch (CacheException expected) {
- }
-
- // clean up
- store.start();
- }
-}
Added: core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java 2009-02-17
01:07:15 UTC (rev 7697)
@@ -0,0 +1,340 @@
+package org.horizon.loader;
+
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.config.Configuration;
+import org.horizon.container.DataContainer;
+import org.horizon.lifecycle.ComponentStatus;
+import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.Collections;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ * Tests the interceptor chain and surrounding logic
+ *
+ * @author Manik Surtani
+ */
+@Test(groups = "functional", sequential = true)
+public class CacheLoaderFunctionalTest {
+ Cache cache;
+ CacheStore store;
+ TransactionManager tm;
+ Configuration cfg;
+ CacheManager cm;
+ long lifespan = 6000000; // very large lifespan so nothing actually expires
+
+ @BeforeTest
+ public void setUp() {
+ cfg = new Configuration();
+ cfg.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+ clmc.addIndividualCacheLoaderConfig(new DummyInMemoryCacheLoader.Cfg());
+ cfg.setCacheLoaderConfig(clmc);
+ cm = new DefaultCacheManager(cfg);
+ cache = cm.getCache();
+ store = TestingUtil.extractComponent(cache, CacheLoaderManager.class).getCacheStore();
+ tm = TestingUtil.getTransactionManager(cache);
+ }
+
+ @AfterTest
+ public void tearDown() {
+ TestingUtil.killCacheManagers(cm);
+ }
+
+ @AfterMethod
+ public void afterMethod() {
+ if (cache != null) cache.clear();
+ if (store != null) store.clear();
+ }
+
+ private void assertInCacheAndStore(Object key, Object value) {
+ assertInCacheAndStore(key, value, -1);
+ }
+
+ private void assertInCacheAndStore(Object key, Object value, long lifespanMillis) {
+ assertInCacheAndStore(cache, store, key, value, lifespanMillis);
+ }
+
+
+ private void assertInCacheAndStore(Cache cache, CacheStore store, Object key, Object value) {
+ assertInCacheAndStore(cache, store, key, value, -1);
+ }
+
+ private void assertInCacheAndStore(Cache cache, CacheStore store, Object key, Object value, long lifespanMillis) {
+ StoredEntry se = cache.getAdvancedCache().getDataContainer().createEntryForStorage(key);
+ testStoredEntry(se, value, lifespanMillis, "Cache", key);
+ se = store.load(key);
+ testStoredEntry(se, value, lifespanMillis, "Store", key);
+ }
+
+ private void testStoredEntry(StoredEntry entry, Object expectedValue, long expectedLifespan, String src, Object key) {
+ assert entry != null : src + " entry for key " + key + " should NOT be null";
+ assert entry.getValue().equals(expectedValue) : src + " should contain value " + expectedValue + " under key " + entry.getKey() + " but was " + entry.getValue() + ". Entry is " + entry;
+ assert entry.getLifespan() == expectedLifespan : src + " expected lifespan for key " + key + " to be " + expectedLifespan + " but was " + entry.getLifespan() + ". Entry is " + entry;
+ }
+
+ private void assertNotInCacheAndStore(Cache cache, CacheStore store, Object... keys) {
+ for (Object key : keys) {
+ assert !cache.getAdvancedCache().getDataContainer().containsKey(key) : "Cache should not contain key " + key;
+ assert !store.containsKey(key) : "Store should not contain key " + key;
+ }
+ }
+
+ private void assertNotInCacheAndStore(Object... keys) {
+ assertNotInCacheAndStore(cache, store, keys);
+ }
+
+ public void testStoreAndRetrieve() {
+ assertNotInCacheAndStore("k1", "k2", "k3", "k4", "k5", "k6", "k7");
+
+ cache.put("k1", "v1");
+ cache.put("k2", "v2", lifespan, MILLISECONDS);
+ cache.putAll(Collections.singletonMap("k3", "v3"));
+ cache.putAll(Collections.singletonMap("k4", "v4"), lifespan, MILLISECONDS);
+ cache.putIfAbsent("k5", "v5");
+ cache.putIfAbsent("k6", "v6", lifespan, MILLISECONDS);
+ cache.putIfAbsent("k5", "v5-SHOULD-NOT-PUT");
+ cache.putIfAbsent("k6", "v6-SHOULD-NOT-PUT", lifespan, MILLISECONDS);
+ cache.putForExternalRead("k7", "v7");
+ cache.putForExternalRead("k7", "v7-SHOULD-NOT-PUT");
+
+ for (int i = 1; i < 8; i++) {
+ // even numbers have lifespans
+ if (i % 2 == 1)
+ assertInCacheAndStore("k" + i, "v" + i);
+ else
+ assertInCacheAndStore("k" + i, "v" + i, lifespan);
+ }
+
+ cache.remove("k1", "some rubbish");
+
+ for (int i = 1; i < 8; i++) {
+ // even numbers have lifespans
+ if (i % 2 == 1)
+ assertInCacheAndStore("k" + i, "v" + i);
+ else
+ assertInCacheAndStore("k" + i, "v" + i, lifespan);
+ }
+
+ assert cache.remove("k1", "v1");
+ assert cache.remove("k2").equals("v2");
+
+ assertNotInCacheAndStore("k1", "k2");
+
+ for (int i = 3; i < 8; i++) {
+ // even numbers have lifespans
+ if (i % 2 == 1)
+ assertInCacheAndStore("k" + i, "v" + i);
+ else
+ assertInCacheAndStore("k" + i, "v" + i, lifespan);
+ }
+
+ cache.clear();
+ assertNotInCacheAndStore("k1", "k2", "k3", "k4", "k5", "k6", "k7");
+ }
+
+ public void testReplaceMethods() {
+ assertNotInCacheAndStore("k1", "k2", "k3", "k4");
+
+ cache.replace("k1", "v1-SHOULD-NOT-STORE");
+ cache.replace("k2", "v2-SHOULD-NOT-STORE", lifespan, MILLISECONDS);
+
+ assertNotInCacheAndStore("k1", "k2", "k3", "k4");
+
+ cache.put("k1", "v1");
+ cache.put("k2", "v2");
+ cache.put("k3", "v3");
+ cache.put("k4", "v4");
+
+ for (int i = 1; i < 5; i++) assertInCacheAndStore("k" + i, "v" + i);
+
+ cache.replace("k1", "v1-SHOULD-NOT-STORE", "v1-STILL-SHOULD-NOT-STORE");
+ cache.replace("k2", "v2-SHOULD-NOT-STORE", "v2-STILL-SHOULD-NOT-STORE", lifespan, MILLISECONDS);
+
+ for (int i = 1; i < 5; i++) assertInCacheAndStore("k" + i, "v" + i);
+
+ cache.replace("k1", "v1-REPLACED");
+ cache.replace("k2", "v2-REPLACED", lifespan, MILLISECONDS);
+ cache.replace("k3", "v3", "v3-REPLACED");
+ cache.replace("k4", "v4", "v4-REPLACED", lifespan, MILLISECONDS);
+
+ for (int i = 1; i < 5; i++) {
+ // even numbers have lifespans
+ if (i % 2 == 1)
+ assertInCacheAndStore("k" + i, "v" + i + "-REPLACED");
+ else
+ assertInCacheAndStore("k" + i, "v" + i + "-REPLACED", lifespan);
+ }
+
+ }
+
+ public void testLoading() {
+ assertNotInCacheAndStore("k1", "k2", "k3", "k4");
+ store.store(new StoredEntry("k1", "v1"));
+ store.store(new StoredEntry("k2", "v2"));
+ store.store(new StoredEntry("k3", "v3"));
+ store.store(new StoredEntry("k4", "v4"));
+
+ for (int i = 1; i < 5; i++) assert cache.get("k" + i).equals("v" + i);
+
+ for (int i = 1; i < 5; i++) cache.evict("k" + i);
+
+ assert cache.putIfAbsent("k1", "v1-SHOULD-NOT-STORE").equals("v1");
+ assert cache.remove("k2").equals("v2");
+ assert cache.replace("k3", "v3-REPLACED").equals("v3");
+ assert cache.replace("k4", "v4", "v4-REPLACED");
+
+ assert cache.size() == 3 : "Expected the cache to contain 3 elements but contained " + cache.size();
+
+ for (int i = 1; i < 5; i++) cache.evict("k" + i);
+ assert cache.size() == 0; // cache size ops will not trigger a load
+
+ cache.clear(); // this should propagate to the loader though
+ assertNotInCacheAndStore("k1", "k2", "k3", "k4");
+ }
+
+ public void testPreloading() {
+ Configuration preloadingCfg = cfg.clone();
+ preloadingCfg.getCacheLoaderConfig().setPreload(true);
+ ((DummyInMemoryCacheLoader.Cfg) preloadingCfg.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("preloadingCache");
+ cm.defineCache("preloadingCache", preloadingCfg);
+ Cache preloadingCache = cm.getCache("preloadingCache");
+ CacheStore preloadingStore = TestingUtil.extractComponent(preloadingCache, CacheLoaderManager.class).getCacheStore();
+
+ assert preloadingCache.getConfiguration().getCacheLoaderConfig().isPreload();
+
+ assertNotInCacheAndStore(preloadingCache, preloadingStore, "k1", "k2", "k3", "k4");
+
+ preloadingCache.put("k1", "v1");
+ preloadingCache.put("k2", "v2", lifespan, MILLISECONDS);
+ preloadingCache.put("k3", "v3");
+ preloadingCache.put("k4", "v4", lifespan, MILLISECONDS);
+
+ for (int i = 1; i < 5; i++) {
+ if (i % 2 == 1)
+ assertInCacheAndStore(preloadingCache, preloadingStore, "k" + i, "v" + i);
+ else
+ assertInCacheAndStore(preloadingCache, preloadingStore, "k" + i, "v" + i, lifespan);
+ }
+
+ DataContainer c = preloadingCache.getAdvancedCache().getDataContainer();
+ assert c.size() == 4;
+ preloadingCache.stop();
+ assert c.size() == 0;
+
+ preloadingCache.start();
+ assert preloadingCache.getConfiguration().getCacheLoaderConfig().isPreload();
+ c = preloadingCache.getAdvancedCache().getDataContainer();
+ assert c.size() == 4;
+
+ for (int i = 1; i < 5; i++) {
+ if (i % 2 == 1)
+ assertInCacheAndStore(preloadingCache, preloadingStore, "k" + i, "v" + i);
+ else
+ assertInCacheAndStore(preloadingCache, preloadingStore, "k" + i, "v" + i, lifespan);
+ }
+ }
+
+ public void testPurgeOnStartup() {
+ Configuration purgingCfg = cfg.clone();
+ purgingCfg.getCacheLoaderConfig().getFirstCacheLoaderConfig().setPurgeOnStartup(true);
+ ((DummyInMemoryCacheLoader.Cfg) purgingCfg.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("purgingCache");
+ cm.defineCache("purgingCache", purgingCfg);
+ Cache purgingCache = cm.getCache("purgingCache");
+ CacheStore purgingStore = TestingUtil.extractComponent(purgingCache, CacheLoaderManager.class).getCacheStore();
+
+ assertNotInCacheAndStore(purgingCache, purgingStore, "k1", "k2", "k3", "k4");
+
+ purgingCache.put("k1", "v1");
+ purgingCache.put("k2", "v2", lifespan, MILLISECONDS);
+ purgingCache.put("k3", "v3");
+ purgingCache.put("k4", "v4", lifespan, MILLISECONDS);
+
+ for (int i = 1; i < 5; i++) {
+ if (i % 2 == 1)
+ assertInCacheAndStore(purgingCache, purgingStore, "k" + i, "v" + i);
+ else
+ assertInCacheAndStore(purgingCache, purgingStore, "k" + i, "v" + i, lifespan);
+ }
+
+ DataContainer c = purgingCache.getAdvancedCache().getDataContainer();
+ assert c.size() == 4;
+ purgingCache.stop();
+ assert c.size() == 0;
+
+ purgingCache.start();
+ c = purgingCache.getAdvancedCache().getDataContainer();
+ assert c.size() == 0;
+
+ assertNotInCacheAndStore(purgingCache, purgingStore, "k1", "k2", "k3", "k4");
+ }
+
+ public void testTransactionalWrites() throws Exception {
+ assert cache.getStatus() == ComponentStatus.RUNNING;
+ assertNotInCacheAndStore("k1", "k2");
+
+ tm.begin();
+ cache.put("k1", "v1");
+ cache.put("k2", "v2", lifespan, MILLISECONDS);
+ Transaction t = tm.suspend();
+
+ assertNotInCacheAndStore("k1", "k2");
+
+ tm.resume(t);
+ tm.commit();
+
+ assertInCacheAndStore("k1", "v1");
+ assertInCacheAndStore("k2", "v2", lifespan);
+
+ tm.begin();
+ cache.clear();
+ t = tm.suspend();
+
+ assertInCacheAndStore("k1", "v1");
+ assertInCacheAndStore("k2", "v2", lifespan);
+ tm.resume(t);
+ tm.commit();
+
+ assertNotInCacheAndStore("k1", "k2");
+
+ tm.begin();
+ cache.put("k1", "v1");
+ cache.put("k2", "v2", lifespan, MILLISECONDS);
+ t = tm.suspend();
+
+ assertNotInCacheAndStore("k1", "k2");
+
+ tm.resume(t);
+ tm.rollback();
+
+ assertNotInCacheAndStore("k1", "k2");
+ cache.put("k1", "v1");
+ cache.put("k2", "v2", lifespan, MILLISECONDS);
+
+ assertInCacheAndStore("k1", "v1");
+ assertInCacheAndStore("k2", "v2", lifespan);
+
+ tm.begin();
+ cache.clear();
+ t = tm.suspend();
+
+ assertInCacheAndStore("k1", "v1");
+ assertInCacheAndStore("k2", "v2", lifespan);
+ tm.resume(t);
+ tm.rollback();
+
+ assertInCacheAndStore("k1", "v1");
+ assertInCacheAndStore("k2", "v2", lifespan);
+ }
+}
Copied: core/branches/flat/src/test/java/org/horizon/loader/decorators/AsyncTest.java (from rev 7695, core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/AsyncTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/AsyncTest.java 2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,61 @@
+package org.horizon.loader.decorators;
+
+import org.horizon.CacheException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutorService;
+
+@Test(groups = "unit", sequential = true)
+public class AsyncTest {
+
+ AsyncStore store;
+ ExecutorService asyncExecutor;
+
+
+ @BeforeTest
+ public void setUp() {
+ store = new AsyncStore(new DummyInMemoryCacheLoader(), new AsyncStoreConfig());
+ DummyInMemoryCacheLoader.Cfg cfg = new DummyInMemoryCacheLoader.Cfg();
+ cfg.setStore(AsyncTest.class.getName());
+ store.init(cfg, null, null);
+ store.start();
+ asyncExecutor = (ExecutorService) TestingUtil.extractField(store, "executor");
+ }
+
+ @AfterTest
+ public void tearDown() {
+ if (store != null) store.stop();
+ }
+
+ @AfterMethod
+ public void clearStore() {
+ if (store != null) store.clear();
+ }
+
+ public void testRestrictionOnAddingToAsyncQueue() throws Exception {
+ store.remove("blah");
+
+ store.store(new StoredEntry("one", "value"));
+ store.store(new StoredEntry("two", "value"));
+ store.store(new StoredEntry("three", "value"));
+ store.store(new StoredEntry("four", "value"));
+
+ // stop the cache store
+ store.stop();
+ try {
+ store.remove("blah");
+ assert false : "Should have restricted this entry from being made";
+ }
+ catch (CacheException expected) {
+ }
+
+ // clean up
+ store.start();
+ }
+}
Added: core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java 2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,32 @@
+package org.horizon.loader.decorators;
+
+import static org.easymock.EasyMock.*;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+
+@Test(groups = "unit")
+public class ReadOnlyCacheStoreTest {
+ public void testWriteMethods() {
+ CacheStore mock = createMock(CacheStore.class);
+ ReadOnlyStore store = new ReadOnlyStore(mock);
+ StoredEntry mockEntry = new StoredEntry();
+ expect(mock.load(eq("key"))).andReturn(mockEntry).once();
+ replay(mock);
+
+ // these should be "silent" no-ops and not actually change anything.
+ store.clear();
+ store.purgeExpired();
+ store.remove("key");
+ store.store((StoredEntry) null);
+ store.store((InputStream) null);
+ store.prepare(null, null, true);
+ store.commit(null);
+ store.rollback(null);
+ assert mockEntry == store.load("key");
+
+ verify(mock);
+ }
+}
Added: core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java 2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,321 @@
+package org.horizon.loader.decorators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.config.Configuration;
+import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
+import org.horizon.manager.CacheManager;
+import org.horizon.util.TestingUtil;
+import org.horizon.util.internals.ViewChangeListener;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.fail;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Test(groups = "functional", sequential = true)
+public class SingletonStoreTest extends BaseClusteredTest {
+ private static final Log log = LogFactory.getLog(SingletonStoreTest.class);
+ private static final AtomicInteger storeCounter = new AtomicInteger(0);
+ private CacheManager cm0, cm1, cm2;
+
+ @BeforeMethod
+ public void setUp() {
+ cm0 = addClusterEnabledCacheManager();
+ cm1 = addClusterEnabledCacheManager();
+ cm2 = addClusterEnabledCacheManager();
+
+ Configuration conf = new Configuration();
+ conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ DummyInMemoryCacheLoader.Cfg cfg = new DummyInMemoryCacheLoader.Cfg();
+ cfg.setStore("Store-" + storeCounter.getAndIncrement());
+ CacheLoaderManagerConfig pushingCfg = new CacheLoaderManagerConfig();
+ pushingCfg.addIndividualCacheLoaderConfig(cfg);
+ SingletonStoreConfig ssc = new SingletonStoreConfig();
+ ssc.setPushStateWhenCoordinator(true);
+ ssc.setSingletonStoreEnabled(true);
+ cfg.setSingletonStoreConfig(ssc);
+ conf.setCacheLoaderConfig(pushingCfg);
+
+ // cannot define on ALL cache managers since the same dummy in memory CL bin will be used!
+ cm0.defineCache("pushing", conf);
+ ((DummyInMemoryCacheLoader.Cfg) conf.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
+ cm1.defineCache("pushing", conf);
+ ((DummyInMemoryCacheLoader.Cfg) conf.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
+ cm2.defineCache("pushing", conf);
+
+ conf = new Configuration();
+ conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ cfg = new DummyInMemoryCacheLoader.Cfg();
+ cfg.setStore("Store-" + storeCounter.getAndIncrement());
+ CacheLoaderManagerConfig nonPushingCfg = new CacheLoaderManagerConfig();
+ nonPushingCfg.addIndividualCacheLoaderConfig(cfg);
+ ssc = new SingletonStoreConfig();
+ ssc.setPushStateWhenCoordinator(false);
+ ssc.setSingletonStoreEnabled(true);
+ cfg.setSingletonStoreConfig(ssc);
+ conf.setCacheLoaderConfig(nonPushingCfg);
+
+ // cannot define on ALL cache managers since the same dummy in memory CL bin will be used!
+ cm0.defineCache("nonPushing", conf);
+ ((DummyInMemoryCacheLoader.Cfg) conf.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
+ cm1.defineCache("nonPushing", conf);
+ ((DummyInMemoryCacheLoader.Cfg) conf.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
+ cm2.defineCache("nonPushing", conf);
+ }
+
+ private Cache[] getCaches(String name) {
+ return new Cache[]{
+ cm0.getCache(name), cm1.getCache(name), cm2.getCache(name)
+ };
+ }
+
+ private SingletonStore[] extractStores(Cache[] caches) {
+ SingletonStore[] stores = new SingletonStore[caches.length];
+
+ int i = 0;
+ for (Cache c : caches)
+ stores[i++] = (SingletonStore) TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ return stores;
+ }
+
+ private Object load(CacheStore cs, Object key) {
+ StoredEntry se = cs.load(key);
+ return se == null ? null : se.getValue();
+ }
+
+ public void testPutCacheLoaderWithNoPush() throws Exception {
+ Cache[] caches = getCaches("nonPushing");
+ for (Cache c : caches) c.start();
+
+ int i = 1;
+ for (Cache c : caches) {
+ c.put("key" + i, "value" + i);
+ i++;
+ }
+
+ // all values should be on all caches since they are sync-repl
+ for (Cache c : caches) {
+ for (i = 1; i < 4; i++) assert c.get("key" + i).equals("value" + i);
+ }
+
+ // now test the stores. These should *only* be on the store on cache 1.
+ CacheStore[] stores = extractStores(caches);
+
+ for (i = 1; i < 4; i++) {
+ // should ONLY be on the first loader!
+ assert load(stores[0], "key" + i).equals("value" + i);
+ assert load(stores[1], "key" + i) == null : "stores[1] should not have stored key key" + i;
+ assert load(stores[2], "key" + i) == null : "stores[2] should not have stored key key" + i;
+ }
+
+ cm0.stop();
+ TestingUtil.blockUntilViewsReceived(60000, cm1, cm2);
+
+ caches[1].put("key4", "value4");
+ caches[2].put("key5", "value5");
+
+ assert load(stores[1], "key4").equals("value4");
+ assert load(stores[1], "key5").equals("value5");
+
+ assert load(stores[2], "key4") == null;
+ assert load(stores[2], "key5") == null;
+
+ cm1.stop();
+ TestingUtil.blockUntilViewsReceived(60000, cm2);
+
+ caches[2].put("key6", "value6");
+ assert load(stores[2], "key6").equals("value6");
+ }
+
+ public void testPutCacheLoaderWithPush() throws Exception {
+ Cache[] caches = getCaches("pushing");
+ for (Cache c : caches) c.start();
+ Map<String, String> expected = new HashMap<String, String>();
+
+ expected.put("a-key", "a-value");
+ expected.put("aa-key", "aa-value");
+ expected.put("b-key", "b-value");
+ expected.put("bb-key", "bb-value");
+ expected.put("c-key", "c-value");
+ expected.put("d-key", "d-value");
+ expected.put("e-key", "e-value");
+ expected.put("g-key", "g-value");
+
+ caches[0].putAll(expected);
+
+ SingletonStore[] stores = extractStores(caches);
+
+ for (String key : expected.keySet()) {
+ assert load(stores[0], key).equals(expected.get(key));
+ assert load(stores[1], key) == null;
+ assert load(stores[2], key) == null;
+ }
+
+ ViewChangeListener viewChangeListener = new ViewChangeListener(caches[1]);
+
+ cm0.stop();
+
+ viewChangeListener.waitForViewChange(60, TimeUnit.SECONDS);
+
+ waitForPushStateCompletion(stores[1].pushStateFuture);
+
+ // cache store 1 should have all state now, and store 2 should have nothing
+
+ for (String key : expected.keySet()) {
+ assert load(stores[1], key).equals(expected.get(key));
+ assert load(stores[2], key) == null;
+ }
+
+ caches[1].put("h-key", "h-value");
+ caches[2].put("i-key", "i-value");
+ expected.put("h-key", "h-value");
+ expected.put("i-key", "i-value");
+
+ for (String key : expected.keySet()) {
+ assert load(stores[1], key).equals(expected.get(key));
+ assert load(stores[2], key) == null;
+ }
+
+ viewChangeListener = new ViewChangeListener(caches[2]);
+ cm1.stop();
+ viewChangeListener.waitForViewChange(60, TimeUnit.SECONDS);
+
+ waitForPushStateCompletion(stores[2].pushStateFuture);
+
+ for (String key : expected.keySet()) assert load(stores[2], key).equals(expected.get(key));
+
+ caches[2].put("aaa-key", "aaa-value");
+ expected.put("aaa-key", "aaa-value");
+
+ for (String key : expected.keySet()) assert load(stores[2], key).equals(expected.get(key));
+ }
+
+ public void testAvoidConcurrentStatePush() throws Exception {
+ final ExecutorService executor = Executors.newFixedThreadPool(2);
+ final CountDownLatch pushStateCanFinish = new CountDownLatch(1);
+ final CountDownLatch secondActiveStatusChangerCanStart = new CountDownLatch(1);
+ final TestingSingletonStore mscl = new TestingSingletonStore(pushStateCanFinish, secondActiveStatusChangerCanStart, new SingletonStoreConfig());
+
+ Future f1 = executor.submit(createActiveStatusChanger(mscl));
+ assert secondActiveStatusChangerCanStart.await(1000, TimeUnit.MILLISECONDS) : "Failed waiting on latch";
+
+ Future f2 = executor.submit(createActiveStatusChanger(mscl));
+
+ f1.get();
+ f2.get();
+
+ assertEquals(1, mscl.getNumberCreatedTasks());
+ }
+
+ public void testPushStateTimedOut() throws Throwable {
+ final CountDownLatch pushStateCanFinish = new CountDownLatch(1);
+ SingletonStoreConfig ssdc = new SingletonStoreConfig();
+ ssdc.setPushStateTimeout(100);
+ final TestingSingletonStore mscl = new TestingSingletonStore(pushStateCanFinish, null, ssdc);
+
+ Future f = Executors.newSingleThreadExecutor().submit(createActiveStatusChanger(mscl));
+ pushStateCanFinish.await(200, TimeUnit.MILLISECONDS);
+ pushStateCanFinish.countDown();
+
+ try {
+ f.get();
+ fail("Should have timed out");
+ }
+ catch (ExecutionException expected) {
+ Throwable e;
+ if ((e = expected.getCause().getCause().getCause()) instanceof TimeoutException) {
+ assert true : "This is expected";
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ private void waitForPushStateCompletion(Future pushThreadFuture) throws Exception {
+ if (pushThreadFuture != null) pushThreadFuture.get();
+ }
+
+ private Callable<?> createActiveStatusChanger(SingletonStore mscl) {
+ return new ActiveStatusModifier(mscl);
+ }
+
+ class TestingSingletonStore extends SingletonStore {
+ private int numberCreatedTasks = 0;
+ private CountDownLatch pushStateCanFinish;
+ private CountDownLatch secondActiveStatusChangerCanStart;
+
+ public TestingSingletonStore(CountDownLatch pushStateCanFinish, CountDownLatch secondActiveStatusChangerCanStart, SingletonStoreConfig cfg) {
+ super(null, null, cfg);
+ this.pushStateCanFinish = pushStateCanFinish;
+ this.secondActiveStatusChangerCanStart = secondActiveStatusChangerCanStart;
+ }
+
+ public int getNumberCreatedTasks() {
+ return numberCreatedTasks;
+ }
+
+ public void setNumberCreatedTasks(int numberCreatedTasks) {
+ this.numberCreatedTasks = numberCreatedTasks;
+ }
+
+ @Override
+ protected Callable<?> createPushStateTask() {
+ return new Callable() {
+ public Object call() throws Exception {
+ numberCreatedTasks++;
+ try {
+ if (secondActiveStatusChangerCanStart != null) {
+ secondActiveStatusChangerCanStart.countDown();
+ }
+ pushStateCanFinish.await();
+ }
+ catch (InterruptedException e) {
+ fail("ActiveStatusModifier interrupted");
+ }
+ return null;
+ }
+ };
+ }
+
+
+ @Override
+ protected void awaitForPushToFinish(Future future, long timeout, TimeUnit unit) {
+ pushStateCanFinish.countDown();
+ super.awaitForPushToFinish(future, timeout, unit);
+ }
+ }
+
+ class ActiveStatusModifier implements Callable {
+ private SingletonStore scl;
+
+ public ActiveStatusModifier(SingletonStore singleton) {
+ scl = singleton;
+ }
+
+ public Object call() throws Exception {
+ log.debug("active status modifier started");
+ scl.activeStatusChanged(true);
+ scl.pushStateFuture.get();
+
+ return null;
+ }
+ }
+}
Modified: core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java 2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java 2009-02-17 01:07:15 UTC (rev 7697)
@@ -5,6 +5,8 @@
import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.StoredEntry;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
import java.io.IOException;
@@ -20,7 +22,7 @@
import java.util.concurrent.ConcurrentMap;
public class DummyInMemoryCacheLoader extends AbstractCacheStore {
-
+ private static final Log log = LogFactory.getLog(DummyInMemoryCacheLoader.class);
static final ConcurrentMap<String, Map> stores = new ConcurrentHashMap<String, Map>();
String storeName = "__DEFAULT_STORES__";
Map<Object, StoredEntry> store;
@@ -73,6 +75,7 @@
StoredEntry se = store.get(key);
if (se == null) return null;
if (se.isExpired()) {
+ log.debug("Key {0} exists, but has expired. Entry is {1}", key, se);
store.remove(key);
return null;
}
@@ -84,9 +87,10 @@
Set<StoredEntry> s = new HashSet<StoredEntry>();
for (Iterator<StoredEntry> i = store.values().iterator(); i.hasNext();) {
StoredEntry se = i.next();
- if (se.isExpired())
+ if (se.isExpired()) {
+ log.debug("Key {0} exists, but has expired. Entry is {1}", se.getKey(), se);
i.remove();
- else
+ } else
s.add(se);
}
return s;
@@ -109,13 +113,16 @@
}
public void stop() {
- stores.remove(storeName);
}
public static class Cfg extends AbstractCacheLoaderConfig {
boolean debug;
- String store;
+ String store = "__DEFAULT_STORE__";
+ public Cfg() {
+ setClassName(DummyInMemoryCacheLoader.class.getName());
+ }
+
public boolean isDebug() {
return debug;
}
@@ -131,5 +138,10 @@
public void setStore(String store) {
this.store = store;
}
+
+ @Override
+ public Cfg clone() {
+ return (Cfg) super.clone();
+ }
}
}
Added: core/branches/flat/src/test/java/org/horizon/util/internals/ViewChangeListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/util/internals/ViewChangeListener.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/util/internals/ViewChangeListener.java 2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,45 @@
+package org.horizon.util.internals;
+
+import org.horizon.Cache;
+import org.horizon.manager.CacheManager;
+import org.horizon.notifications.Listener;
+import org.horizon.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.horizon.notifications.cachemanagerlistener.event.ViewChangedEvent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Listens for view changes. Note that you do NOT have to register this listener; it does so automatically when
+ * constructed.
+ */
+@Listener
+public class ViewChangeListener {
+ CacheManager cm;
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ public ViewChangeListener(Cache c) {
+ this(c.getCacheManager());
+ }
+
+ public ViewChangeListener(CacheManager cm) {
+ this.cm = cm;
+ cm.addListener(this);
+ }
+
+ @ViewChanged
+ public void onViewChange(ViewChangedEvent e) {
+ latch.countDown();
+ }
+
+ /**
+ * Blocks for a certain amount of time until a view change is received. Note that this class will start listening
+ * for the view change the moment it is constructed.
+ *
+ * @param time time to wait
+ * @param unit time unit
+ */
+ public void waitForViewChange(long time, TimeUnit unit) throws InterruptedException {
+ if (!latch.await(time, unit)) assert false : "View change not seen after " + time + " " + unit;
+ }
+}