[jbosscache-commits] JBoss Cache SVN: r7697 - in core/branches/flat/src: main/java/org/horizon/commands/write and 10 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Feb 16 20:07:15 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-02-16 20:07:15 -0500 (Mon, 16 Feb 2009)
New Revision: 7697

Added:
   core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
   core/branches/flat/src/test/java/org/horizon/loader/decorators/
   core/branches/flat/src/test/java/org/horizon/loader/decorators/AsyncTest.java
   core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
   core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
   core/branches/flat/src/test/java/org/horizon/util/internals/ViewChangeListener.java
Removed:
   core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java
Modified:
   core/branches/flat/src/main/java/org/horizon/AdvancedCache.java
   core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
   core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
   core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
   core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java
   core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
   core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
   core/branches/flat/src/main/java/org/horizon/factories/EntryFactory.java
   core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java
   core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java
   core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java
   core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
   core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
   core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
   core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
   core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
   core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java
   core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
   core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
   core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStoreConfig.java
   core/branches/flat/src/test/java/org/horizon/expiry/ExpiryTest.java
   core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java
Log:
Loader tests and interceptor code

Modified: core/branches/flat/src/main/java/org/horizon/AdvancedCache.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/AdvancedCache.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/AdvancedCache.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -1,6 +1,7 @@
 package org.horizon;
 
 import org.horizon.batch.BatchContainer;
+import org.horizon.container.DataContainer;
 import org.horizon.eviction.EvictionManager;
 import org.horizon.factories.ComponentRegistry;
 import org.horizon.interceptors.base.CommandInterceptor;
@@ -82,6 +83,8 @@
 
    InvocationContextContainer getInvocationContextContainer();
 
+   DataContainer getDataContainer();
+
    void putForExternalRead(K key, V value, Options... options);
 
    V put(K key, V value, Options... options);

Modified: core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/CacheDelegate.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/CacheDelegate.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -397,6 +397,10 @@
       return invocationContextContainer;
    }
 
+   public DataContainer getDataContainer() {
+      return dataContainer;
+   }
+
    public CacheManager getCacheManager() {
       return cacheManager;
    }
@@ -435,19 +439,16 @@
    }
 
    public void compact() {
-      for (Object key : dataContainer.keySet())
-      {
+      for (Object key : dataContainer.keySet()) {
          // get the key first, before attempting to serialize stuff since data.get() may deserialize the key if doing
          // a hashcode() or equals().
 
          Object value = dataContainer.get(key);
-         if (key instanceof MarshalledValue)
-         {
+         if (key instanceof MarshalledValue) {
             ((MarshalledValue) key).compact(true, true);
          }
 
-         if (value instanceof MarshalledValue)
-         {
+         if (value instanceof MarshalledValue) {
             ((MarshalledValue) value).compact(true, true);
          }
       }

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/PutKeyValueCommand.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -95,8 +95,10 @@
             if (existing instanceof DeltaAware) toMergeWith = (DeltaAware) existing;
             e.setValue(dv.merge(toMergeWith));
             o = existing;
+            e.setLifespan(lifespanMillis);
          } else {
             o = e.setValue(value);
+            e.setLifespan(lifespanMillis);
          }
          notifier.notifyCacheEntryModified(key, e.getValue(), false, ctx);
       }
@@ -161,9 +163,10 @@
    @Override
    public String toString() {
       return "PutKeyValueCommand{" +
-            "lifespanMillis=" + lifespanMillis +
+            "key=" + key +
+            ", value=" + value +
+            ", lifespanMillis=" + lifespanMillis +
             ", putIfAbsent=" + putIfAbsent +
-            ", value=" + value +
             '}';
    }
 

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/PutMapCommand.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -68,6 +68,7 @@
          MVCCEntry me = ctx.lookupEntry(key);
          notifier.notifyCacheEntryModified(key, me.getValue(), true, ctx);
          me.setValue(e.getValue());
+         me.setLifespan(lifespanMillis);
          notifier.notifyCacheEntryModified(key, me.getValue(), false, ctx);
       }
       return null;

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/RemoveCommand.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -25,6 +25,8 @@
 import org.horizon.commands.read.AbstractDataCommand;
 import org.horizon.container.MVCCEntry;
 import org.horizon.context.InvocationContext;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
 import org.horizon.notifications.cachelistener.CacheNotifier;
 
 
@@ -33,6 +35,8 @@
  * @since 1.0
  */
 public class RemoveCommand extends AbstractDataCommand implements DataWriteCommand {
+   private static final Log log = LogFactory.getLog(RemoveCommand.class);
+   private static final boolean trace = log.isTraceEnabled();
    public static final byte METHOD_ID = 6;
    protected CacheNotifier notifier;
    boolean successful = true;
@@ -59,6 +63,7 @@
    public Object perform(InvocationContext ctx) throws Throwable {
       MVCCEntry e = ctx.lookupEntry(key);
       if (e == null || e.isNullEntry()) {
+         log.trace("Nothing to remove since the entry is null or we have a null entry");
          if (value == null) {
             return null;
          } else {

Modified: core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/commands/write/ReplaceCommand.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -67,12 +67,14 @@
 
             if (oldValue == null || oldValue.equals(e.getValue())) {
                Object old = e.setValue(newValue);
+               e.setLifespan(lifespanMillis);
                return returnValue(old, true);
             }
             return returnValue(null, false);
          } else {
             // for remotely originating calls, this doesn't check the status of what is under the key at the moment
             Object old = e.setValue(newValue);
+            e.setLifespan(lifespanMillis);
             return returnValue(old, true);
          }
       }

Modified: core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/container/MVCCEntry.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -51,4 +51,8 @@
    boolean isValid();
 
    void setValid(boolean valid);
+
+   long getLifespan();
+
+   void setLifespan(long lifespan);
 }

Modified: core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/container/ReadCommittedEntry.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -39,7 +39,6 @@
    protected byte flags = 0;
    private long lifespan;
 
-
    protected ReadCommittedEntry() {
       setValid(true);
    }
@@ -51,6 +50,14 @@
       this.lifespan = lifespan;
    }
 
+   public long getLifespan() {
+      return lifespan;
+   }
+
+   public void setLifespan(long lifespan) {
+      this.lifespan = lifespan;
+   }
+
    public Object getKey() {
       return key;
    }

Modified: core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -22,6 +22,7 @@
 package org.horizon.container;
 
 import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.Stop;
 import org.horizon.loader.CacheLoaderManager;
 import org.horizon.loader.CacheStore;
 import org.horizon.loader.StoredEntry;
@@ -172,6 +173,7 @@
       return immortalData.size() + expirableData.size();
    }
 
+   @Stop(priority = 999)
    public void clear() {
       immortalData.clear();
       expirableData.clear();
@@ -205,6 +207,10 @@
          return new StoredEntry(key, immortal.getValue());
       ExpirableCachedValue ecv = expirableData.get(key);
       if (ecv == null) return null;
+      if (ecv.isExpired()) {
+         expirableData.remove(key);
+         return null;
+      }
       return new StoredEntry(key, ecv.getValue(), ecv.getCreatedTime(), ecv.getExpiryTime());
    }
 

Modified: core/branches/flat/src/main/java/org/horizon/factories/EntryFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EntryFactory.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/factories/EntryFactory.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -32,7 +32,7 @@
  * @since 1.0
  */
 public interface EntryFactory {
-   void releaseLock(Object key);
+   void releaseLock(InvocationContext ctx, Object key);
 
    /**
     * Attempts to lock a node if the lock isn't already held in the current scope, and records the lock in the context.
@@ -47,11 +47,9 @@
     */
    boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException;
 
-   MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent, long lifespan) throws InterruptedException;
+   MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent) throws InterruptedException;
 
    MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext, boolean forceWriteLock) throws InterruptedException;
 
    MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext) throws InterruptedException;
-
-   MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert, long lifespan);
 }

Modified: core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/factories/EntryFactoryImpl.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -31,6 +31,7 @@
 import org.horizon.factories.annotations.Inject;
 import org.horizon.factories.annotations.Start;
 import org.horizon.invocation.Options;
+import org.horizon.loader.StoredEntry;
 import org.horizon.lock.IsolationLevel;
 import org.horizon.lock.LockManager;
 import org.horizon.lock.TimeoutException;
@@ -67,8 +68,7 @@
    public MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert, long lifespan) {
       if (value == null && !isForInsert) return useRepeatableRead ? NULL_MARKER : null;
 
-      MVCCEntry mvccEntry = useRepeatableRead ? new RepeatableReadEntry(key, value, lifespan) : new ReadCommittedEntry(key, value, lifespan);
-      return mvccEntry;
+      return useRepeatableRead ? new RepeatableReadEntry(key, value, lifespan) : new ReadCommittedEntry(key, value, lifespan);
    }
 
    public MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext) throws InterruptedException {
@@ -82,12 +82,14 @@
       MVCCEntry mvccEntry;
       if (forceWriteLock) {
          if (trace) log.trace("Forcing lock on reading");
-         return wrapEntryForWriting(ctx, key, false, false, -1);
+         return wrapEntryForWriting(ctx, key, false, false);
       } else if ((mvccEntry = ctx.lookupEntry(key)) == null) {
          if (trace) log.trace("Key " + key + " is not in context, fetching from container.");
          // simple implementation.  Peek the node, wrap it, put wrapped node in the context.
-         Object value = container.get(key);
-         mvccEntry = createWrappedEntry(key, value, false, -1);
+         StoredEntry se = container.createEntryForStorage(key);
+         mvccEntry = se == null ?
+               createWrappedEntry(key, null, false, -1) :
+               createWrappedEntry(key, se.getValue(), false, se.getLifespan());
          if (mvccEntry != null && putInContext) ctx.putLookedUpEntry(key, mvccEntry);
          return mvccEntry;
       } else {
@@ -96,7 +98,7 @@
       }
    }
 
-   public MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent, long lifespan) throws InterruptedException {
+   public MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent) throws InterruptedException {
       MVCCEntry mvccEntry = ctx.lookupEntry(key);
       if (createIfAbsent && mvccEntry != null && mvccEntry.isNullEntry()) mvccEntry = null;
       if (mvccEntry != null) // exists in context!  Just acquire lock if needed, and wrap.
@@ -114,14 +116,14 @@
          }
       } else {
          // else, fetch from dataContainer.
-         Object value = container.get(key);
-         if (value != null) {
+         StoredEntry storedEntry = container.createEntryForStorage(key);
+         if (storedEntry != null) {
             if (trace) log.trace("Retrieved from container.");
             // exists in cache!  Just acquire lock if needed, and wrap.
             // do we need a lock?
             boolean needToCopy = false;
             if (acquireLock(ctx, key)) needToCopy = true;
-            mvccEntry = createWrappedEntry(key, value, false, lifespan);
+            mvccEntry = createWrappedEntry(key, storedEntry.getValue(), false, storedEntry.getLifespan());
             ctx.putLookedUpEntry(key, mvccEntry);
             if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
          } else if (createIfAbsent) {
@@ -130,7 +132,7 @@
             // now to lock and create the node.  Lock first to prevent concurrent creation!
             acquireLock(ctx, key);
             notifier.notifyCacheEntryCreated(key, true, ctx);
-            mvccEntry = createWrappedEntry(key, value, true, lifespan);
+            mvccEntry = createWrappedEntry(key, null, true, -1);
             mvccEntry.setCreated(true);
             ctx.putLookedUpEntry(key, mvccEntry);
             mvccEntry.copyForUpdate(container, writeSkewCheck);
@@ -166,7 +168,7 @@
             ctx.addKeyLocked(key);
          } else if (!lockManager.lockAndRecord(key, ctx)) {
             Object owner = lockManager.getOwner(key);
-            throw new TimeoutException("Unable to acquire lock on key [" + key + "] after [" + getLockAcquisitionTImeout(ctx)
+            throw new TimeoutException("Unable to acquire lock on key [" + key + "] after [" + getLockAcquisitionTimeout(ctx)
                   + "] milliseconds for requestor [" + lockManager.getLockOwner(ctx) + "]! Lock held by [" + owner + "]");
          }
          return true;
@@ -174,12 +176,13 @@
       return false;
    }
 
-   private long getLockAcquisitionTImeout(InvocationContext ctx) {
+   private long getLockAcquisitionTimeout(InvocationContext ctx) {
       return ctx.hasOption(Options.ZERO_LOCK_ACQUISITION_TIMEOUT) ?
             0 : configuration.getLockAcquisitionTimeout();
    }
 
-   public void releaseLock(Object key) {
+   public void releaseLock(InvocationContext ctx, Object key) {
       lockManager.unlock(key, lockManager.getOwner(key));
+      ctx.removeKeyLocked(key);
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -37,6 +37,7 @@
 import org.horizon.interceptors.base.JmxStatsCommandInterceptor;
 import org.horizon.loader.CacheLoader;
 import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.StoredEntry;
 import org.horizon.notifications.cachelistener.CacheNotifier;
 import org.horizon.transaction.TransactionTable;
 
@@ -86,97 +87,82 @@
 
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
-      if (command.getKey() != null) {
-         loadIfNeeded(ctx, command.getKey());
-      }
+      if (command.getKey() != null) loadIfNeeded(ctx, command.getKey());
       return invokeNextInterceptor(ctx, command);
    }
 
 
    @Override
    public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
-      if (command.getKey() != null) {
-         loadIfNeeded(ctx, command.getKey());
-      }
+      if (command.getKey() != null) loadIfNeeded(ctx, command.getKey());
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
-      if (command.getKeys() != null) {
+      if (command.getKeys() != null)
          for (Object key : command.getKeys()) loadIfNeeded(ctx, key);
-      }
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
-      if (command.getKey() != null) {
-         loadIfNeeded(ctx, command.getKey());
-      }
+      if (command.getKey() != null) loadIfNeeded(ctx, command.getKey());
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
-      if (command.getKey() != null) {
-         loadIfNeeded(ctx, command.getKey());
-      }
+      if (command.getKey() != null) loadIfNeeded(ctx, command.getKey());
       return invokeNextInterceptor(ctx, command);
    }
 
    private void loadIfNeeded(InvocationContext ctx, Object key) throws Throwable {
-      if (dataContainer.containsKey(key) || !loader.containsKey(key))
+      if (!loader.containsKey(key)) {
+         log.trace("No need to load.  Key doesn't exist in the loader.");
          return;
+      }
 
       // Obtain a temporary lock to verify the key is not being concurrently added
       boolean release = entryFactory.acquireLock(ctx, key);
       if (dataContainer.containsKey(key)) {
-         if (release)
-            entryFactory.releaseLock(key);
+         if (release) entryFactory.releaseLock(ctx, key);
+         log.trace("No need to load.  Key exists in the data container.");
          return;
       }
 
       // Reuse the lock and create a new entry for loading
-      MVCCEntry n = entryFactory.wrapEntryForWriting(ctx, key, true, false, -1); // TODO: handle expiry information from loaded data
+      MVCCEntry n = entryFactory.wrapEntryForWriting(ctx, key, true, false);
       n = loadEntry(ctx, key, n);
    }
 
    /**
-    * Loads a node from disk; if it exists creates parent TreeNodes. If it doesn't exist on disk but in memory, clears
-    * the uninitialized flag, otherwise returns null.
+    * Loads a node from loader
     */
    private MVCCEntry loadEntry(InvocationContext ctx, Object key, MVCCEntry entry) throws Exception {
-      if (trace) log.trace("loading entry " + key + " entry is " + entry);
+      log.trace("Loading key {0}", key);
 
-      Object value = loader.load(key);
-      boolean nodeExists = (value != null);
-      if (trace) log.trace("nodeExists " + nodeExists);
+      StoredEntry storedEntry = loader.load(key);
+      boolean entryExists = (storedEntry != null);
+      log.trace("Entry exists in loader? " + entryExists);
 
       if (getStatisticsEnabled()) {
-         if (nodeExists) {
+         if (entryExists) {
             cacheLoads++;
          } else {
             cacheMisses++;
          }
       }
 
-      if (value != null) {
-         if (trace) log.trace("Entry is not null, loading");
-//         notifier.notifyNodeLoaded(fqn, true, Collections.emptyMap(), ctx);
-//         if (isActivation)
-//         {
-//            notifier.notifyNodeActivated(fqn, true, Collections.emptyMap(), ctx);
-//         }
-
-         entry.setValue(value);
+      if (entryExists) {
+         notifier.notifyCacheEntryLoaded(key, true, ctx);
+         if (isActivation) notifier.notifyCacheEntryActivated(key, true, ctx);
+         entry.setValue(storedEntry.getValue());
+         entry.setLifespan(storedEntry.getLifespan());
          entry.setValid(true);
 
-//         notifier.notifyNodeLoaded(fqn, false, nodeData, ctx);
-//         if (isActivation)
-//         {
-//            notifier.notifyNodeActivated(fqn, false, nodeData, ctx);
-//         }
+         notifier.notifyCacheEntryLoaded(key, false, ctx);
+         if (isActivation) notifier.notifyCacheEntryActivated(key, false, ctx);
       }
 
       return entry;

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -30,8 +30,9 @@
 import org.horizon.commands.write.PutKeyValueCommand;
 import org.horizon.commands.write.PutMapCommand;
 import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.ReplaceCommand;
 import org.horizon.config.CacheLoaderManagerConfig;
-import org.horizon.container.DataContainer;
+import org.horizon.container.MVCCEntry;
 import org.horizon.context.InvocationContext;
 import org.horizon.context.TransactionContext;
 import org.horizon.factories.annotations.Inject;
@@ -51,6 +52,7 @@
 import org.horizon.transaction.GlobalTransaction;
 
 import javax.transaction.SystemException;
+import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -70,13 +72,12 @@
 public class CacheStoreInterceptor extends JmxStatsCommandInterceptor {
    private CacheLoaderManagerConfig loaderConfig = null;
    private TransactionManager txMgr = null;
-   private HashMap<GlobalTransaction, Integer> txStores = new HashMap<GlobalTransaction, Integer>();
-   private Map<GlobalTransaction, Set<Object>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>();
+   private HashMap<Transaction, Integer> txStores = new HashMap<Transaction, Integer>();
+   private Map<Transaction, Set<Object>> preparingTxs = new ConcurrentHashMap<Transaction, Set<Object>>();
    private long cacheStores = 0;
    CacheStore store;
    private CacheLoaderManager loaderManager;
    private boolean statsEnabled;
-   private DataContainer dataContainer;
 
    public CacheStoreInterceptor() {
       log = LogFactory.getLog(getClass());
@@ -84,26 +85,23 @@
    }
 
    @Inject
-   protected void init(CacheLoaderManager loaderManager, TransactionManager txManager, CacheLoaderManagerConfig clConfig, DataContainer dataContainer) {
-      // never inject a CacheLoaderOld at this stage - only a CacheLoaderManager, since the CacheLoaderManager only creates a CacheLoaderOld instance when it @Starts.
+   protected void init(CacheLoaderManager loaderManager, TransactionManager txManager) {
       this.loaderManager = loaderManager;
-      this.loaderConfig = clConfig;
       txMgr = txManager;
-      this.dataContainer = dataContainer;
    }
 
    @Start
    protected void start() {
-      // this should only happen after the CacheLoaderManager has started, since the CacheLoaderManager only creates the CacheLoaderOld instance in its @Start method.
       store = loaderManager.getCacheStore();
       this.setStatisticsEnabled(configuration.isExposeManagementStatistics());
+      loaderConfig = configuration.getCacheLoaderConfig();
    }
 
    /**
     * if this is a shared cache loader and the call is of remote origin, pass up the chain
     */
    public final boolean skip(InvocationContext ctx, VisitableCommand command) {
-      if (store == null) return true;
+      if (store == null) return true;  // could be because the cache loader does not implement cache store
       if ((!ctx.isOriginLocal() && loaderConfig.isShared()) || ctx.hasOption(Options.SKIP_CACHE_STORE)) {
          if (trace)
             log.trace("Passing up method call and bypassing this interceptor since the cache loader is shared and this call originated remotely.");
@@ -117,24 +115,21 @@
       if (!skip(ctx, command) && inTransaction()) {
          if (ctx.getTransactionContext().hasAnyModifications()) {
             // this is a commit call.
-            GlobalTransaction gtx = command.getGlobalTransaction();
-            if (trace) log.trace("Calling loader.commit() for gtx " + gtx);
-            // sync call (a write) on the loaderold
-            // ignore modified FQNs
-            // List fqnsModified = getFqnsFromModificationList(txTable.get(globalTransaction).getCacheLoaderModifications());
+            Transaction tx = ctx.getTransaction();
+            log.trace("Calling loader.commit() for transaction {0}", tx);
             try {
-//               store.commit(gtx);
+               store.commit(tx);
             }
             catch (Throwable t) {
-               preparingTxs.remove(gtx);
+               preparingTxs.remove(tx);
                throw t;
             }
             if (getStatisticsEnabled()) {
-               Integer puts = (Integer) txStores.get(gtx);
+               Integer puts = txStores.get(tx);
                if (puts != null) {
                   cacheStores = cacheStores + puts;
                }
-               txStores.remove(gtx);
+               txStores.remove(tx);
             }
             return invokeNextInterceptor(ctx, command);
          } else {
@@ -149,13 +144,13 @@
       if (!skip(ctx, command) && inTransaction()) {
          if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
          if (ctx.getTransactionContext().hasAnyModifications()) {
-            GlobalTransaction gtx = command.getGlobalTransaction();
+            Transaction tx = ctx.getTransaction();
             // this is a rollback method
-            if (preparingTxs.containsKey(gtx)) {
-               preparingTxs.remove(gtx);
-//               store.rollback(gtx);
+            if (preparingTxs.containsKey(tx)) {
+               preparingTxs.remove(tx);
+               store.rollback(tx);
             }
-            if (getStatisticsEnabled()) txStores.remove(gtx);
+            if (getStatisticsEnabled()) txStores.remove(tx);
          } else {
             if (trace) log.trace("Rollback called with no modifications; ignoring.");
          }
@@ -167,17 +162,27 @@
    public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable {
       if (!skip(ctx, command) && inTransaction()) {
          if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
-         prepareCacheLoader(command.getGlobalTransaction(), ctx.getTransactionContext(), command.isOnePhaseCommit());
+         prepareCacheLoader(ctx, command.getGlobalTransaction(), ctx.getTransactionContext(), command.isOnePhaseCommit());
       }
       return invokeNextInterceptor(ctx, command);
    }
 
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
+      Object retval = invokeNextInterceptor(ctx, command);
+      if (!skip(ctx, command) && !inTransaction() && command.isSuccessful()) {
+         Object key = command.getKey();
+         boolean resp = store.remove(key);
+         log.trace("Removed entry under key {0} and got response {1} from CacheStore", key, resp);
+      }
+      return retval;
+   }
+
+   @Override
+   public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
       if (!skip(ctx, command) && !inTransaction()) {
-         Object returnValue = store.remove(command.getKey());
-         invokeNextInterceptor(ctx, command);
-         return returnValue;
+         store.clear();
+         log.trace("Cleared cache store");
       }
       return invokeNextInterceptor(ctx, command);
    }
@@ -185,66 +190,72 @@
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
       Object returnValue = invokeNextInterceptor(ctx, command);
-      if (skip(ctx, command) || inTransaction())
-         return returnValue;
+      if (skip(ctx, command) || inTransaction() || !command.isSuccessful()) return returnValue;
 
-      store.store((StoredEntry) null);//command.getKey(), command.getValue());
+      Object key = command.getKey();
+      StoredEntry se = getStoredEntry(key, ctx);
+      store.store(se);
+      log.trace("Stored entry {0} under key {1}", se, key);
       if (getStatisticsEnabled()) cacheStores++;
 
       return returnValue;
    }
 
    @Override
-   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
+   public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
       Object returnValue = invokeNextInterceptor(ctx, command);
-      if (skip(ctx, command) || inTransaction())
-         return returnValue;
+      if (skip(ctx, command) || inTransaction() || !command.isSuccessful()) return returnValue;
 
-      // Perhaps this should be optimized
-      Map<Object, Object> map = command.getMap();
-      List<Modification> modifications = toModifications(map);
-//      store.put(modifications);
-
+      Object key = command.getKey();
+      StoredEntry se = getStoredEntry(key, ctx);
+      store.store(se);
+      log.trace("Stored entry {0} under key {1}", se, key);
       if (getStatisticsEnabled()) cacheStores++;
 
       return returnValue;
    }
 
-   private static List<Modification> toModifications(Map<Object, Object> map) {
-      List<Modification> modifications = new ArrayList<Modification>(map.size());
-      // TODO fix me
-//      for (Map.Entry<Object, Object> entry : map.entrySet())
-//         modifications.add(new Modification(ModificationType.PUT, entry.getKey(), entry.getValue()));
-      return modifications;
+   @Override
+   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
+      Object returnValue = invokeNextInterceptor(ctx, command);
+      if (skip(ctx, command) || inTransaction()) return returnValue;
+
+      Map<Object, Object> map = command.getMap();
+      for (Object key : map.keySet()) {
+         StoredEntry se = getStoredEntry(key, ctx);
+         store.store(se);
+         log.trace("Stored entry {0} under key {1}", se, key);
+      }
+      if (getStatisticsEnabled()) cacheStores += map.size();
+      return returnValue;
    }
 
    private boolean inTransaction() throws SystemException {
       return txMgr != null && txMgr.getTransaction() != null;
    }
 
-   private void prepareCacheLoader(GlobalTransaction gtx, TransactionContext transactionContext, boolean onePhase) throws Throwable {
+   private void prepareCacheLoader(InvocationContext ctx, GlobalTransaction gtx, TransactionContext transactionContext, boolean onePhase) throws Throwable {
       if (transactionContext == null) {
          throw new Exception("transactionContext for transaction " + gtx + " not found in transaction table");
       }
       List<VisitableCommand> modifications = transactionContext.getModifications();
       if (modifications.size() == 0) {
-         if (trace) log.trace("Transaction has not logged any modifications!");
+         log.trace("Transaction has not logged any modifications!");
          return;
       }
-      if (trace) log.trace("Cache loader modification list: " + modifications);
+      log.trace("Cache loader modification list: {0}", modifications);
       StoreModificationsBuilder modsBuilder = new StoreModificationsBuilder(getStatisticsEnabled());
-      for (VisitableCommand cacheCommand : modifications) {
-         cacheCommand.acceptVisitor(null, modsBuilder);
-      }
-      if (trace) {
-         log.trace("Converted method calls to cache loader modifications.  List size: " + modsBuilder.modifications.size());
-      }
-      if (modsBuilder.modifications.size() > 0) {
-//         loader.prepare(gtx, modsBuilder.modifications, onePhase);
+      for (VisitableCommand cacheCommand : modifications) cacheCommand.acceptVisitor(ctx, modsBuilder);
+      int numMods = modsBuilder.modifications.size();
+      log.trace("Converted method calls to cache loader modifications.  List size: {0}", numMods);
 
-         preparingTxs.put(gtx, modsBuilder.affectedKeys);
+      if (numMods > 0) {
+         Transaction tx = transactionContext.getTransaction();
+         store.prepare(modsBuilder.modifications, tx, onePhase);
+
+         preparingTxs.put(tx, modsBuilder.affectedKeys);
          if (getStatisticsEnabled() && modsBuilder.putCount > 0) {
-            txStores.put(gtx, modsBuilder.putCount);
+            txStores.put(tx, modsBuilder.putCount);
          }
       }
    }
@@ -267,7 +278,7 @@
       @SuppressWarnings("unchecked")
       public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
          if (generateStatistics) putCount++;
-         modifications.add(new Store(dataContainer.createEntryForStorage(command.getKey())));
+         modifications.add(new Store(getStoredEntry(command.getKey(), ctx)));
          affectedKeys.add(command.getKey());
          return null;
       }
@@ -278,8 +289,7 @@
          Map<Object, Object> map = command.getMap();
          if (generateStatistics) putCount += map.size();
          affectedKeys.addAll(map.keySet());
-         Set<StoredEntry> entries = new HashSet<StoredEntry>();
-         for (Object key : map.keySet()) modifications.add(new Store(dataContainer.createEntryForStorage(key)));
+         for (Object key : map.keySet()) modifications.add(new Store(getStoredEntry(key, ctx)));
          return null;
       }
 
@@ -326,4 +336,10 @@
       return cacheStores;
    }
 
+   private StoredEntry getStoredEntry(Object key, InvocationContext ctx) {
+      MVCCEntry entry = ctx.lookupEntry(key);
+      StoredEntry se = new StoredEntry(key, entry.getValue());
+      se.setLifespan(entry.getLifespan());
+      return se;
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/LockingInterceptor.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -133,7 +133,7 @@
    public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {
       try {
          // get a snapshot of all keys in the data container
-         for (Object key : dataContainer.keySet()) entryFactory.wrapEntryForWriting(ctx, key, false, false, -1);
+         for (Object key : dataContainer.keySet()) entryFactory.wrapEntryForWriting(ctx, key, false, false);
 
          return invokeNextInterceptor(ctx, command);
       }
@@ -152,7 +152,7 @@
    public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable {
       try {
          if (command.getKeys() != null) {
-            for (Object key : command.getKeys()) entryFactory.wrapEntryForWriting(ctx, key, false, true, -1);
+            for (Object key : command.getKeys()) entryFactory.wrapEntryForWriting(ctx, key, false, true);
          }
          return invokeNextInterceptor(ctx, command);
       }
@@ -164,7 +164,7 @@
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
       try {
-         entryFactory.wrapEntryForWriting(ctx, command.getKey(), true, false, command.getLifespanMillis());
+         entryFactory.wrapEntryForWriting(ctx, command.getKey(), true, false);
          return invokeNextInterceptor(ctx, command);
       }
       finally {
@@ -176,7 +176,7 @@
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
       try {
          for (Object key : command.getMap().keySet()) {
-            entryFactory.wrapEntryForWriting(ctx, key, true, false, command.getLifespanMillis());
+            entryFactory.wrapEntryForWriting(ctx, key, true, false);
          }
          return invokeNextInterceptor(ctx, command);
       }
@@ -188,7 +188,7 @@
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
       try {
-         entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true, -1);
+         entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true);
          return invokeNextInterceptor(ctx, command);
       }
       finally {
@@ -199,7 +199,7 @@
    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
       try {
-         entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true, command.getLifespanMillis());
+         entryFactory.wrapEntryForWriting(ctx, command.getKey(), false, true);
          return invokeNextInterceptor(ctx, command);
       }
       finally {

Modified: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderConfig.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -9,8 +9,8 @@
    private boolean ignoreModifications;
    private boolean fetchPersistentState;
    private boolean purgeOnStartup;
-   private SingletonStoreConfig singletonStoreConfig;
-   private AsyncStoreConfig asyncStoreConfig;
+   private SingletonStoreConfig singletonStoreConfig = new SingletonStoreConfig();
+   private AsyncStoreConfig asyncStoreConfig = new AsyncStoreConfig();
 
    public boolean isPurgeOnStartup() {
       return purgeOnStartup;
@@ -113,6 +113,7 @@
          throw new RuntimeException("Should not happen!", e);
       }
       if (singletonStoreConfig != null) clone.setSingletonStoreConfig(singletonStoreConfig.clone());
+      if (asyncStoreConfig != null) clone.setAsyncStoreConfig((AsyncStoreConfig) asyncStoreConfig.clone());
       return clone;
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -17,9 +17,9 @@
  */
 public abstract class AbstractCacheStore extends AbstractCacheLoader implements CacheStore {
 
-   private final Map<Transaction, List<Modification>> transactions = new ConcurrentHashMap<Transaction, List<Modification>>();
+   private final Map<Transaction, List<? extends Modification>> transactions = new ConcurrentHashMap<Transaction, List<? extends Modification>>();
 
-   protected void applyModifications(List<Modification> mods) {
+   protected void applyModifications(List<? extends Modification> mods) {
       for (Modification m : mods) {
          switch (m.getType()) {
             case STORE:
@@ -39,7 +39,7 @@
       }
    }
 
-   public void prepare(List<Modification> mods, Transaction tx, boolean isOnePhase) {
+   public void prepare(List<? extends Modification> mods, Transaction tx, boolean isOnePhase) {
       if (isOnePhase) {
          applyModifications(mods);
       } else {
@@ -52,7 +52,7 @@
    }
 
    public void commit(Transaction tx) {
-      List<Modification> list = transactions.remove(tx);
+      List<? extends Modification> list = transactions.remove(tx);
       if (list != null && !list.isEmpty()) applyModifications(list);
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -181,7 +181,7 @@
             // singleton?
             SingletonStoreConfig ssc = cfg.getSingletonStoreConfig();
             if (ssc != null && ssc.isSingletonStoreEnabled()) {
-               tmpStore = new SingletonStore(tmpStore);
+               tmpStore = new SingletonStore(tmpStore, cache, ssc);
                tmpLoader = tmpStore;
             }
          }

Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -73,7 +73,7 @@
     * @param isOnePhase    if true, there will not be a commit or rollback phase and changes should be flushed
     *                      immediately
     */
-   void prepare(List<Modification> modifications, Transaction tx, boolean isOnePhase);
+   void prepare(List<? extends Modification> modifications, Transaction tx, boolean isOnePhase);
 
    /**
     * Commits a transaction that has been previously prepared

Modified: core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -19,7 +19,7 @@
    }
 
    public StoredEntry(Object key, Object value) {
-      super(value, -1, -1);
+      super(value, System.currentTimeMillis(), -1);
       this.key = key;
    }
 
@@ -58,6 +58,9 @@
    public String toString() {
       return "StoredEntry{" +
             "key=" + key +
+            ", value=" + value +
+            ", createdTime=" + createdTime +
+            ", expiryTime=" + expiryTime +
             '}';
    }
 

Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -5,6 +5,7 @@
 import org.horizon.loader.CacheLoaderConfig;
 import org.horizon.loader.CacheStore;
 import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Modification;
 import org.horizon.marshall.Marshaller;
 
 import javax.transaction.Transaction;
@@ -69,7 +70,7 @@
       delegate.rollback(tx);
    }
 
-   public void prepare(List list, Transaction tx, boolean isOnePhase) {
+   public void prepare(List<? extends Modification> list, Transaction tx, boolean isOnePhase) {
       delegate.prepare(list, tx, isOnePhase);
    }
 

Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/AsyncStoreConfig.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -60,4 +60,13 @@
       this.threadPoolSize = threadPoolSize;
    }
 
+   @Override
+   public AsyncStoreConfig clone() {
+      try {
+         return (AsyncStoreConfig) super.clone();
+      } catch (CloneNotSupportedException e) {
+         throw new RuntimeException("Should not happen!", e);
+      }
+   }
+
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -2,6 +2,7 @@
 
 import org.horizon.loader.CacheStore;
 import org.horizon.loader.StoredEntry;
+import org.horizon.loader.modifications.Modification;
 
 import javax.transaction.Transaction;
 import java.io.InputStream;
@@ -56,7 +57,7 @@
    }
 
    @Override
-   public void prepare(List list, Transaction tx, boolean isOnePhase) {
+   public void prepare(List<? extends Modification> list, Transaction tx, boolean isOnePhase) {
       // no-op
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -1,98 +1,356 @@
 package org.horizon.loader.decorators;
 
 import org.horizon.Cache;
-import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.container.DataContainer;
 import org.horizon.loader.CacheStore;
 import org.horizon.loader.StoredEntry;
-import org.horizon.marshall.Marshaller;
+import org.horizon.loader.modifications.Modification;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.notifications.Listener;
+import org.horizon.notifications.cachemanagerlistener.annotation.CacheStarted;
+import org.horizon.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.horizon.notifications.cachemanagerlistener.event.Event;
+import org.horizon.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.horizon.remoting.transport.Address;
 
 import javax.transaction.Transaction;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
- * // TODO: Manik: Document this!
+ * SingletonStore is a delegating cache store used for situations when only one instance should interact with the
+ * underlying store. The coordinator of the cluster will be responsible for the underlying CacheStore.
+ * <p/>
+ * SingletonStore is simply a facade to a real CacheStore implementation. It always delegates reads to the real
+ * CacheStore.
+ * <p/>
+ * Writes are delegated <i>only if</i> this SingletonStore is currently the coordinator. This avoids having all stores in
+ * a cluster writing the same data to the same underlying store. Although not incorrect (e.g. a DB will just discard
+ * additional INSERTs for the same key, and throw an exception), this will avoid a lot of redundant work.
+ * <p/>
+ * Whenever the current coordinator dies (or leaves), the second in line will take over. That SingletonStore will then
+ * pass writes through to its underlying CacheStore. Optionally, when a new coordinator takes over the Singleton, it can
+ * push the in-memory state to the cache store, within a time constraint.
  *
+ * @author Bela Ban
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
  * @author Manik Surtani
+ * @since 1.0
  */
 public class SingletonStore extends AbstractDelegatingStore {
-   public SingletonStore(CacheStore delegate) {
+   private static final Log log = LogFactory.getLog(SingletonStore.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+   CacheManager cacheManager;
+   Cache cache;
+   SingletonStoreConfig config;
+
+   /**
+    * Name of the thread that pushes in-memory state to the cache loader.
+    */
+   private static final String THREAD_NAME = "SingletonStorePusherThread";
+
+   /**
+    * Executor service used to submit tasks to push in-memory state.
+    */
+   private final ExecutorService executor;
+
+   /**
+    * Future result of the in-memory push state task. This allows SingletonStore to check whether there's any push task
+    * ongoing.
+    */
+   Future<?> pushStateFuture; /* FutureTask guarantess a safe publication of the result */
+
+   /**
+    * Address instance that allows SingletonStore to find out whether it became the coordinator of the cluster, or
+    * whether it stopped being it. This dictates whether the SingletonStore is active or not.
+    */
+   private Address localAddress;
+
+   /**
+    * Whether the current node is the coordinator and therefore SingletonStore is active. Being active means
+    * delegating calls to the underlying cache loader.
+    */
+   private volatile boolean active;
+
+
+   public SingletonStore(CacheStore delegate, Cache cache, SingletonStoreConfig config) {
       super(delegate);
-   }
+      this.cacheManager = cache == null ? null : cache.getCacheManager();
+      this.cache = cache;
+      this.config = config;
 
-   public void store(StoredEntry ed) {
-      // TODO: Manik: Customise this generated block
-   }
+      executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+         public Thread newThread(Runnable r) {
+            return new Thread(r, THREAD_NAME);
+         }
+      });
 
-   public void storeAll(Collection ed) {
-      // TODO: Manik: Customise this generated block
    }
 
-   public void store(InputStream inputStream) {
-      // TODO: Manik: Customise this generated block
+   // -------------- Basic write methods
+   // only delegate if the current instance is active
+
+   @Override
+   public void store(StoredEntry ed) {
+      if (active) {
+         if (trace) log.trace("Storing key {0}.  Instance: {1}", ed.getKey(), this);
+         super.store(ed);
+      } else if (trace) log.trace("Not storing key {0}.  Instance: {1}", ed.getKey(), this);
    }
 
-   public void load(OutputStream outputStream) throws IOException {
-      // TODO: Manik: Customise this generated block
+   @Override
+   public void store(InputStream inputStream) throws IOException, ClassNotFoundException {
+      if (active) super.store(inputStream);
    }
 
+   @Override
    public void clear() {
-      // TODO: Manik: Customise this generated block
+      if (active) super.clear();
    }
 
+   @Override
    public boolean remove(Object key) {
-      return false;  // TODO: Manik: Customise this generated block
+      return active && super.remove(key);
    }
 
+   @Override
    public void purgeExpired() {
-      // TODO: Manik: Customise this generated block
+      if (active) super.purgeExpired();
    }
 
+   @Override
    public void commit(Transaction tx) {
-      // TODO: Manik: Customise this generated block
+      if (active) super.commit(tx);
    }
 
+   @Override
    public void rollback(Transaction tx) {
-      // TODO: Manik: Customise this generated block
+      if (active) super.rollback(tx);
    }
 
-   public void prepare(List list, Transaction tx, boolean isOnePhase) {
-      // TODO: Manik: Customise this generated block
+   @Override
+   public void prepare(List<? extends Modification> list, Transaction tx, boolean isOnePhase) {
+      if (active) super.prepare(list, tx, isOnePhase);
    }
 
-   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
-      // TODO: Manik: Customise this generated block
+   @Override
+   public void start() {
+      cacheManager.addListener(new SingletonStoreListener());
+      super.start();
    }
 
-   public StoredEntry load(Object key) {
-      return null;  // TODO: Manik: Customise this generated block
+   /**
+    * Factory method for the creation of a Callable task in charge of pushing in-memory state to cache loader.
+    *
+    * @return new instance of Callable<?> whose call() method either throws an exception or returns null if the task was
+    *         successful.
+    */
+   protected Callable<?> createPushStateTask() {
+      return new Callable() {
+         public Object call() throws Exception {
+            final boolean debugEnabled = log.isDebugEnabled();
+
+            if (debugEnabled) log.debug("start pushing in-memory state to cache cacheLoader");
+            pushState(cache);
+            if (debugEnabled) log.debug("in-memory state passed to cache cacheLoader successfully");
+
+            return null;
+         }
+      };
    }
 
-   public Set loadAll(Collection keys) {
-      return null;  // TODO: Manik: Customise this generated block
+   /**
+    * Pushes the in-memory state to the cache store by reading each entry from the cache's data container and storing
+    * it through this store. This method iterates over every key in the data container.
+    *
+    * @throws Exception if there's any issues reading the data from the cache or pushing the node's data to the cache
+    *                   loader.
+    */
+   protected void pushState(final Cache cache) throws Exception {
+      DataContainer dc = cache.getAdvancedCache().getDataContainer();
+      Set keys = dc.keySet();
+      StoredEntry entry;
+      for (Object k : keys) if ((entry = dc.createEntryForStorage(k)) != null) store(entry);
    }
 
-   public Set loadAll() {
-      return null;  // TODO: Manik: Customise this generated block
+
+   /**
+    * Method that waits for the in-memory state push to the cache loader to finish. This method's called in case a push state is
+    * already in progress and we need to wait for it to finish.
+    *
+    * @param future  instance of Future representing the on going push task
+    * @param timeout time to wait for the push task to finish
+    * @param unit    instance of TimeUnit representing the unit of timeout
+    */
+   protected void awaitForPushToFinish(Future future, long timeout, TimeUnit unit) {
+      final boolean debugEnabled = log.isDebugEnabled();
+      try {
+         if (debugEnabled) log.debug("wait for state push to cache loader to finish");
+         future.get(timeout, unit);
+      }
+      catch (TimeoutException e) {
+         if (debugEnabled) log.debug("timed out waiting for state push to cache loader to finish");
+      }
+      catch (ExecutionException e) {
+         if (debugEnabled) log.debug("exception reported waiting for state push to cache loader to finish");
+      }
+      catch (InterruptedException ie) {
+         /* Re-assert the thread's interrupted status */
+         Thread.currentThread().interrupt();
+         if (trace) log.trace("wait for state push to cache loader to finish was interrupted");
+      }
    }
 
-   public boolean containsKey(Object key) {
-      return false;  // TODO: Manik: Customise this generated block
+
+   /**
+    * Method called when the node either becomes the coordinator or stops being the coordinator. If it becomes the
+    * coordinator, it can optionally start the in-memory state transfer to the underlying cache store.
+    *
+    * @param newActiveState true if the node just became the coordinator, false if the node stopped being the
+    *                       coordinator.
+    */
+   protected void activeStatusChanged(boolean newActiveState) throws PushStateException {
+      active = newActiveState;
+      log.debug("changed mode {0}", this);
+      if (active && config.isPushStateWhenCoordinator()) doPushState();
    }
 
-   public Class<? extends CacheLoaderConfig> getConfigurationClass() {
-      return null;
+   /**
+    * Indicates whether the current node is the coordinator of the cluster.  This implementation assumes that the
+    * coordinator is the first member in the list.
+    *
+    * @param newView View instance containing the new view of the cluster
+    * @return whether the current node is the coordinator or not.
+    */
+   private boolean isCoordinator(List<Address> newView, Address currentAddress) {
+      if (!currentAddress.equals(localAddress)) localAddress = currentAddress;
+      if (localAddress != null) {
+         return newView.size() > 0 && localAddress.equals(newView.get(0));
+      } else {
+         /* Invalid new view, so previous value returned */
+         return active;
+      }
    }
 
-   public void start() {
-      // TODO: Manik: Customise this generated block
+   /**
+    * Called when the SingletonStore discovers that the node has become the coordinator and push in memory state has
+    * been enabled. It might not actually push the state if there's an ongoing push task running, in which case it will
+    * wait for the push task to finish.
+    *
+    * @throws PushStateException when the push state task reports an issue.
+    */
+   private void doPushState() throws PushStateException {
+      if (pushStateFuture == null || pushStateFuture.isDone()) {
+         Callable<?> task = createPushStateTask();
+         pushStateFuture = executor.submit(task);
+         try {
+            waitForTaskToFinish(pushStateFuture, config.getPushStateTimeout(), TimeUnit.MILLISECONDS);
+         }
+         catch (Exception e) {
+            throw new PushStateException("unable to complete in memory state push to cache loader", e);
+         }
+      } else {
+         /* at the most, we wait for push state timeout value. if the push task finishes earlier, this call
+* will stop when the push task finishes, otherwise a timeout exception will be reported */
+         awaitForPushToFinish(pushStateFuture, config.getPushStateTimeout(), TimeUnit.MILLISECONDS);
+      }
    }
 
-   public void stop() {
-      // TODO: Manik: Customise this generated block
+
+   /**
+    * Waits, within a time constraint, for a task to finish.
+    *
+    * @param future  represents the task waiting to finish.
+    * @param timeout maximum time to wait for the task to finish.
+    * @param unit    instance of TimeUnit representing the unit of timeout
+    * @throws Exception if any issues are reported while waiting for the task to finish
+    */
+   private void waitForTaskToFinish(Future future, long timeout, TimeUnit unit) throws Exception {
+      try {
+         future.get(timeout, unit);
+      }
+      catch (TimeoutException e) {
+         throw new Exception("task timed out", e);
+      }
+      catch (InterruptedException e) {
+         /* Re-assert the thread's interrupted status */
+         Thread.currentThread().interrupt();
+         if (trace) log.trace("task was interrupted");
+      }
+      finally {
+         /* no-op if task is completed */
+         future.cancel(true); /* interrupt if running */
+      }
    }
+
+
+   /**
+    * Cache listener that reacts to cluster topology changes to find out whether a new coordinator is elected.
+    * SingletonStore reacts to these changes in order to decide which node should interact with the underlying cache
+    * store.
+    */
+   @Listener
+   public class SingletonStoreListener {
+      /**
+       * Cache started, check whether the node is the coordinator and set the singleton store's active status.
+       */
+      @CacheStarted
+      public void cacheStarted(Event e) {
+         localAddress = cacheManager.getAddress();
+         active = cacheManager.isCoordinator();
+      }
+
+      /**
+       * The cluster formation changed, so determine whether the current node stopped being the coordinator or became
+       * the coordinator. This method can lead to an optional in memory to cache loader state push, if the current node
+       * became the coordinator. This method will report any issues that could potentially arise from this push.
+       */
+      @ViewChanged
+      public void viewChange(ViewChangedEvent event) {
+         boolean tmp = isCoordinator(event.getNewMemberList(), event.getLocalAddress());
+
+         if (active != tmp) {
+            try {
+               activeStatusChanged(tmp);
+            }
+            catch (PushStateException e) {
+               log.error("exception reported changing nodes active status", e);
+            }
+
+         }
+      }
+   }
+
+   /**
+    * Exception representing any issues that arise from pushing the in-memory state to the cache loader.
+    */
+   public static class PushStateException extends Exception {
+      private static final long serialVersionUID = 5542893943730200886L;
+
+      public PushStateException(String message, Throwable cause) {
+         super(message, cause);
+      }
+
+      public PushStateException(Throwable cause) {
+         super(cause);
+      }
+   }
+
+   @Override
+   public String toString() {
+      return "SingletonStore: localAddress=" + localAddress + ", active=" + active;
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStoreConfig.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStoreConfig.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -13,7 +13,7 @@
    private static final long serialVersionUID = 824251894176131850L;
 
    boolean singletonStoreEnabled;
-   boolean pushStateWhenCoordinator;
+   boolean pushStateWhenCoordinator = true;
    long pushStateTimeout = 10000;
 
    public boolean isSingletonStoreEnabled() {

Modified: core/branches/flat/src/test/java/org/horizon/expiry/ExpiryTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/expiry/ExpiryTest.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/test/java/org/horizon/expiry/ExpiryTest.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -1,6 +1,8 @@
 package org.horizon.expiry;
 
 import org.horizon.Cache;
+import org.horizon.container.DataContainer;
+import org.horizon.loader.StoredEntry;
 import org.horizon.manager.CacheManager;
 import org.horizon.manager.DefaultCacheManager;
 import org.horizon.util.TestingUtil;
@@ -29,17 +31,19 @@
 
    public void testExpiryInPut() throws InterruptedException {
       Cache cache = cm.getCache();
-      long startTime = System.currentTimeMillis();
       long lifespan = 1000;
       cache.put("k", "v", lifespan, TimeUnit.MILLISECONDS);
-      while (System.currentTimeMillis() < startTime + lifespan + 100) {
-         if (System.currentTimeMillis() < startTime + lifespan) {
-            assert cache.get("k").equals("v");
-         } else {
-            assert cache.get("k") == null;
-         }
-         Thread.sleep(50);
-      }
+
+      DataContainer dc = cache.getAdvancedCache().getDataContainer();
+      StoredEntry se = dc.createEntryForStorage("k");
+      assert se.getKey().equals("k");
+      assert se.getValue().equals("v");
+      assert se.getLifespan() == lifespan;
+      assert !se.isExpired();
+      assert cache.get("k").equals("v");
+      Thread.sleep(1100);
+      assert se.isExpired();
+      assert cache.get("k") == null;
    }
 
    public void testExpiryInPutAll() throws InterruptedException {

Deleted: core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -1,62 +0,0 @@
-package org.horizon.loader;
-
-import org.horizon.CacheException;
-import org.horizon.loader.decorators.AsyncStore;
-import org.horizon.loader.decorators.AsyncStoreConfig;
-import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
-import org.horizon.util.TestingUtil;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.AfterTest;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.Test;
-
-import java.util.concurrent.ExecutorService;
-
-@Test(groups = "unit", sequential = true)
-public class AsyncTest {
-
-   AsyncStore store;
-   ExecutorService asyncExecutor;
-
-
-   @BeforeTest
-   public void setUp() {
-      store = new AsyncStore(new DummyInMemoryCacheLoader(), new AsyncStoreConfig());
-      DummyInMemoryCacheLoader.Cfg cfg = new DummyInMemoryCacheLoader.Cfg();
-      cfg.setStore(AsyncTest.class.getName());
-      store.init(cfg, null, null);
-      store.start();
-      asyncExecutor = (ExecutorService) TestingUtil.extractField(store, "executor");
-   }
-
-   @AfterTest
-   public void tearDown() {
-      if (store != null) store.stop();
-   }
-
-   @AfterMethod
-   public void clearStore() {
-      if (store != null) store.clear();
-   }
-
-   public void testRestrictionOnAddingToAsyncQueue() throws Exception {
-      store.remove("blah");
-
-      store.store(new StoredEntry("one", "value"));
-      store.store(new StoredEntry("two", "value"));
-      store.store(new StoredEntry("three", "value"));
-      store.store(new StoredEntry("four", "value"));
-
-      // stop the cache store
-      store.stop();
-      try {
-         store.remove("blah");
-         assert false : "Should have restricted this entry from being made";
-      }
-      catch (CacheException expected) {
-      }
-
-      // clean up
-      store.start();
-   }
-}

Added: core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/CacheLoaderFunctionalTest.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,340 @@
+package org.horizon.loader;
+
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.config.Configuration;
+import org.horizon.container.DataContainer;
+import org.horizon.lifecycle.ComponentStatus;
+import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.Collections;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ * Tests the interceptor chain and surrounding logic
+ *
+ * @author Manik Surtani
+ */
+@Test(groups = "functional", sequential = true)
+public class CacheLoaderFunctionalTest {
+   Cache cache;
+   CacheStore store;
+   TransactionManager tm;
+   Configuration cfg;
+   CacheManager cm;
+   long lifespan = 6000000; // very large lifespan so nothing actually expires
+
+   @BeforeTest
+   public void setUp() {
+      cfg = new Configuration();
+      cfg.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+      clmc.addIndividualCacheLoaderConfig(new DummyInMemoryCacheLoader.Cfg());
+      cfg.setCacheLoaderConfig(clmc);
+      cm = new DefaultCacheManager(cfg);
+      cache = cm.getCache();
+      store = TestingUtil.extractComponent(cache, CacheLoaderManager.class).getCacheStore();
+      tm = TestingUtil.getTransactionManager(cache);
+   }
+
+   @AfterTest
+   public void tearDown() {
+      TestingUtil.killCacheManagers(cm);
+   }
+
+   @AfterMethod
+   public void afterMethod() {
+      if (cache != null) cache.clear();
+      if (store != null) store.clear();
+   }
+
+   private void assertInCacheAndStore(Object key, Object value) {
+      assertInCacheAndStore(key, value, -1);
+   }
+
+   private void assertInCacheAndStore(Object key, Object value, long lifespanMillis) {
+      assertInCacheAndStore(cache, store, key, value, lifespanMillis);
+   }
+
+
+   private void assertInCacheAndStore(Cache cache, CacheStore store, Object key, Object value) {
+      assertInCacheAndStore(cache, store, key, value, -1);
+   }
+
+   private void assertInCacheAndStore(Cache cache, CacheStore store, Object key, Object value, long lifespanMillis) {
+      StoredEntry se = cache.getAdvancedCache().getDataContainer().createEntryForStorage(key);
+      testStoredEntry(se, value, lifespanMillis, "Cache", key);
+      se = store.load(key);
+      testStoredEntry(se, value, lifespanMillis, "Store", key);
+   }
+
+   private void testStoredEntry(StoredEntry entry, Object expectedValue, long expectedLifespan, String src, Object key) {
+      assert entry != null : src + " entry for key " + key + " should NOT be null";
+      assert entry.getValue().equals(expectedValue) : src + " should contain value " + expectedValue + " under key " + entry.getKey() + " but was " + entry.getValue() + ". Entry is " + entry;
+      assert entry.getLifespan() == expectedLifespan : src + " expected lifespan for key " + key + " to be " + expectedLifespan + " but was " + entry.getLifespan() + ". Entry is " + entry;
+   }
+
+   private void assertNotInCacheAndStore(Cache cache, CacheStore store, Object... keys) {
+      for (Object key : keys) {
+         assert !cache.getAdvancedCache().getDataContainer().containsKey(key) : "Cache should not contain key " + key;
+         assert !store.containsKey(key) : "Store should not contain key " + key;
+      }
+   }
+
+   private void assertNotInCacheAndStore(Object... keys) {
+      assertNotInCacheAndStore(cache, store, keys);
+   }
+
+   public void testStoreAndRetrieve() {
+      assertNotInCacheAndStore("k1", "k2", "k3", "k4", "k5", "k6", "k7");
+
+      cache.put("k1", "v1");
+      cache.put("k2", "v2", lifespan, MILLISECONDS);
+      cache.putAll(Collections.singletonMap("k3", "v3"));
+      cache.putAll(Collections.singletonMap("k4", "v4"), lifespan, MILLISECONDS);
+      cache.putIfAbsent("k5", "v5");
+      cache.putIfAbsent("k6", "v6", lifespan, MILLISECONDS);
+      cache.putIfAbsent("k5", "v5-SHOULD-NOT-PUT");
+      cache.putIfAbsent("k6", "v6-SHOULD-NOT-PUT", lifespan, MILLISECONDS);
+      cache.putForExternalRead("k7", "v7");
+      cache.putForExternalRead("k7", "v7-SHOULD-NOT-PUT");
+
+      for (int i = 1; i < 8; i++) {
+         // even numbers have lifespans
+         if (i % 2 == 1)
+            assertInCacheAndStore("k" + i, "v" + i);
+         else
+            assertInCacheAndStore("k" + i, "v" + i, lifespan);
+      }
+
+      cache.remove("k1", "some rubbish");
+
+      for (int i = 1; i < 8; i++) {
+         // even numbers have lifespans
+         if (i % 2 == 1)
+            assertInCacheAndStore("k" + i, "v" + i);
+         else
+            assertInCacheAndStore("k" + i, "v" + i, lifespan);
+      }
+
+      assert cache.remove("k1", "v1");
+      assert cache.remove("k2").equals("v2");
+
+      assertNotInCacheAndStore("k1", "k2");
+
+      for (int i = 3; i < 8; i++) {
+         // even numbers have lifespans
+         if (i % 2 == 1)
+            assertInCacheAndStore("k" + i, "v" + i);
+         else
+            assertInCacheAndStore("k" + i, "v" + i, lifespan);
+      }
+
+      cache.clear();
+      assertNotInCacheAndStore("k1", "k2", "k3", "k4", "k5", "k6", "k7");
+   }
+
+   public void testReplaceMethods() {
+      assertNotInCacheAndStore("k1", "k2", "k3", "k4");
+
+      cache.replace("k1", "v1-SHOULD-NOT-STORE");
+      cache.replace("k2", "v2-SHOULD-NOT-STORE", lifespan, MILLISECONDS);
+
+      assertNotInCacheAndStore("k1", "k2", "k3", "k4");
+
+      cache.put("k1", "v1");
+      cache.put("k2", "v2");
+      cache.put("k3", "v3");
+      cache.put("k4", "v4");
+
+      for (int i = 1; i < 5; i++) assertInCacheAndStore("k" + i, "v" + i);
+
+      cache.replace("k1", "v1-SHOULD-NOT-STORE", "v1-STILL-SHOULD-NOT-STORE");
+      cache.replace("k2", "v2-SHOULD-NOT-STORE", "v2-STILL-SHOULD-NOT-STORE", lifespan, MILLISECONDS);
+
+      for (int i = 1; i < 5; i++) assertInCacheAndStore("k" + i, "v" + i);
+
+      cache.replace("k1", "v1-REPLACED");
+      cache.replace("k2", "v2-REPLACED", lifespan, MILLISECONDS);
+      cache.replace("k3", "v3", "v3-REPLACED");
+      cache.replace("k4", "v4", "v4-REPLACED", lifespan, MILLISECONDS);
+
+      for (int i = 1; i < 5; i++) {
+         // even numbers have lifespans
+         if (i % 2 == 1)
+            assertInCacheAndStore("k" + i, "v" + i + "-REPLACED");
+         else
+            assertInCacheAndStore("k" + i, "v" + i + "-REPLACED", lifespan);
+      }
+
+   }
+
+   public void testLoading() {
+      assertNotInCacheAndStore("k1", "k2", "k3", "k4");
+      store.store(new StoredEntry("k1", "v1"));
+      store.store(new StoredEntry("k2", "v2"));
+      store.store(new StoredEntry("k3", "v3"));
+      store.store(new StoredEntry("k4", "v4"));
+
+      for (int i = 1; i < 5; i++) assert cache.get("k" + i).equals("v" + i);
+
+      for (int i = 1; i < 5; i++) cache.evict("k" + i);
+
+      assert cache.putIfAbsent("k1", "v1-SHOULD-NOT-STORE").equals("v1");
+      assert cache.remove("k2").equals("v2");
+      assert cache.replace("k3", "v3-REPLACED").equals("v3");
+      assert cache.replace("k4", "v4", "v4-REPLACED");
+
+      assert cache.size() == 3 : "Expected the cache to contain 3 elements but contained " + cache.size();
+
+      for (int i = 1; i < 5; i++) cache.evict("k" + i);
+      assert cache.size() == 0; // cache size ops will not trigger a load
+
+      cache.clear(); // this should propagate to the loader though
+      assertNotInCacheAndStore("k1", "k2", "k3", "k4");
+   }
+
+   public void testPreloading() {
+      Configuration preloadingCfg = cfg.clone();
+      preloadingCfg.getCacheLoaderConfig().setPreload(true);
+      ((DummyInMemoryCacheLoader.Cfg) preloadingCfg.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("preloadingCache");
+      cm.defineCache("preloadingCache", preloadingCfg);
+      Cache preloadingCache = cm.getCache("preloadingCache");
+      CacheStore preloadingStore = TestingUtil.extractComponent(preloadingCache, CacheLoaderManager.class).getCacheStore();
+
+      assert preloadingCache.getConfiguration().getCacheLoaderConfig().isPreload();
+
+      assertNotInCacheAndStore(preloadingCache, preloadingStore, "k1", "k2", "k3", "k4");
+
+      preloadingCache.put("k1", "v1");
+      preloadingCache.put("k2", "v2", lifespan, MILLISECONDS);
+      preloadingCache.put("k3", "v3");
+      preloadingCache.put("k4", "v4", lifespan, MILLISECONDS);
+
+      for (int i = 1; i < 5; i++) {
+         if (i % 2 == 1)
+            assertInCacheAndStore(preloadingCache, preloadingStore, "k" + i, "v" + i);
+         else
+            assertInCacheAndStore(preloadingCache, preloadingStore, "k" + i, "v" + i, lifespan);
+      }
+
+      DataContainer c = preloadingCache.getAdvancedCache().getDataContainer();
+      assert c.size() == 4;
+      preloadingCache.stop();
+      assert c.size() == 0;
+
+      preloadingCache.start();
+      assert preloadingCache.getConfiguration().getCacheLoaderConfig().isPreload();
+      c = preloadingCache.getAdvancedCache().getDataContainer();
+      assert c.size() == 4;
+
+      for (int i = 1; i < 5; i++) {
+         if (i % 2 == 1)
+            assertInCacheAndStore(preloadingCache, preloadingStore, "k" + i, "v" + i);
+         else
+            assertInCacheAndStore(preloadingCache, preloadingStore, "k" + i, "v" + i, lifespan);
+      }
+   }
+
+   public void testPurgeOnStartup() {
+      Configuration purgingCfg = cfg.clone();
+      purgingCfg.getCacheLoaderConfig().getFirstCacheLoaderConfig().setPurgeOnStartup(true);
+      ((DummyInMemoryCacheLoader.Cfg) purgingCfg.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("purgingCache");
+      cm.defineCache("purgingCache", purgingCfg);
+      Cache purgingCache = cm.getCache("purgingCache");
+      CacheStore purgingStore = TestingUtil.extractComponent(purgingCache, CacheLoaderManager.class).getCacheStore();
+
+      assertNotInCacheAndStore(purgingCache, purgingStore, "k1", "k2", "k3", "k4");
+
+      purgingCache.put("k1", "v1");
+      purgingCache.put("k2", "v2", lifespan, MILLISECONDS);
+      purgingCache.put("k3", "v3");
+      purgingCache.put("k4", "v4", lifespan, MILLISECONDS);
+
+      for (int i = 1; i < 5; i++) {
+         if (i % 2 == 1)
+            assertInCacheAndStore(purgingCache, purgingStore, "k" + i, "v" + i);
+         else
+            assertInCacheAndStore(purgingCache, purgingStore, "k" + i, "v" + i, lifespan);
+      }
+
+      DataContainer c = purgingCache.getAdvancedCache().getDataContainer();
+      assert c.size() == 4;
+      purgingCache.stop();
+      assert c.size() == 0;
+
+      purgingCache.start();
+      c = purgingCache.getAdvancedCache().getDataContainer();
+      assert c.size() == 0;
+
+      assertNotInCacheAndStore(purgingCache, purgingStore, "k1", "k2", "k3", "k4");
+   }
+
+   public void testTransactionalWrites() throws Exception {
+      assert cache.getStatus() == ComponentStatus.RUNNING;
+      assertNotInCacheAndStore("k1", "k2");
+
+      tm.begin();
+      cache.put("k1", "v1");
+      cache.put("k2", "v2", lifespan, MILLISECONDS);
+      Transaction t = tm.suspend();
+
+      assertNotInCacheAndStore("k1", "k2");
+
+      tm.resume(t);
+      tm.commit();
+
+      assertInCacheAndStore("k1", "v1");
+      assertInCacheAndStore("k2", "v2", lifespan);
+
+      tm.begin();
+      cache.clear();
+      t = tm.suspend();
+
+      assertInCacheAndStore("k1", "v1");
+      assertInCacheAndStore("k2", "v2", lifespan);
+      tm.resume(t);
+      tm.commit();
+
+      assertNotInCacheAndStore("k1", "k2");
+
+      tm.begin();
+      cache.put("k1", "v1");
+      cache.put("k2", "v2", lifespan, MILLISECONDS);
+      t = tm.suspend();
+
+      assertNotInCacheAndStore("k1", "k2");
+
+      tm.resume(t);
+      tm.rollback();
+
+      assertNotInCacheAndStore("k1", "k2");
+      cache.put("k1", "v1");
+      cache.put("k2", "v2", lifespan, MILLISECONDS);
+
+      assertInCacheAndStore("k1", "v1");
+      assertInCacheAndStore("k2", "v2", lifespan);
+
+      tm.begin();
+      cache.clear();
+      t = tm.suspend();
+
+      assertInCacheAndStore("k1", "v1");
+      assertInCacheAndStore("k2", "v2", lifespan);
+      tm.resume(t);
+      tm.rollback();
+
+      assertInCacheAndStore("k1", "v1");
+      assertInCacheAndStore("k2", "v2", lifespan);
+   }
+}

Copied: core/branches/flat/src/test/java/org/horizon/loader/decorators/AsyncTest.java (from rev 7695, core/branches/flat/src/test/java/org/horizon/loader/AsyncTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/AsyncTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/AsyncTest.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,61 @@
+package org.horizon.loader.decorators;
+
+import org.horizon.CacheException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutorService;
+
+@Test(groups = "unit", sequential = true)
+public class AsyncTest {
+
+   AsyncStore store;
+   ExecutorService asyncExecutor;
+
+
+   @BeforeTest
+   public void setUp() {
+      store = new AsyncStore(new DummyInMemoryCacheLoader(), new AsyncStoreConfig());
+      DummyInMemoryCacheLoader.Cfg cfg = new DummyInMemoryCacheLoader.Cfg();
+      cfg.setStore(AsyncTest.class.getName());
+      store.init(cfg, null, null);
+      store.start();
+      asyncExecutor = (ExecutorService) TestingUtil.extractField(store, "executor");
+   }
+
+   @AfterTest
+   public void tearDown() {
+      if (store != null) store.stop();
+   }
+
+   @AfterMethod
+   public void clearStore() {
+      if (store != null) store.clear();
+   }
+
+   public void testRestrictionOnAddingToAsyncQueue() throws Exception {
+      store.remove("blah");
+
+      store.store(new StoredEntry("one", "value"));
+      store.store(new StoredEntry("two", "value"));
+      store.store(new StoredEntry("three", "value"));
+      store.store(new StoredEntry("four", "value"));
+
+      // stop the cache store
+      store.stop();
+      try {
+         store.remove("blah");
+         assert false : "Should have restricted this entry from being made";
+      }
+      catch (CacheException expected) {
+      }
+
+      // clean up
+      store.start();
+   }
+}

Added: core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,32 @@
+package org.horizon.loader.decorators;
+
+import static org.easymock.EasyMock.*;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.testng.annotations.Test;
+
+import java.io.InputStream;
+
+@Test(groups = "unit")
+public class ReadOnlyCacheStoreTest {
+   public void testWriteMethods() {
+      CacheStore mock = createMock(CacheStore.class);
+      ReadOnlyStore store = new ReadOnlyStore(mock);
+      StoredEntry mockEntry = new StoredEntry();
+      expect(mock.load(eq("key"))).andReturn(mockEntry).once();
+      replay(mock);
+
+      // these should be "silent" no-ops and not actually change anything.
+      store.clear();
+      store.purgeExpired();
+      store.remove("key");
+      store.store((StoredEntry) null);
+      store.store((InputStream) null);
+      store.prepare(null, null, true);
+      store.commit(null);
+      store.rollback(null);
+      assert mockEntry == store.load("key");
+
+      verify(mock);
+   }
+}

Added: core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,321 @@
+package org.horizon.loader.decorators;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.config.Configuration;
+import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.dummy.DummyInMemoryCacheLoader;
+import org.horizon.manager.CacheManager;
+import org.horizon.util.TestingUtil;
+import org.horizon.util.internals.ViewChangeListener;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.fail;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Test(groups = "functional", sequential = true)
+public class SingletonStoreTest extends BaseClusteredTest {
+   private static final Log log = LogFactory.getLog(SingletonStoreTest.class);
+   private static final AtomicInteger storeCounter = new AtomicInteger(0);
+   private CacheManager cm0, cm1, cm2;
+
+   @BeforeMethod
+   public void setUp() {
+      cm0 = addClusterEnabledCacheManager();
+      cm1 = addClusterEnabledCacheManager();
+      cm2 = addClusterEnabledCacheManager();
+
+      Configuration conf = new Configuration();
+      conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      DummyInMemoryCacheLoader.Cfg cfg = new DummyInMemoryCacheLoader.Cfg();
+      cfg.setStore("Store-" + storeCounter.getAndIncrement());
+      CacheLoaderManagerConfig pushingCfg = new CacheLoaderManagerConfig();
+      pushingCfg.addIndividualCacheLoaderConfig(cfg);
+      SingletonStoreConfig ssc = new SingletonStoreConfig();
+      ssc.setPushStateWhenCoordinator(true);
+      ssc.setSingletonStoreEnabled(true);
+      cfg.setSingletonStoreConfig(ssc);
+      conf.setCacheLoaderConfig(pushingCfg);
+
+      // cannot define on ALL cache managers since the same dummy in memory CL bin will be used!
+      cm0.defineCache("pushing", conf);
+      ((DummyInMemoryCacheLoader.Cfg) conf.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
+      cm1.defineCache("pushing", conf);
+      ((DummyInMemoryCacheLoader.Cfg) conf.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
+      cm2.defineCache("pushing", conf);
+
+      conf = new Configuration();
+      conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      cfg = new DummyInMemoryCacheLoader.Cfg();
+      cfg.setStore("Store-" + storeCounter.getAndIncrement());
+      CacheLoaderManagerConfig nonPushingCfg = new CacheLoaderManagerConfig();
+      nonPushingCfg.addIndividualCacheLoaderConfig(cfg);
+      ssc = new SingletonStoreConfig();
+      ssc.setPushStateWhenCoordinator(false);
+      ssc.setSingletonStoreEnabled(true);
+      cfg.setSingletonStoreConfig(ssc);
+      conf.setCacheLoaderConfig(nonPushingCfg);
+
+      // cannot define on ALL cache managers since the same dummy in memory CL bin will be used!
+      cm0.defineCache("nonPushing", conf);
+      ((DummyInMemoryCacheLoader.Cfg) conf.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
+      cm1.defineCache("nonPushing", conf);
+      ((DummyInMemoryCacheLoader.Cfg) conf.getCacheLoaderConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
+      cm2.defineCache("nonPushing", conf);
+   }
+
+   private Cache[] getCaches(String name) {
+      return new Cache[]{
+            cm0.getCache(name), cm1.getCache(name), cm2.getCache(name)
+      };
+   }
+
+   private SingletonStore[] extractStores(Cache[] caches) {
+      SingletonStore[] stores = new SingletonStore[caches.length];
+
+      int i = 0;
+      for (Cache c : caches)
+         stores[i++] = (SingletonStore) TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+      return stores;
+   }
+
+   private Object load(CacheStore cs, Object key) {
+      StoredEntry se = cs.load(key);
+      return se == null ? null : se.getValue();
+   }
+
+   public void testPutCacheLoaderWithNoPush() throws Exception {
+      Cache[] caches = getCaches("nonPushing");
+      for (Cache c : caches) c.start();
+
+      int i = 1;
+      for (Cache c : caches) {
+         c.put("key" + i, "value" + i);
+         i++;
+      }
+
+      // all values should be on all caches since they are sync-repl
+      for (Cache c : caches) {
+         for (i = 1; i < 4; i++) assert c.get("key" + i).equals("value" + i);
+      }
+
+      // now test the stores.  These should *only* be on the store on cache 1.
+      CacheStore[] stores = extractStores(caches);
+
+      for (i = 1; i < 4; i++) {
+         // should ONLY be on the first loader!
+         assert load(stores[0], "key" + i).equals("value" + i);
+         assert load(stores[1], "key" + i) == null : "stores[1] should not have stored key key" + i;
+         assert load(stores[2], "key" + i) == null : "stores[2] should not have stored key key" + i;
+      }
+
+      cm0.stop();
+      TestingUtil.blockUntilViewsReceived(60000, cm1, cm2);
+
+      caches[1].put("key4", "value4");
+      caches[2].put("key5", "value5");
+
+      assert load(stores[1], "key4").equals("value4");
+      assert load(stores[1], "key5").equals("value5");
+
+      assert load(stores[2], "key4") == null;
+      assert load(stores[2], "key5") == null;
+
+      cm1.stop();
+      TestingUtil.blockUntilViewsReceived(60000, cm2);
+
+      caches[2].put("key6", "value6");
+      assert load(stores[2], "key6").equals("value6");
+   }
+
+   public void testPutCacheLoaderWithPush() throws Exception {
+      Cache[] caches = getCaches("pushing");
+      for (Cache c : caches) c.start();
+      Map<String, String> expected = new HashMap<String, String>();
+
+      expected.put("a-key", "a-value");
+      expected.put("aa-key", "aa-value");
+      expected.put("b-key", "b-value");
+      expected.put("bb-key", "bb-value");
+      expected.put("c-key", "c-value");
+      expected.put("d-key", "d-value");
+      expected.put("e-key", "e-value");
+      expected.put("g-key", "g-value");
+
+      caches[0].putAll(expected);
+
+      SingletonStore[] stores = extractStores(caches);
+
+      for (String key : expected.keySet()) {
+         assert load(stores[0], key).equals(expected.get(key));
+         assert load(stores[1], key) == null;
+         assert load(stores[2], key) == null;
+      }
+
+      ViewChangeListener viewChangeListener = new ViewChangeListener(caches[1]);
+
+      cm0.stop();
+
+      viewChangeListener.waitForViewChange(60, TimeUnit.SECONDS);
+
+      waitForPushStateCompletion(stores[1].pushStateFuture);
+
+      // cache store 1 should have all state now, and store 2 should have nothing
+
+      for (String key : expected.keySet()) {
+         assert load(stores[1], key).equals(expected.get(key));
+         assert load(stores[2], key) == null;
+      }
+
+      caches[1].put("h-key", "h-value");
+      caches[2].put("i-key", "i-value");
+      expected.put("h-key", "h-value");
+      expected.put("i-key", "i-value");
+
+      for (String key : expected.keySet()) {
+         assert load(stores[1], key).equals(expected.get(key));
+         assert load(stores[2], key) == null;
+      }
+
+      viewChangeListener = new ViewChangeListener(caches[2]);
+      cm1.stop();
+      viewChangeListener.waitForViewChange(60, TimeUnit.SECONDS);
+
+      waitForPushStateCompletion(stores[2].pushStateFuture);
+
+      for (String key : expected.keySet()) assert load(stores[2], key).equals(expected.get(key));
+
+      caches[2].put("aaa-key", "aaa-value");
+      expected.put("aaa-key", "aaa-value");
+
+      for (String key : expected.keySet()) assert load(stores[2], key).equals(expected.get(key));
+   }
+
+   public void testAvoidConcurrentStatePush() throws Exception {
+      final ExecutorService executor = Executors.newFixedThreadPool(2);
+      final CountDownLatch pushStateCanFinish = new CountDownLatch(1);
+      final CountDownLatch secondActiveStatusChangerCanStart = new CountDownLatch(1);
+      final TestingSingletonStore mscl = new TestingSingletonStore(pushStateCanFinish, secondActiveStatusChangerCanStart, new SingletonStoreConfig());
+
+      Future f1 = executor.submit(createActiveStatusChanger(mscl));
+      assert secondActiveStatusChangerCanStart.await(1000, TimeUnit.MILLISECONDS) : "Failed waiting on latch";
+
+      Future f2 = executor.submit(createActiveStatusChanger(mscl));
+
+      f1.get();
+      f2.get();
+
+      assertEquals(1, mscl.getNumberCreatedTasks());
+   }
+
+   public void testPushStateTimedOut() throws Throwable {
+      final CountDownLatch pushStateCanFinish = new CountDownLatch(1);
+      SingletonStoreConfig ssdc = new SingletonStoreConfig();
+      ssdc.setPushStateTimeout(100);
+      final TestingSingletonStore mscl = new TestingSingletonStore(pushStateCanFinish, null, ssdc);
+
+      Future f = Executors.newSingleThreadExecutor().submit(createActiveStatusChanger(mscl));
+      pushStateCanFinish.await(200, TimeUnit.MILLISECONDS);
+      pushStateCanFinish.countDown();
+
+      try {
+         f.get();
+         fail("Should have timed out");
+      }
+      catch (ExecutionException expected) {
+         Throwable e;
+         if ((e = expected.getCause().getCause().getCause()) instanceof TimeoutException) {
+            assert true : "This is expected";
+         } else {
+            throw e;
+         }
+      }
+   }
+
+   private void waitForPushStateCompletion(Future pushThreadFuture) throws Exception {
+      if (pushThreadFuture != null) pushThreadFuture.get();
+   }
+
+   private Callable<?> createActiveStatusChanger(SingletonStore mscl) {
+      return new ActiveStatusModifier(mscl);
+   }
+
+   class TestingSingletonStore extends SingletonStore {
+      private int numberCreatedTasks = 0;
+      private CountDownLatch pushStateCanFinish;
+      private CountDownLatch secondActiveStatusChangerCanStart;
+
+      public TestingSingletonStore(CountDownLatch pushStateCanFinish, CountDownLatch secondActiveStatusChangerCanStart, SingletonStoreConfig cfg) {
+         super(null, null, cfg);
+         this.pushStateCanFinish = pushStateCanFinish;
+         this.secondActiveStatusChangerCanStart = secondActiveStatusChangerCanStart;
+      }
+
+      public int getNumberCreatedTasks() {
+         return numberCreatedTasks;
+      }
+
+      public void setNumberCreatedTasks(int numberCreatedTasks) {
+         this.numberCreatedTasks = numberCreatedTasks;
+      }
+
+      @Override
+      protected Callable<?> createPushStateTask() {
+         return new Callable() {
+            public Object call() throws Exception {
+               numberCreatedTasks++;
+               try {
+                  if (secondActiveStatusChangerCanStart != null) {
+                     secondActiveStatusChangerCanStart.countDown();
+                  }
+                  pushStateCanFinish.await();
+               }
+               catch (InterruptedException e) {
+                  fail("ActiveStatusModifier interrupted");
+               }
+               return null;
+            }
+         };
+      }
+
+
+      @Override
+      protected void awaitForPushToFinish(Future future, long timeout, TimeUnit unit) {
+         pushStateCanFinish.countDown();
+         super.awaitForPushToFinish(future, timeout, unit);
+      }
+   }
+
+   class ActiveStatusModifier implements Callable {
+      private SingletonStore scl;
+
+      public ActiveStatusModifier(SingletonStore singleton) {
+         scl = singleton;
+      }
+
+      public Object call() throws Exception {
+         log.debug("active status modifier started");
+         scl.activeStatusChanged(true);
+         scl.pushStateFuture.get();
+
+         return null;
+      }
+   }
+}

Modified: core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java	2009-02-16 14:25:32 UTC (rev 7696)
+++ core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheLoader.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -5,6 +5,8 @@
 import org.horizon.loader.AbstractCacheStore;
 import org.horizon.loader.CacheLoaderConfig;
 import org.horizon.loader.StoredEntry;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
 import org.horizon.marshall.Marshaller;
 
 import java.io.IOException;
@@ -20,7 +22,7 @@
 import java.util.concurrent.ConcurrentMap;
 
 public class DummyInMemoryCacheLoader extends AbstractCacheStore {
-
+   private static final Log log = LogFactory.getLog(DummyInMemoryCacheLoader.class);
    static final ConcurrentMap<String, Map> stores = new ConcurrentHashMap<String, Map>();
    String storeName = "__DEFAULT_STORES__";
    Map<Object, StoredEntry> store;
@@ -73,6 +75,7 @@
       StoredEntry se = store.get(key);
       if (se == null) return null;
       if (se.isExpired()) {
+         log.debug("Key {0} exists, but has expired.  Entry is {1}", key, se);
          store.remove(key);
          return null;
       }
@@ -84,9 +87,10 @@
       Set<StoredEntry> s = new HashSet<StoredEntry>();
       for (Iterator<StoredEntry> i = store.values().iterator(); i.hasNext();) {
          StoredEntry se = i.next();
-         if (se.isExpired())
+         if (se.isExpired()) {
+            log.debug("Key {0} exists, but has expired.  Entry is {1}", se.getKey(), se);
             i.remove();
-         else
+         } else
             s.add(se);
       }
       return s;
@@ -109,13 +113,16 @@
    }
 
    public void stop() {
-      stores.remove(storeName);
    }
 
    public static class Cfg extends AbstractCacheLoaderConfig {
       boolean debug;
-      String store;
+      String store = "__DEFAULT_STORE__";
 
+      public Cfg() {
+         setClassName(DummyInMemoryCacheLoader.class.getName());
+      }
+
       public boolean isDebug() {
          return debug;
       }
@@ -131,5 +138,10 @@
       public void setStore(String store) {
          this.store = store;
       }
+
+      @Override
+      public Cfg clone() {
+         return (Cfg) super.clone();
+      }
    }
 }

Added: core/branches/flat/src/test/java/org/horizon/util/internals/ViewChangeListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/util/internals/ViewChangeListener.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/util/internals/ViewChangeListener.java	2009-02-17 01:07:15 UTC (rev 7697)
@@ -0,0 +1,45 @@
+package org.horizon.util.internals;
+
+import org.horizon.Cache;
+import org.horizon.manager.CacheManager;
+import org.horizon.notifications.Listener;
+import org.horizon.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.horizon.notifications.cachemanagerlistener.event.ViewChangedEvent;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Listens for view changes.  Note that you do NOT have to register this listener; it does so automatically when
+ * constructed.
+ */
+@Listener
+public class ViewChangeListener {
+   CacheManager cm;
+   final CountDownLatch latch = new CountDownLatch(1);
+
+   public ViewChangeListener(Cache c) {
+      this(c.getCacheManager());
+   }
+
+   public ViewChangeListener(CacheManager cm) {
+      this.cm = cm;
+      cm.addListener(this);
+   }
+
+   @ViewChanged
+   public void onViewChange(ViewChangedEvent e) {
+      latch.countDown();
+   }
+
+   /**
+    * Blocks for a certain amount of time until a view change is received.  Note that this class will start listening
+    * for the view change the moment it is constructed.
+    *
+    * @param time time to wait
+    * @param unit time unit
+    */
+   public void waitForViewChange(long time, TimeUnit unit) throws InterruptedException {
+      if (!latch.await(time, unit)) assert false : "View change not seen after " + time + " " + unit;
+   }
+}




More information about the jbosscache-commits mailing list