[jbosscache-commits] JBoss Cache SVN: r7178 - in core/branches/flat/src: main/java/org/jboss/starobrno/config and 6 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Nov 19 23:06:04 EST 2008


Author: jason.greene at jboss.com
Date: 2008-11-19 23:06:04 -0500 (Wed, 19 Nov 2008)
New Revision: 7178

Added:
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java
Removed:
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/SkipCheckChainedInterceptor.java
Modified:
   core/branches/flat/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
   core/branches/flat/src/main/java/org/jboss/starobrno/config/CacheLoaderConfig.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractCacheLoader.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java
   core/branches/flat/src/main/java/org/jboss/starobrno/loader/FileCacheLoader.java
   core/branches/flat/src/test/java/org/jboss/starobrno/tree/api/NodeAPITest.java
Log:
More cacheloader work


Modified: core/branches/flat/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -21,28 +21,28 @@
  */
 package org.jboss.cache.loader;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.StringTokenizer;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.CacheSPI_Legacy;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.RegionManager;
-import org.jboss.starobrno.util.ReflectionUtil;
 import org.jboss.starobrno.CacheException;
 import org.jboss.starobrno.config.CacheLoaderConfig;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.config.ConfigurationException;
 import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
 import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig;
-import org.jboss.starobrno.config.Configuration;
-import org.jboss.starobrno.config.ConfigurationException;
 import org.jboss.starobrno.factories.ComponentRegistry;
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.Start;
 import org.jboss.starobrno.factories.annotations.Stop;
+import org.jboss.starobrno.util.ReflectionUtil;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.StringTokenizer;
-
 /**
  * Manages all cache loader functionality.  This class is typically initialised with an XML DOM Element,
  * representing a cache loader configuration, or a {@link org.jboss.cache.config.CacheLoaderConfig} object.
@@ -225,7 +225,7 @@
    private CacheLoader createCacheLoader(CacheLoaderConfig.IndividualCacheLoaderConfig cfg, CacheSPI_Legacy cache) throws Exception
    {
       // create loader
-      CacheLoader tmpLoader = cfg.getCacheLoader() == null ? createInstance(cfg.getClassName()) : cfg.getCacheLoader();
+      CacheLoader tmpLoader = (CacheLoader)(cfg.getCacheLoader() == null ? createInstance(cfg.getClassName()) : cfg.getCacheLoader());
 
       if (tmpLoader != null)
       {

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/config/CacheLoaderConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/config/CacheLoaderConfig.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/config/CacheLoaderConfig.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -21,14 +21,14 @@
  */
 package org.jboss.starobrno.config;
 
-import org.jboss.cache.loader.CacheLoader;
-import org.jboss.cache.loader.SingletonStoreCacheLoader;
-import org.jboss.starobrno.util.Util;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import org.jboss.cache.loader.SingletonStoreCacheLoader;
+import org.jboss.starobrno.loader.CacheLoader;
+import org.jboss.starobrno.util.Util;
+
 /**
  * Holds the configuration of the cache loader chain.  ALL cache loaders should be defined using this class, adding
  * individual cache loaders to the chain by calling {@link CacheLoaderConfig#addIndividualCacheLoaderConfig}
@@ -193,7 +193,7 @@
       private boolean purgeOnStartup;
 
       private SingletonStoreConfig singletonStoreConfig;
-      private transient CacheLoader cacheLoader;
+      private transient CacheLoader<Object, Object> cacheLoader;
 
       protected void populateFromBaseConfig(IndividualCacheLoaderConfig base)
       {
@@ -271,7 +271,7 @@
        * @return cache loader, if one exists
        * @since 2.1.0
        */
-      public CacheLoader getCacheLoader()
+      public CacheLoader<Object,Object> getCacheLoader()
       {
          return cacheLoader;
       }
@@ -283,7 +283,7 @@
        * @param cacheLoader cacheLoader to set
        * @since 2.1.0
        */
-      public void setCacheLoader(CacheLoader cacheLoader)
+      public void setCacheLoader(CacheLoader<Object,Object> cacheLoader)
       {
          this.cacheLoader = cacheLoader;
       }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -167,7 +167,7 @@
     * @throws org.jboss.starobrno.lock.TimeoutException
     *                              if we are unable to acquire the lock after a specified timeout.
     */
-   private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException
+   public boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException
    {
       // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
       // lock which may be shared with another Fqn that we have a lock for already.
@@ -184,4 +184,9 @@
       }
       return false;
    }
+
+   public void releaseLock(InvocationContext ctx, Object key)
+   {
+      lockManager.unlock(key, lockManager.getOwner(key));
+   }
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -124,22 +124,22 @@
       }
 
       // TODO: Uncomment once the CacheLoader has been moved to Starobrno
-//      if (configuration.isUsingCacheLoaders())
-//      {
+      if (configuration.isUsingCacheLoaders())
+      {
 //         if (configuration.getCacheLoaderConfig().isPassivation())
 //         {
 //            interceptorChain.appendIntereceptor(createInterceptor(ActivationInterceptor.class));
 //         }
 //         else
-//         {
-//            interceptorChain.appendIntereceptor(createInterceptor(CacheLoaderInterceptor.class));
-//         }
-//      }
+         {
+            interceptorChain.appendIntereceptor(createInterceptor(CacheLoaderInterceptor.class));
+         }
+      }
       interceptorChain.appendIntereceptor(createInterceptor(LockingInterceptor.class));
 
       // TODO: Uncomment once the CacheLoader has been moved to Starobrno
-//      if (configuration.isUsingCacheLoaders())
-//      {
+      if (configuration.isUsingCacheLoaders())
+      {
 //         if (configuration.getCacheLoaderConfig().isPassivation())
 //         {
 //
@@ -147,13 +147,13 @@
 //
 //         }
 //         else
-//         {
+         {
+
+            interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
+
+         }
+      }
 //
-//            interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
-//
-//         }
-//      }
-//
 //      if (configuration.isUsingBuddyReplication())
 //      {
 //

Added: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -0,0 +1,228 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.starobrno.commands.read.GetKeyValueCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.container.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntryCreator;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.interceptors.base.JmxStatsCommandInterceptor;
+import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
+import org.jboss.starobrno.jmx.annotations.ManagedOperation;
+import org.jboss.starobrno.loader.CacheLoader;
+import org.jboss.starobrno.loader.CacheLoaderManager;
+import org.jboss.starobrno.notifications.Notifier;
+
+/**
+ * Loads nodes that don't exist at the time of the call into memory from the CacheLoader
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class CacheLoaderInterceptor extends JmxStatsCommandInterceptor
+{
+   private long cacheLoads = 0;
+   private long cacheMisses = 0;
+   private CacheLoaderManager clm;
+
+   protected TransactionTable txTable = null;
+   protected CacheLoader<Object, Object> loader;
+   protected DataContainer<Object, Object> dataContainer;
+   protected Notifier notifier;
+   protected MVCCEntryCreator creator;
+
+   protected boolean isActivation = false;
+//   protected boolean usingVersionedInvalidation = false;
+
+
+   /**
+    * True if CacheStoreInterceptor is in place.
+    * This allows us to skip loading keys for remove(Fqn, key) and put(Fqn, key).
+    * It also affects removal of node data and listing children.
+    */
+   protected boolean useCacheStore = true;
+
+   @Inject
+   protected void injectDependencies(TransactionTable txTable, CacheLoaderManager clm, Configuration configuration,
+                                     DataContainer<Object, Object> dataContainer, MVCCEntryCreator creator, Notifier notifier)
+   {
+      this.txTable = txTable;
+      this.clm = clm;
+//      CacheMode mode = configuration.getCacheMode();
+//      usingVersionedInvalidation = mode.isInvalidation();
+      this.dataContainer = dataContainer;
+      this.notifier = notifier;
+      this.creator = creator;
+   }
+
+   @Start
+   protected void startInterceptor()
+   {
+      loader = clm.getCacheLoader();
+   }
+
+   @Override
+   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+   {
+      if (command.getKey() != null)
+      {
+         loadIfNeeded(ctx, command.getKey());
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+
+   @Override
+   public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable
+   {
+      if (command.getKey() != null)
+      {
+         loadIfNeeded(ctx, command.getKey());
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+   {
+      if (command.getKey() != null)
+      {
+         loadIfNeeded(ctx, command.getKey());
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable
+   {
+      if (command.getKey() != null)
+      {
+         loadIfNeeded(ctx, command.getKey());
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   private void loadIfNeeded(InvocationContext ctx, Object key) throws Throwable
+   {
+      if (dataContainer.containsKey(key) || !loader.exists(key))
+         return;
+
+      // Obtain a temporary lock to verify the key is not being concurrently added
+      boolean release = creator.acquireLock(ctx, key);
+      if (dataContainer.containsKey(key))
+      {
+         if (release)
+            creator.releaseLock(ctx, key);
+         return;
+      }
+
+      // Reuse the lock and create a new entry for loading
+      MVCCEntry n = creator.wrapEntryForWriting(ctx, key, false, true);
+      n = loadEntry(ctx, key, n);
+   }
+
+   /**
+    * Loads the value for the given key from the cache loader.  If a value is
+    * found, it is set on the supplied entry and the entry is marked valid.
+    * The supplied entry is always returned, whether or not a value was loaded.
+    */
+   private MVCCEntry loadEntry(InvocationContext ctx, Object key, MVCCEntry entry) throws Exception
+   {
+      if (trace) log.trace("loading entry " + key + " entry is " + entry);
+
+      Object value = loader.get(key);
+      boolean nodeExists = (value != null);
+      if (trace) log.trace("nodeExists " + nodeExists);
+
+      if (getStatisticsEnabled())
+      {
+         if (nodeExists)
+         {
+            cacheLoads++;
+         }
+         else
+         {
+            cacheMisses++;
+         }
+      }
+
+      if (value != null)
+      {
+         if (trace) log.trace("Entry is not null, loading");
+//         notifier.notifyNodeLoaded(fqn, true, Collections.emptyMap(), ctx);
+//         if (isActivation)
+//         {
+//            notifier.notifyNodeActivated(fqn, true, Collections.emptyMap(), ctx);
+//         }
+
+         entry.setValue(value);
+         entry.setValid(true);
+
+//         notifier.notifyNodeLoaded(fqn, false, nodeData, ctx);
+//         if (isActivation)
+//         {
+//            notifier.notifyNodeActivated(fqn, false, nodeData, ctx);
+//         }
+      }
+
+      return entry;
+   }
+
+   @ManagedAttribute(description = "number of cache loader node loads")
+   public long getCacheLoaderLoads()
+   {
+      return cacheLoads;
+   }
+
+   @ManagedAttribute(description = "number of cache loader node misses")
+   public long getCacheLoaderMisses()
+   {
+      return cacheMisses;
+   }
+
+   @ManagedOperation
+   public void resetStatistics()
+   {
+      cacheLoads = 0;
+      cacheMisses = 0;
+   }
+
+   @ManagedOperation
+   public Map<String, Object> dumpStatistics()
+   {
+      Map<String, Object> retval = new HashMap<String, Object>();
+      retval.put("CacheLoaderLoads", cacheLoads);
+      retval.put("CacheLoaderMisses", cacheMisses);
+      return retval;
+   }
+}
\ No newline at end of file


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java
___________________________________________________________________
Name: svn:executable
   + *
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -0,0 +1,363 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.jmx.annotations.ManagedAttribute;
+import org.jboss.cache.jmx.annotations.ManagedOperation;
+import org.jboss.starobrno.commands.AbstractVisitor;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.tx.RollbackCommand;
+import org.jboss.starobrno.commands.write.ClearCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.PutMapCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.config.CacheLoaderConfig;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.interceptors.base.JmxStatsCommandInterceptor;
+import org.jboss.starobrno.loader.CacheLoader;
+import org.jboss.starobrno.loader.CacheLoaderManager;
+import org.jboss.starobrno.loader.Modification;
+import org.jboss.starobrno.loader.Modification.ModificationType;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+
+/**
+ * Writes modifications back to the store on the way out: stores modifications back
+ * through the CacheLoader, either after each method call (no TXs), or at TX commit.
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class CacheStoreInterceptor extends JmxStatsCommandInterceptor
+{
+   private CacheLoaderConfig loaderConfig = null;
+   private TransactionManager txMgr = null;
+   private HashMap<GlobalTransaction, Integer> txStores = new HashMap<GlobalTransaction, Integer>();
+   private Map<GlobalTransaction, Set<Object>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>();
+   private long cacheStores = 0;
+   CacheLoader<Object, Object> loader;
+   private CacheLoaderManager loaderManager;
+   private boolean statsEnabled;
+
+   public CacheStoreInterceptor()
+   {
+      log = LogFactory.getLog(getClass());
+      trace = log.isTraceEnabled();
+   }
+
+   @Inject
+   protected void init(CacheLoaderManager loaderManager, TransactionManager txManager, CacheLoaderConfig clConfig)
+   {
+      // never inject a CacheLoader at this stage - only a CacheLoaderManager, since the CacheLoaderManager only creates a CacheLoader instance when it @Starts.
+      this.loaderManager = loaderManager;
+      this.loaderConfig = clConfig;
+      txMgr = txManager;
+   }
+
+   @Start
+   protected void start()
+   {
+      // this should only happen after the CacheLoaderManager has started, since the CacheLoaderManager only creates the CacheLoader instance in its @Start method.
+      loader = loaderManager.getCacheLoader();
+      this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
+   }
+
+   /**
+    * Returns true if this interceptor should be bypassed: either the cache loader is shared and the call is of remote origin, or persistence is suppressed via the option overrides.
+    */
+   public final boolean skip(InvocationContext ctx, VisitableCommand command)
+   {
+      if ((!ctx.isOriginLocal() && loaderConfig.isShared()) || ctx.getOptionOverrides().isSuppressPersistence())
+      {
+         if (trace)
+            log.trace("Passing up method call and bypassing this interceptor since the cache loader is shared and this call originated remotely.");
+         return true;
+      }
+      return false;
+   }
+
+   @Override
+   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+   {
+      if (!skip(ctx, command) && inTransaction())
+      {
+         if (ctx.getTransactionContext().hasAnyModifications())
+         {
+            // this is a commit call.
+            GlobalTransaction gtx = command.getGlobalTransaction();
+            if (trace) log.trace("Calling loader.commit() for gtx " + gtx);
+            // sync call (a write) on the loader
+            // ignore modified FQNs
+            // List fqnsModified = getFqnsFromModificationList(txTable.get(globalTransaction).getCacheLoaderModifications());
+            try
+            {
+               loader.commit(gtx);
+            }
+            catch (Throwable t)
+            {
+               preparingTxs.remove(gtx);
+               throw t;
+            }
+            if (getStatisticsEnabled())
+            {
+               Integer puts = (Integer) txStores.get(gtx);
+               if (puts != null)
+               {
+                  cacheStores = cacheStores + puts;
+               }
+               txStores.remove(gtx);
+            }
+            return invokeNextInterceptor(ctx, command);
+         }
+         else
+         {
+            if (trace) log.trace("Commit called with no modifications; ignoring.");
+         }
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+   {
+      if (!skip(ctx, command) && inTransaction())
+      {
+         if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
+         if (ctx.getTransactionContext().hasAnyModifications())
+         {
+            GlobalTransaction gtx = command.getGlobalTransaction();
+            // this is a rollback method
+            if (preparingTxs.containsKey(gtx))
+            {
+               preparingTxs.remove(gtx);
+               loader.rollback(gtx);
+            }
+            if (getStatisticsEnabled()) txStores.remove(gtx);
+         }
+         else
+         {
+            if (trace) log.trace("Rollback called with no modifications; ignoring.");
+         }
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+   {
+      if (!skip(ctx, command) && inTransaction())
+      {
+         if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
+         prepareCacheLoader(command.getGlobalTransaction(), ctx.getTransactionContext(), command.isOnePhaseCommit());
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+   {
+      if (!skip(ctx, command) && !inTransaction())
+      {
+         Object returnValue = loader.remove(command.getKey());
+         invokeNextInterceptor(ctx, command);
+         return returnValue;
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+   {
+      Object returnValue = invokeNextInterceptor(ctx, command);
+      if (skip(ctx, command) || inTransaction())
+         return returnValue;
+
+      returnValue = loader.put(command.getKey(), command.getValue());
+      if (getStatisticsEnabled()) cacheStores++;
+
+      return returnValue;
+   }
+
+   @Override
+   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
+   {
+      Object returnValue = invokeNextInterceptor(ctx, command);
+      if (skip(ctx, command) || inTransaction())
+         return returnValue;
+
+      // Perhaps this should be optimized
+      Map<Object, Object> map = command.getMap();
+      List<Modification> modifications = toModifications(map);
+      loader.put(modifications);
+
+      if (getStatisticsEnabled()) cacheStores++;
+
+      return returnValue;
+   }
+
+   private static List<Modification> toModifications(Map<Object, Object> map)
+   {
+      List<Modification> modifications = new ArrayList<Modification>(map.size());
+      for (Map.Entry<Object, Object> entry : map.entrySet())
+         modifications.add(new Modification(ModificationType.PUT, entry.getKey(), entry.getValue()));
+      return modifications;
+   }
+
+   private boolean inTransaction() throws SystemException
+   {
+      return txMgr != null && txMgr.getTransaction() != null;
+   }
+
+   private void prepareCacheLoader(GlobalTransaction gtx, TransactionContext transactionContext, boolean onePhase) throws Throwable
+   {
+      if (transactionContext == null)
+      {
+         throw new Exception("transactionContext for transaction " + gtx + " not found in transaction table");
+      }
+      List<VisitableCommand> modifications = transactionContext.getModifications();
+      if (modifications.size() == 0)
+      {
+         if (trace) log.trace("Transaction has not logged any modifications!");
+         return;
+      }
+      if (trace) log.trace("Cache loader modification list: " + modifications);
+      StoreModificationsBuilder modsBuilder = new StoreModificationsBuilder(getStatisticsEnabled());
+      for (VisitableCommand cacheCommand : modifications)
+      {
+         cacheCommand.acceptVisitor(null, modsBuilder);
+      }
+      if (trace)
+      {
+         log.trace("Converted method calls to cache loader modifications.  List size: " + modsBuilder.modifications.size());
+      }
+      if (modsBuilder.modifications.size() > 0)
+      {
+         loader.prepare(gtx, modsBuilder.modifications, onePhase);
+
+         preparingTxs.put(gtx, modsBuilder.affectedKeys);
+         if (getStatisticsEnabled() && modsBuilder.putCount > 0)
+         {
+            txStores.put(gtx, modsBuilder.putCount);
+         }
+      }
+   }
+
+   public static class StoreModificationsBuilder extends AbstractVisitor
+   {
+
+      boolean generateStatistics;
+
+      int putCount;
+
+      Set<Object> affectedKeys = new HashSet<Object>();
+
+      List<Modification> modifications = new ArrayList<Modification>();
+
+      public StoreModificationsBuilder(boolean generateStatistics)
+      {
+         this.generateStatistics = generateStatistics;
+      }
+
+      @Override
+      public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+      {
+         if (generateStatistics) putCount++;
+         modifications.add(new Modification(Modification.ModificationType.PUT, command.getKey(), command.getValue()));
+         affectedKeys.add(command.getKey());
+         return null;
+      }
+
+      @Override
+      public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
+      {
+         Map<Object, Object> map = command.getMap();
+         if (generateStatistics) putCount += map.size();
+         affectedKeys.addAll(map.keySet());
+         modifications.addAll(toModifications(map));
+         return null;
+      }
+
+      @Override
+      public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+      {
+         modifications.add(new Modification(Modification.ModificationType.REMOVE, command.getKey(), null));
+         affectedKeys.add(command.getKey());
+         return null;
+      }
+
+      @Override
+      public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
+      {
+         modifications.add(new Modification(Modification.ModificationType.CLEAR, null, null));
+         return null;
+      }
+   }
+
+   @ManagedOperation
+   public void resetStatistics()
+   {
+      cacheStores = 0;
+   }
+
+   @ManagedOperation
+   public Map<String, Object> dumpStatistics()
+   {
+      Map<String, Object> retval = new HashMap<String, Object>();
+      retval.put("CacheLoaderStores", cacheStores);
+      return retval;
+   }
+
+   @ManagedAttribute
+   public boolean getStatisticsEnabled()
+   {
+      return statsEnabled;
+   }
+
+   @ManagedAttribute
+   public void setStatisticsEnabled(boolean enabled)
+   {
+      this.statsEnabled = enabled;
+   }
+
+   @ManagedAttribute(description = "number of cache loader stores")
+   public long getCacheLoaderStores()
+   {
+      return cacheStores;
+   }
+
+}


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java
___________________________________________________________________
Name: svn:executable
   + *
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/SkipCheckChainedInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/SkipCheckChainedInterceptor.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/SkipCheckChainedInterceptor.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -1,77 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno.interceptors.base;
-
-import org.jboss.starobrno.commands.VisitableCommand;
-import org.jboss.starobrno.context.InvocationContext;
-
-/**
- * This interceptor will call {@link #skipInterception(org.jboss.cache.InvocationContext ,org.jboss.cache.commands.VisitableCommand)} before invoking each visit method
- * (and the {@link #handleDefault(org.jboss.cache.InvocationContext , org.jboss.cache.commands.VisitableCommand)} method).  If
- * {@link #skipInterception(org.jboss.cache.InvocationContext ,org.jboss.cache.commands.VisitableCommand)} returns <tt>false</tt>, the invocation will be skipped
- * and passed up the interceptor chain instead.
- * <p/>
- * Instead of overriding visitXXX() methods, implementations should override their handleXXX() counterparts defined in this class
- * instead, as well as the {@link #skipInterception(org.jboss.cache.InvocationContext ,org.jboss.cache.commands.VisitableCommand)} method.
- * Also, instead of overriding {@link #handleDefault(org.jboss.cache.InvocationContext , org.jboss.cache.commands.VisitableCommand)}, implementors
- * should override {@link #handleAll(org.jboss.cache.InvocationContext , org.jboss.cache.commands.VisitableCommand)}.
- *
- * @author Mircea.Markus at jboss.com
- * @since 2.2
- */
-public abstract class SkipCheckChainedInterceptor extends CommandInterceptor
-{
-
-   // TODO!!!
-
-   @Override
-   public final Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
-   {
-      if (skipInterception(ctx, command))
-      {
-         return invokeNextInterceptor(ctx, command);
-      }
-      return handleAll(ctx, command);
-   }
-
-   /**
-    * Default implementation, which just passes the call up the interceptor chain
-    *
-    * @param ctx     invocation context
-    * @param command command
-    * @return return value
-    * @throws Throwable in the event of problems
-    */
-   protected Object handleAll(InvocationContext ctx, VisitableCommand command) throws Throwable
-   {
-      return invokeNextInterceptor(ctx, command);
-   }
-
-   /**
-    * Tests whether the command should be intercepted or not.  This is invoked before any of the handleXXX() methods.
-    *
-    * @param ctx     invocation context
-    * @param command command
-    * @return true if the invocation should skip the current interceptor and move on to the next in the chain, false otherwise.
-    */
-   protected abstract boolean skipInterception(InvocationContext ctx, VisitableCommand command);
-}
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractCacheLoader.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractCacheLoader.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -124,7 +124,7 @@
          switch (m.getType())
          {
             case PUT:
-               put(m.getKey(), (V)m.getValue());
+               put((K)m.getKey(), (V)m.getValue());
                break;
             case REMOVE:
                remove(m.getKey());

Added: core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.jboss.starobrno.marshall.EntryData;
+
+/**
+ * AbstractDelegatingCacheLoader provides standard functionality for a cache loader that simply delegates each
+ * operation defined in the cache loader interface to the underlying cache loader, basically acting as a proxy to the
+ * real cache loader.
+ * <p/>
+ * Any cache loader implementation that extends this class would be required to override any of the methods in
+ * order to provide a different or added behaviour.
+ *
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ */
+public abstract class AbstractDelegatingCacheLoader<K,V> extends AbstractCacheLoader<K,V>
+{
+   private CacheLoader<K,V> cacheLoader;
+
+   public AbstractDelegatingCacheLoader(CacheLoader<K,V> cacheLoader)
+   {
+      this.cacheLoader = cacheLoader;
+   }
+
+   public void clear()
+   {
+      cacheLoader.clear();
+   }
+
+   public void commit(Object tx)
+   {
+      cacheLoader.commit(tx);
+   }
+
+   public void create()
+   {
+      cacheLoader.create();
+   }
+
+   public void destroy()
+   {
+      cacheLoader.destroy();
+   }
+
+   public boolean exists(Object key)
+   {
+      return cacheLoader.exists(key);
+   }
+
+   public V get(Object key)
+   {
+      return cacheLoader.get(key);
+   }
+
+   public List<EntryData<K, V>> getAllEntries()
+   {
+      return cacheLoader.getAllEntries();
+   }
+
+   public IndividualCacheLoaderConfig getConfig()
+   {
+      return cacheLoader.getConfig();
+   }
+
+   public void loadEntireState(ObjectOutputStream os)
+   {
+      cacheLoader.loadEntireState(os);
+   }
+
+   public void prepare(Object tx, List<Modification> modifications, boolean one_phase)
+   {
+      cacheLoader.prepare(tx, modifications, one_phase);
+   }
+
+   public V put(K key, V value)
+   {
+      return cacheLoader.put(key, value);
+   }
+
+   public void put(List<Modification> modifications)
+   {
+      cacheLoader.put(modifications);
+   }
+
+   public V remove(Object key)
+   {
+      return cacheLoader.remove(key);
+   }
+
+   public void rollback(Object tx)
+   {
+      cacheLoader.rollback(tx);
+   }
+
+   public void setCache(CacheSPI<K, V> c)
+   {
+      cacheLoader.setCache(c);
+   }
+
+   public void setConfig(IndividualCacheLoaderConfig config)
+   {
+      cacheLoader.setConfig(config);
+   }
+
+   public void start()
+   {
+      cacheLoader.start();
+   }
+
+   public void stop()
+   {
+      cacheLoader.stop();
+   }
+
+   public void storeEntireState(ObjectInputStream is)
+   {
+      cacheLoader.storeEntireState(is);
+   }
+
+   public CacheLoader<K,V> getCacheLoader()
+   {
+      return cacheLoader;
+   }
+
+   public void setCacheLoader(CacheLoader<K,V> cacheLoader)
+   {
+      this.cacheLoader = cacheLoader;
+   }
+}


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -0,0 +1,341 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.util.Immutables;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+
+/**
+ * The AsyncCacheLoader is a delegating cache loader that extends
+ * AbstractDelegatingCacheLoader, overriding the methods that should not
+ * just delegate the operation to the underlying cache loader.
+ * <p/>
+ * Read operations are done synchronously, while write (CRUD - Create, Remove,
+ * Update, Delete) operations are done asynchronously.  There is no provision
+ * for exception handling at the moment for problems encountered with the
+ * underlying CacheLoader during a CRUD operation, and the exception is just
+ * logged.
+ * <p/>
+ * When configuring the CacheLoader, use the following attribute:
+ * <p/>
+ * <code>
+ * &lt;attribute name="CacheLoaderAsynchronous"&gt;true&lt;/attribute&gt;
+ * </code>
+ * <p/>
+ * to define whether cache loader operations are to be asynchronous.  If not
+ * specified, a cache loader operation is assumed synchronous.
+ * <p/>
+ * <p/>
+ * The following additional parameters are available:
+ * <dl>
+ * <dt>cache.async.batchSize</dt>
+ * <dd>Number of modifications to commit in one transaction, default is
+ * 100. The minimum batch size is 1.</dd>
+ * <dt>cache.async.pollWait</dt>
+ * <dd>How long to wait before processing an incomplete batch, in
+ * milliseconds.  Default is 100.  Set this to 0 to not wait before processing
+ * available records.</dd>
+ * <dt>cache.async.returnOld</dt>
+ * <dd>If <code>true</code>, this loader returns the old values from {@link
+ * #put} and {@link #remove} methods.  Otherwise, these methods always return
+ * null.  Default is true.  <code>false</code> improves the performance of these
+ * operations.</dd>
+ * <dt>cache.async.queueSize</dt>
+ * <dd>Maximum number of entries to enqueue for asynchronous processing.
+ * Lowering this size may help prevent out-of-memory conditions.  It also may
+ * help to reduce the number of records lost in the case of JVM failure.  Default is
+ * 10,000 operations.</dd>
+ * <dt>cache.async.put</dt>
+ * <dd>If set to false, all {@link #put} operations will be processed
+ * synchronously, and then only the {@link #remove} operations will be
+ * processed asynchronously. This mode may be useful for processing
+ * expiration of messages within a separate thread and keeping other
+ * operations synchronous for reliability.
+ * </dd>
+ * <dt>cache.async.threadPoolSize</dt>
+ * <dd>The size of the async processor thread pool.  Defaults to <tt>1</tt>.  This
+ * property is new in JBoss Cache 3.0.</dd>
+ * </dl>
+ * For increased performance for many smaller transactions, use higher values
+ * for <code>cache.async.batchSize</code> and
+ * <code>cache.async.pollWait</code>.  For larger sized records, use a smaller
+ * value for <code>cache.async.queueSize</code>.
+ *
+ * @author Manik Surtani (manik.surtani at jboss.com)
+ */
+public class AsyncCacheLoader<K,V> extends AbstractDelegatingCacheLoader<K,V>
+{
+
+   private static final Log log = LogFactory.getLog(AsyncCacheLoader.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+   private static AtomicInteger threadId = new AtomicInteger(0);
+
+   /**
+    * Default limit on entries to process asynchronously.
+    */
+   private static final int DEFAULT_QUEUE_SIZE = 10000;
+
+   private AsyncCacheLoaderConfig config;
+   private ExecutorService executor;
+   private AtomicBoolean stopped = new AtomicBoolean(true);
+   private BlockingQueue<Modification> queue = new ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE);
+   private List<Future> processorFutures;
+
+   public AsyncCacheLoader()
+   {
+      super(null);
+   }
+
+   public AsyncCacheLoader(CacheLoader<K,V> cacheLoader)
+   {
+      super(cacheLoader);
+   }
+
+   @Override
+   public void setConfig(IndividualCacheLoaderConfig base)
+   {
+      if (base instanceof AsyncCacheLoaderConfig)
+      {
+         config = (AsyncCacheLoaderConfig) base;
+      }
+      else
+      {
+         config = new AsyncCacheLoaderConfig(base);
+      }
+
+      if (config.getQueueSize() > 0)
+      {
+         queue = new ArrayBlockingQueue<Modification>(config.getQueueSize());
+      }
+
+      super.setConfig(base);
+   }
+
+   /**
+    * Stores a single entry.
+    * <p/>
+    * When <code>cache.async.put</code> is enabled the write is queued for the
+    * background processors.  The previous value is then read synchronously only
+    * if <code>cache.async.returnOld</code> is enabled; otherwise <code>null</code>
+    * is returned without touching the underlying loader.  When asynchronous puts
+    * are disabled the call is delegated directly and the delegate's result is
+    * returned.
+    *
+    * @param key   key of the entry to store
+    * @param value value to associate with the key
+    * @return the old value as described above, possibly <code>null</code>
+    * @throws CacheException if this loader has been stopped and the write has to be queued
+    */
+   @Override
+   public V put(K key, V value)
+   {
+      if (config.getUseAsyncPut())
+      {
+         // Only pay for the synchronous read when the caller asked for old values.
+         V oldValue = config.getReturnOld() ? get(key) : null;
+         Modification mod = new Modification(Modification.ModificationType.PUT, key, value);
+         enqueue(mod);
+         return oldValue;
+      }
+      else
+      {
+         return super.put(key, value);
+      }
+   }
+
+   @Override
+   public void put(List<Modification> modifications)
+   {
+      if (config.getUseAsyncPut())
+      {
+         for (Modification modification : modifications)
+         {
+            enqueue(modification);
+         }
+      }
+      else
+      {
+         super.put(modifications);
+      }
+   }
+
+   /**
+    * Queues the removal of an entry for asynchronous processing.
+    * <p/>
+    * The previous value is read synchronously only if
+    * <code>cache.async.returnOld</code> is enabled; otherwise <code>null</code>
+    * is returned without touching the underlying loader.
+    *
+    * @param key key of the entry to remove
+    * @return the old value if <code>returnOld</code> is enabled, otherwise <code>null</code>
+    * @throws CacheException if this loader has been stopped
+    */
+   @Override
+   public V remove(Object key)
+   {
+      // Only pay for the synchronous read when the caller asked for old values.
+      V oldValue = config.getReturnOld() ? get(key) : null;
+      Modification mod = new Modification(Modification.ModificationType.REMOVE, key, null);
+      enqueue(mod);
+      return oldValue;
+   }
+
+   @Override
+   public void start()
+   {
+      if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this);
+      stopped.set(false);
+      super.start();
+      executor = Executors.newFixedThreadPool(config.getThreadPoolSize(), new ThreadFactory()
+      {
+         public Thread newThread(Runnable r)
+         {
+            Thread t = new Thread(r, "AsyncCacheLoader-" + threadId.getAndIncrement());
+            t.setDaemon(true);
+            return t;
+         }
+      });
+      processorFutures = new ArrayList<Future>(config.getThreadPoolSize());
+      for (int i = 0; i < config.getThreadPoolSize(); i++) processorFutures.add(executor.submit(new AsyncProcessor()));
+   }
+
+   @Override
+   public void stop()
+   {
+      stopped.set(true);
+      if (executor != null)
+      {
+         for (Future f : processorFutures) f.cancel(true);
+         executor.shutdown();
+         try
+         {
+            boolean terminated = executor.isTerminated();
+            while (!terminated)
+            {
+               terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
+            }
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+      executor = null;
+      super.stop();
+   }
+
+   /**
+    * Adds a modification to the queue consumed by the async processors,
+    * blocking while the queue is full.
+    * <p/>
+    * If the calling thread is interrupted while waiting, the modification is
+    * NOT queued; the interrupt flag is restored and a warning is logged so the
+    * lost write is visible.
+    *
+    * @param mod modification to queue
+    * @throws CacheException if this loader has been stopped
+    */
+   private void enqueue(final Modification mod)
+   {
+      if (stopped.get())
+      {
+         throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries.");
+      }
+      if (trace) log.trace("Enqueuing modification " + mod);
+      try
+      {
+         queue.put(mod);
+      }
+      catch (InterruptedException e)
+      {
+         // Restore the flag for the caller and make the dropped write visible.
+         Thread.currentThread().interrupt();
+         if (log.isWarnEnabled()) log.warn("Interrupted while enqueuing modification; it was not queued: " + mod);
+      }
+   }
+
+   /**
+    * Processes (by batch if possible) a queue of {@link Modification}s.
+    * <p/>
+    * A processor blocks on the shared queue until its thread is interrupted
+    * (which is how {@link AsyncCacheLoader#stop()} cancels it), then flushes
+    * whatever is still queued without blocking again, so that it always
+    * terminates even when several processors drain the same queue.
+    *
+    * @author manik surtani
+    */
+   private class AsyncProcessor implements Runnable
+   {
+      // Modifications to invoke as a single put
+      private final List<Modification> mods = new ArrayList<Modification>(config.getBatchSize());
+
+      public void run()
+      {
+         // Normal operation: keep processing batches until interrupted.
+         while (!Thread.interrupted())
+         {
+            try
+            {
+               run0();
+            }
+            catch (InterruptedException e)
+            {
+               break;
+            }
+         }
+
+         // Shutdown: flush anything left over.
+         if (trace) log.trace("process remaining batch " + mods.size());
+         put(mods);
+         mods.clear();
+         if (trace) log.trace("process remaining queued " + queue.size());
+         // Use the non-blocking drainTo() only.  Falling back to take() here could
+         // block forever if another processor empties the queue first, because the
+         // interrupt that ended the main loop has already been consumed.
+         while (queue.drainTo(mods, config.getBatchSize()) > 0)
+         {
+            if (trace)
+            {
+               log.trace("Calling put(List) with " + mods.size() + " modifications");
+            }
+            put(mods);
+            mods.clear();
+         }
+      }
+
+      /**
+       * Applies one batch: drains up to <code>batchSize</code> queued modifications,
+       * or blocks for a single one if the queue is currently empty.
+       *
+       * @throws InterruptedException if interrupted while waiting for a modification
+       */
+      private void run0() throws InterruptedException
+      {
+         log.trace("Checking for modifications");
+         int i = queue.drainTo(mods, config.getBatchSize());
+         if (i == 0)
+         {
+            // Nothing queued right now; wait for the next modification.
+            Modification m = queue.take();
+            mods.add(m);
+         }
+
+         if (trace)
+         {
+            log.trace("Calling put(List) with " + mods.size() + " modifications");
+         }
+         put(mods);
+         mods.clear();
+      }
+
+      /**
+       * Passes the batch to the superclass <code>put(List)</code>, bypassing this
+       * loader's own queueing.  Failures are logged rather than propagated.
+       *
+       * @param mods modifications to apply
+       */
+      private void put(List<Modification> mods)
+      {
+         try
+         {
+            AsyncCacheLoader.super.put(mods);
+         }
+         catch (Exception e)
+         {
+            if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e);
+            if (log.isDebugEnabled()) log.debug("Exception: ", e);
+         }
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      return super.toString() +
+            " delegate=[" + super.getCacheLoader() + "]" +
+            " stopped=" + stopped +
+            " batchSize=" + config.getBatchSize() +
+            " returnOld=" + config.getReturnOld() +
+            " asyncPut=" + config.getUseAsyncPut() +
+            " threadPoolSize=" + config.getThreadPoolSize() +
+            " queue.remainingCapacity()=" + queue.remainingCapacity() +
+            " queue.peek()=" + queue.peek();
+   }
+
+}


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -0,0 +1,174 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+
+import java.util.Properties;
+
+public class AsyncCacheLoaderConfig extends IndividualCacheLoaderConfig
+{
+   /**
+    * The serialVersionUID
+    */
+   private static final long serialVersionUID = 5038037589485991681L;
+
+   private int batchSize = 100;
+   private boolean returnOld = true;
+   private int queueSize = 0;
+   private boolean useAsyncPut = true;
+   private int threadPoolSize = 1;
+
+   /**
+    * Default constructor.
+    */
+   public AsyncCacheLoaderConfig()
+   {
+      setClassName(AsyncCacheLoader.class.getName());
+   }
+
+   /**
+    * For use by {@link AsyncCacheLoader}.
+    *
+    * @param base generic config object created by XML parsing.
+    */
+   AsyncCacheLoaderConfig(IndividualCacheLoaderConfig base)
+   {
+      setClassName(AsyncCacheLoader.class.getName());
+      populateFromBaseConfig(base);
+   }
+
+   public int getThreadPoolSize()
+   {
+      return threadPoolSize;
+   }
+
+   public void setThreadPoolSize(int threadPoolSize)
+   {
+      testImmutability("threadPoolSize");
+      this.threadPoolSize = threadPoolSize;
+   }
+
+   public int getBatchSize()
+   {
+      return batchSize;
+   }
+
+   public void setBatchSize(int batchSize)
+   {
+      testImmutability("batchSize");
+      this.batchSize = batchSize;
+   }
+
+   public int getQueueSize()
+   {
+      return queueSize;
+   }
+
+   public void setQueueSize(int queueSize)
+   {
+      testImmutability("queueSize");
+      this.queueSize = queueSize;
+   }
+
+   public boolean getReturnOld()
+   {
+      return returnOld;
+   }
+
+   public void setReturnOld(boolean returnOld)
+   {
+      testImmutability("returnOld");
+      this.returnOld = returnOld;
+   }
+
+   public boolean getUseAsyncPut()
+   {
+      return useAsyncPut;
+   }
+
+   public void setUseAsyncPut(boolean useAsyncPut)
+   {
+      testImmutability("useAsyncPut");
+      this.useAsyncPut = useAsyncPut;
+   }
+
+   @Override
+   public void setProperties(Properties props)
+   {
+      super.setProperties(props);
+      String s;
+
+      s = props.getProperty("cache.async.batchSize");
+      if (s != null) batchSize = Integer.parseInt(s);
+      if (batchSize <= 0) throw new IllegalArgumentException("Invalid batch size: " + batchSize);
+
+      s = props.getProperty("cache.async.threadPoolSize");
+      if (s != null) threadPoolSize = Integer.parseInt(s);
+      if (threadPoolSize <= 0) throw new IllegalArgumentException("Invalid thread pool size: " + threadPoolSize);
+
+
+      s = props.getProperty("cache.async.returnOld");
+      if (s != null) returnOld = Boolean.valueOf(s);
+
+      s = props.getProperty("cache.async.queueSize");
+      if (s != null) queueSize = Integer.parseInt(s);
+
+      s = props.getProperty("cache.async.put");
+      if (s != null) useAsyncPut = Boolean.valueOf(s);
+
+
+   }
+
+   /**
+    * Two async configs are equal when the base configuration (excluding the raw
+    * properties) matches and every async-specific setting, including the thread
+    * pool size, is identical.
+    *
+    * @param obj object to compare with
+    * @return true if <code>obj</code> is an equivalent AsyncCacheLoaderConfig
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (obj instanceof AsyncCacheLoaderConfig && equalsExcludingProperties(obj))
+      {
+         AsyncCacheLoaderConfig other = (AsyncCacheLoaderConfig) obj;
+         return (batchSize == other.batchSize)
+               && (queueSize == other.queueSize)
+               && (returnOld == other.returnOld)
+               && (useAsyncPut == other.useAsyncPut)
+               && (threadPoolSize == other.threadPoolSize);
+      }
+      return false;
+   }
+
+   /**
+    * Hash code built from the same fields that {@link #equals(Object)} compares.
+    *
+    * @return hash code of this configuration
+    */
+   @Override
+   public int hashCode()
+   {
+      int result = hashCodeExcludingProperties();
+      result = 31 * result + batchSize;
+      result = 31 * result + queueSize;
+      result = 31 * result + (returnOld ? 0 : 1);
+      result = 31 * result + (useAsyncPut ? 0 : 1);
+      result = 31 * result + threadPoolSize;
+      return result;
+   }
+
+   @Override
+   public AsyncCacheLoaderConfig clone() throws CloneNotSupportedException
+   {
+      return (AsyncCacheLoaderConfig) super.clone();
+   }
+
+
+}
\ No newline at end of file


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -115,7 +115,7 @@
     * node does not exist, all parent nodes from the root down are created
     * automatically.  Returns the old value.
     */
-   V put(Object key, V value) ;
+   V put(K key, V value) ;
 
    /**
     * Removes everything from this cache-loader
@@ -139,6 +139,14 @@
 
 
    /**
+    * Applies all modifications to the backend store.
+    * Changes may be applied in a single operation.
+    *
+    * @param modifications A List<Modification> of modifications
+    */
+   void put(List<Modification> modifications);
+
+   /**
     * Prepares a list of modifications. For example, for a DB-based CacheLoader:
     * <ol>
     * <li>Create a local (JDBC) transaction

Added: core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -0,0 +1,447 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.config.CacheLoaderConfig;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.config.ConfigurationException;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.factories.annotations.Stop;
+import org.jboss.starobrno.util.ReflectionUtil;
+
+/**
+ * Manages all cache loader functionality.  This class is typically initialised with an XML DOM Element,
+ * representing a cache loader configuration, or a {@link org.jboss.starobrno.config.CacheLoaderConfig} object.
+ * <p/>
+ * Usage:
+ * <p/>
+ * <code>
+ * CacheLoaderManager manager = new CacheLoaderManager();
+ * manager.setConfig(myXmlSnippet, myTreeCache);
+ * CacheLoader loader = manager.getCacheLoader();
+ * </code>
+ * <p/>
+ * The XML configuration passed in would typically look like:
+ * <p/>
+ * <code><![CDATA[
+ * <p/>
+ * <config>
+ * <passivation>false</passivation>
+ * <preload>/</preload>
+ * <p/>
+ * <cacheloader>
+ * <class>org.jboss.cache.loader.FileCacheLoader</class>
+ * <async>true</async>
+ * <fetchPersistentState>false</fetchPersistentState>
+ * <ignoreModifications>false</ignoreModifications>
+ * <properties>
+ * location=/tmp/file
+ * </properties>
+ * </cacheloader>
+ * </config>
+ * ]]>
+ * </code>
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ * @author <a href="mailto:galder.zamarreno at jboss.com">Galder Zamarreno</a>
+ */
+public class CacheLoaderManager
+{
+   private static final Log log = LogFactory.getLog(CacheLoaderManager.class);
+   private CacheLoaderConfig config;
+   private CacheSPI<Object,Object> cache;
+   private CacheLoader<Object,Object> loader;
+   private boolean fetchPersistentState;
+   private Configuration configuration;
+   private ComponentRegistry registry;
+
+   @Inject
+   public void injectDependencies(CacheSPI<Object, Object> cache, Configuration configuration, ComponentRegistry registry)
+   {
+      // TODO: Inject CacheSPI once we have the cache loaders not relying on a tree structure
+      this.config = configuration.getCacheLoaderConfig();
+//      this.cache = cache;
+      this.configuration = configuration;
+      this.registry = registry;
+
+      if (config != null)
+      {
+         try
+         {
+            loader = createCacheLoader();
+         }
+         catch (Exception e)
+         {
+            throw new CacheException("Unable to create cache loaders", e);
+         }
+      }
+   }
+
+   /**
+    * Sets a configuration object and creates a cacheloader accordingly.  Primarily used for testing.
+    * @param config        cache loader config; when null, configuration.getCacheLoaderConfig() is used
+    * @param cache         cache handed to the cache loaders that get created
+    * @param configuration cache configuration; stored, and source of the fallback cache loader config
+    * @throws CacheException if the cache loader cannot be created
+    */
+   public void setConfig(CacheLoaderConfig config, CacheSPI<Object,Object> cache, Configuration configuration) throws CacheException
+   {
+      this.config = config == null ? configuration.getCacheLoaderConfig() : config;
+      this.cache = cache;
+      this.configuration = configuration;
+
+      if (this.config != null) // check the resolved field: the parameter can be null while the fallback is not
+      {
+         try
+         {
+            loader = createCacheLoader();
+         }
+         catch (Exception e)
+         {
+            throw new CacheException("Unable to create cache loaders", e);
+         }
+      }
+   }
+
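+   /**
+    * Creates the cache loader based on the cache loader config held by this manager. Fails with a plain
+    * Exception when more than one chained cache loader has fetchPersistentState set to true.
+    * @return a configured cacheloader
+    * @throws IllegalAccessException if a configured cache loader class cannot be accessed
+    * @throws InstantiationException if a configured cache loader class cannot be instantiated
+    * @throws ClassNotFoundException if a configured cache loader class cannot be found
+    */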
+   private CacheLoader<Object, Object> createCacheLoader() throws Exception
+   {
+      CacheLoader<Object,Object> tmpLoader;
+      // if we only have a single cache loader configured in the chaining cacheloader then
+      // don't use a chaining cache loader at all.
+
+      ArrayList<IndividualCacheLoaderConfig> finalConfigs =
+            new ArrayList<IndividualCacheLoaderConfig>();
+
+      // also if we are using passivation then just directly use the first cache loader.
+      if (config.useChainingCacheLoader())
+      {
+         // create chaining cache loader.
+         ChainingCacheLoader<Object,Object> ccl =  new ChainingCacheLoader<Object,Object>();
+         tmpLoader = ccl;
+         Iterator<IndividualCacheLoaderConfig> it = config.getIndividualCacheLoaderConfigs().iterator();
+
+         // only one cache loader may have fetchPersistentState to true.
+         int numLoadersWithFetchPersistentState = 0;
+         while (it.hasNext())
+         {
+            CacheLoaderConfig.IndividualCacheLoaderConfig cfg = (CacheLoaderConfig.IndividualCacheLoaderConfig) it.next();
+            if (cfg.isFetchPersistentState())
+            {
+               numLoadersWithFetchPersistentState++;
+               fetchPersistentState = true;
+            }
+            if (numLoadersWithFetchPersistentState > 1)
+            {
+               throw new Exception("Invalid cache loader configuration!!  Only ONE cache loader may have fetchPersistentState set to true.  Cache will not start!");
+            }
+
+            assertNotSingletonAndShared(cfg);
+
+            CacheLoader<Object,Object> l = createCacheLoader(cfg, cache);
+            cfg = l.getConfig();
+            finalConfigs.add(cfg);
+            // Only loaders that deal w/ state transfer factor into
+            // whether the overall chain supports ExtendedCacheLoader
+            ccl.addCacheLoader(l, cfg);
+
+         }
+      }
+      else
+      {
+         CacheLoaderConfig.IndividualCacheLoaderConfig cfg = config.getIndividualCacheLoaderConfigs().get(0);
+         tmpLoader = createCacheLoader(cfg, cache);
+         finalConfigs.add(tmpLoader.getConfig() == null ? cfg : tmpLoader.getConfig());
+         fetchPersistentState = cfg.isFetchPersistentState();
+         assertNotSingletonAndShared(cfg);
+      }
+
+      // Update the config with those actually used by the loaders
+      ReflectionUtil.setValue(config, "accessible", true);
+      config.setIndividualCacheLoaderConfigs(finalConfigs);
+
+      return tmpLoader;
+   }
+
+   private void assertNotSingletonAndShared(IndividualCacheLoaderConfig cfg)
+   {
+      SingletonStoreConfig ssc = cfg.getSingletonStoreConfig();
+      if (ssc != null && ssc.isSingletonStoreEnabled() && config.isShared())
+         throw new ConfigurationException("Invalid cache loader configuration!!  If a cache loader is configured as a singleton, the cache loader cannot be shared in a cluster!");
+   }
+
+   /**
+    * Creates one cache loader from its individual config and wraps it in the configured decorators
+    * (async, then read-only, then singleton store); the outermost loader receives cfg and the cache.
+    * @param cfg   individual cache loader config; supplies the loader instance or its class name
+    * @param cache cache to set on the resulting loader
+    * @return the (possibly decorated) cache loader
+    * @throws Exception if instantiation fails or the singleton store class is not a delegating loader
+    */
+   @SuppressWarnings("deprecation")
+   private CacheLoader<Object, Object> createCacheLoader(CacheLoaderConfig.IndividualCacheLoaderConfig cfg, CacheSPI<Object, Object> cache) throws Exception
+   {
+      // create loader
+      CacheLoader<Object, Object> tmpLoader = cfg.getCacheLoader() == null ? createInstance(cfg.getClassName()) : cfg.getCacheLoader();
+
+      if (tmpLoader != null)
+      {
+         // async?
+         if (cfg.isAsync())
+         {
+            CacheLoader<Object, Object> asyncDecorator;
+            asyncDecorator = new AsyncCacheLoader<Object, Object>(tmpLoader);
+            tmpLoader = asyncDecorator;
+         }
+
+         if (cfg.isIgnoreModifications())
+         {
+            AbstractDelegatingCacheLoader<Object, Object> readOnlyDecorator;
+            readOnlyDecorator = new ReadOnlyDelegatingCacheLoader<Object, Object>(tmpLoader);
+            tmpLoader = readOnlyDecorator;
+         }
+
+         // singleton?
+         SingletonStoreConfig ssc = cfg.getSingletonStoreConfig();
+         if (ssc != null && ssc.isSingletonStoreEnabled())
+         {
+            Object decorator = createInstance(ssc.getSingletonStoreClass());
+
+            /* class providing singleton store functionality must extend AbstractDelegatingCacheLoader so that
+            * underlying cacheloader can be set. */
+            if (decorator instanceof AbstractDelegatingCacheLoader)
+            {
+               @SuppressWarnings("unchecked")
+               AbstractDelegatingCacheLoader<Object, Object> singletonDecorator = (AbstractDelegatingCacheLoader<Object, Object>) decorator;
+               /* set the cache loader to where calls will be delegated by the class providing the singleton
+               * store functionality. */
+               singletonDecorator.setCacheLoader(tmpLoader);
+               tmpLoader = singletonDecorator;
+            }
+            else
+            {
+               throw new Exception("Invalid cache loader configuration!! Singleton store implementation class must extend " + AbstractDelegatingCacheLoader.class.getName());
+            }
+         }
+
+         // load props
+         tmpLoader.setConfig(cfg);
+
+         setCacheInLoader(cache, tmpLoader);
+         // we should not be creating/starting the cache loader here - this should be done in the separate
+         // startCacheLoader() method.
+         //           tmpLoader.create();
+         //           tmpLoader.start();
+      }
+      return tmpLoader;
+   }
+
+   /**
+    * Sets the cache instance associated with the given cache loader. This method was created for testing purposes
+    * so that it can be overridden in the mock version of the CacheLoaderManager.
+    *
+    * @param c      instance of cache to be set in cache loader
+    * @param loader cache loader to which assign the cache instance
+    */
+   protected void setCacheInLoader(CacheSPI<Object,Object> c, CacheLoader<Object,Object> loader)
+   {
+      loader.setCache(c);
+   }
+
+   @SuppressWarnings("unchecked")
+   private CacheLoader<Object, Object> createInstance(String className) throws ClassNotFoundException, IllegalAccessException, InstantiationException
+   {
+      if (log.isTraceEnabled()) log.trace("instantiating class " + className);
+      Class<?> cl = Thread.currentThread().getContextClassLoader().loadClass(className);
+      return (CacheLoader<Object, Object>) cl.newInstance();
+   }
+
+   /**
+    * Performs a preload on the cache based on the cache loader preload configs used when configuring the cache.
+    *
+    * @throws Exception
+    */
+   @Start(priority = 50)
+   public void preloadCache() throws CacheException
+   {
+      if (loader != null)
+      {
+         if (config.getPreload() == null || config.getPreload().equals("")) return;
+         if (log.isDebugEnabled()) log.debug("preloading transient state from cache loader " + loader);
+         StringTokenizer st = new StringTokenizer(config.getPreload(), ",");
+         long start, stop, total;
+         start = System.currentTimeMillis();
+         while (st.hasMoreTokens())
+         {
+            String tok = st.nextToken().trim();
+            if (log.isTraceEnabled()) log.trace("preloading " + tok);
+            preload(tok);
+         }
+
+         stop = System.currentTimeMillis();
+         total = stop - start;
+         if (log.isDebugEnabled())
+         {
+            log.debug("preloading transient state from cache loader was successful (in " + total + " milliseconds)");
+         }
+      }
+   }
+
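+   /**
+    * Preloads a single key by reading it through the cache, after setting the skipDataGravitation
+    * and skipCacheStatusCheck option overrides on the invocation context.
+    * NOTE(review): the cache field is assigned only in setConfig(); injectDependencies() has that
+    * assignment commented out, so on that path this presumably throws a NullPointerException - confirm.
+    * @param key key to preload from the configured cacheloader
+    * @throws CacheException if we are unable to preload
+    */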
+   public void preload(String key) throws CacheException
+   {
+
+      cache.getInvocationContext().getOptionOverrides().setSkipDataGravitation(true);
+      cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+      // 1. Load the attributes first
+      //  but this will go down the entire damn chain!!  :S
+      cache.get(key);
+   }
+
+   /**
+    * Returns the configuration element of the cache loaders
+    */
+   public CacheLoaderConfig getCacheLoaderConfig()
+   {
+      return config;
+   }
+
+   /**
+    * Returns the cache loader
+    */
+   public CacheLoader<Object, Object> getCacheLoader()
+   {
+      return loader;
+   }
+
+   /**
+    * Tests if we're using passivation
+    */
+   public boolean isPassivation()
+   {
+      return config.isPassivation();
+   }
+
+   /**
+    * Returns true if at least one of the configured cache loaders has set fetchPersistentState to true.
+    */
+   public boolean isFetchPersistentState()
+   {
+      return fetchPersistentState;
+   }
+
+   @Stop
+   public void stopCacheLoader()
+   {
+      if (loader != null)
+      {
+         // stop the cache loader
+         loader.stop();
+         // destroy the cache loader
+         loader.destroy();
+      }
+   }
+
+   @Start
+   public void startCacheLoader() throws CacheException
+   {
+      if (config == null) config = configuration.getCacheLoaderConfig();
+
+      if (config != null && loader == null)
+      {
+         try
+         {
+            loader = createCacheLoader();
+         }
+         catch (Exception e)
+         {
+            throw new CacheException("Unable to create cache loaders", e);
+         }
+      }
+
+
+      if (loader != null)
+      {
+         try
+         {
+            // wire any deps.
+            registry.wireDependencies(loader);
+
+            // create the cache loader
+            loader.create();
+            // start the cache loader
+            loader.start();
+
+            purgeLoaders(false);
+         }
+         catch (Exception e)
+         {
+            throw new CacheException("Unable to start cache loaders", e);
+         }
+         fetchPersistentState = fetchPersistentState || (loader.getConfig() != null && loader.getConfig().isFetchPersistentState());
+         fetchPersistentState = fetchPersistentState || (config != null && config.isFetchPersistentState());
+      }
+   }
+
+   public void purgeLoaders(boolean force) throws Exception
+   {
+      if ((loader instanceof ChainingCacheLoader) && !force)
+      {
+         ((ChainingCacheLoader<?,?>) loader).purgeIfNecessary();
+      }
+      else
+      {
+         CacheLoaderConfig.IndividualCacheLoaderConfig first = getCacheLoaderConfig().getFirstCacheLoaderConfig();
+         if (force ||
+               (first != null && first.isPurgeOnStartup()))
+         {
+            loader.clear();
+         }
+      }
+   }
+}


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -0,0 +1,389 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.starobrno.config.CacheLoaderConfig;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.marshall.EntryData;
+
+/**
+ * This decorator is used whenever more than one cache loader is configured.  READ operations are directed to
+ * each of the cache loaders (in the order which they were configured) until a non-null (or non-empty in the case
+ * of retrieving collection objects) result is achieved.
+ * <p/>
+ * WRITE operations are propagated to ALL registered cacheloaders that have ignoreModifications set to false.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ */
+public class ChainingCacheLoader<K,V> extends AbstractCacheLoader<K,V>
+{
+
+   private final List<CacheLoader<K,V>> cacheLoaders = new ArrayList<CacheLoader<K,V>>(2);
+   private final List<CacheLoader<K,V>> writeCacheLoaders = new ArrayList<CacheLoader<K,V>>(2);
+   private final List<CacheLoaderConfig.IndividualCacheLoaderConfig> cacheLoaderConfigs = new ArrayList<CacheLoaderConfig.IndividualCacheLoaderConfig>(2);
+   private ComponentRegistry registry;
+
+   /**
+    * Sets the configuration. Will be called before {@link #create()} and {@link #start()}
+    *
+    * @param config ignored
+    */
+   public void setConfig(IndividualCacheLoaderConfig config)
+   {
+      // don't do much here?
+   }
+
+   public IndividualCacheLoaderConfig getConfig()
+   {
+      return null;
+   }
+
+   @Inject
+   public void injectDependencies(ComponentRegistry registry)
+   {
+      this.registry = registry;
+   }
+
+
+   public V get(Object key)
+   {
+      V answer = null;
+      for (CacheLoader<K,V> l : cacheLoaders)
+      {
+         answer = l.get(key);
+         if (answer != null) break;
+      }
+      return answer;
+   }
+
+   /**
+    * Checks whether any cache loader in the chain reports the given key as existing.
+    *
+    * @param key key to look up; loaders are asked in the order they were added
+    * @return True if a loader returns true from exists(key), false otherwise
+    */
+   public boolean exists(Object key)
+   {
+      for (CacheLoader<K,V> candidate : cacheLoaders)
+      {
+         // the first loader that knows the key settles the answer
+         if (candidate.exists(key)) return true;
+      }
+      // either the chain is empty or no loader reported the key
+      return false;
+   }
+
+   /**
+    * Writes the key/value pair to every loader in writeCacheLoaders, in chain order.
+    * Returns the value returned by the first of those loaders, or null if there are none.
+    */
+   public V put(K key, V value)
+   {
+      Iterator<CacheLoader<K,V>> writers = writeCacheLoaders.iterator();
+      if (!writers.hasNext())
+      {
+         return null;
+      }
+      // only the first writable loader's return value is reported to the caller
+      V firstResult = writers.next().put(key, value);
+      while (writers.hasNext())
+      {
+         // the remaining loaders still get the write; their return values are dropped
+         writers.next().put(key, value);
+      }
+      return firstResult;
+   }
+
+   /**
+    * Inserts all modifications to the backend store. Overwrite whatever is already in
+    * the datastore.
+    *
+    * @param modifications A List<Modification> of modifications
+    * @throws Exception
+    */
+   @Override
+   public void put(List<Modification> modifications)
+   {
+      for (CacheLoader<K,V> l : writeCacheLoaders)
+      {
+         l.put(modifications);
+      }
+   }
+
+   /**
+    * Removes the given key and value. No-op if key doesn't exist.
+    * Returns the first response from the loader chain.
+    */
+   public V remove(Object key)
+   {
+      V answer = null;
+      boolean isFirst = true;
+      for (CacheLoader<K,V> l : writeCacheLoaders)
+      {
+         V tAnswer = l.remove(key);
+         if (isFirst)
+         {
+            answer = tAnswer;
+            isFirst = false;
+         }
+      }
+      return answer;
+   }
+
+   /**
+    * Prepare the modifications. For example, for a DB-based CacheLoader:
+    * <ol>
+    * <li>Create a local (JDBC) transaction
+    * <li>Associate the local transaction with <code>tx</code> (tx is the key)
+    * <li>Execute the corresponding SQL statements against the DB (statements derived from modifications)
+    * </ol>
+    * For non-transactional CacheLoader (e.g. file-based), this could be a null operation
+    *
+    * @param tx            The transaction, just used as a hashmap key
+    * @param modifications List<Modification>, a list of all modifications within the given transaction
+    * @param one_phase     Persist immediately and (for example) commit the local JDBC transaction as well. When true,
+    *                      we won't get a {@link #commit(Object)} or {@link #rollback(Object)} method call later
+    * @throws Exception
+    */
+   @Override
+   public void prepare(Object tx, List<Modification> modifications, boolean one_phase)
+   {
+      for (CacheLoader<K,V> l : writeCacheLoaders)
+      {
+         l.prepare(tx, modifications, one_phase);
+      }
+   }
+
+   /**
+    * Commit the transaction. A DB-based CacheLoader would look up the local JDBC transaction associated
+    * with <code>tx</code> and commit that transaction<br/>
+    * Non-transactional CacheLoaders could simply write the data that was previously saved transiently under the
+    * given <code>tx</code> key, to (for example) a file system (note this only holds if the previous prepare() did
+    * not define one_phase=true
+    *
+    * @param tx
+    */
+   @Override
+   public void commit(Object tx)
+   {
+      for (CacheLoader<K,V> l : writeCacheLoaders)
+      {
+         l.commit(tx);
+      }
+   }
+
+   /**
+    * Roll the transaction back. A DB-based CacheLoader would look up the local JDBC transaction associated
+    * with <code>tx</code> and roll back that transaction
+    *
+    * @param tx
+    */
+   @Override
+   public void rollback(Object tx)
+   {
+      for (CacheLoader<K,V> l : writeCacheLoaders)
+      {
+         l.rollback(tx);
+      }
+   }
+
+
+   /**
+    * Creates individual cache loaders.
+    *
+    * @throws Exception
+    */
+   @Override
+   public void create()
+   {
+      Iterator<CacheLoader<K, V>> it = cacheLoaders.iterator();
+      Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> cfgIt = cacheLoaderConfigs.iterator();
+      while (it.hasNext() && cfgIt.hasNext())
+      {
+         CacheLoader<K,V> cl = it.next();
+         CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cfgIt.next();
+         cl.setConfig(cfg);
+         registry.wireDependencies(cl);
+         cl.create();
+      }
+   }
+
+   @Override
+   public void start()
+   {
+      for (CacheLoader<K,V> cacheLoader : cacheLoaders)
+      {
+         cacheLoader.start();
+      }
+   }
+
+   @Override
+   public void stop()
+   {
+      for (CacheLoader<K,V> cacheLoader : cacheLoaders)
+      {
+         cacheLoader.stop();
+      }
+   }
+
+   @Override
+   public void destroy()
+   {
+      for (CacheLoader<K,V> cacheLoader : cacheLoaders)
+      {
+         cacheLoader.destroy();
+      }
+   }
+   @Override
+   public void loadEntireState(ObjectOutputStream os)
+   {
+      Iterator<CacheLoader<K, V>> i = cacheLoaders.iterator();
+      Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> cfgs = cacheLoaderConfigs.iterator();
+      while (i.hasNext() && cfgs.hasNext())
+      {
+         CacheLoader<K,V> l = i.next();
+         CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cfgs.next();
+         if (cfg.isFetchPersistentState())
+         {
+            l.loadEntireState(os);
+            break;
+         }
+      }
+   }
+
+   /**
+    * Hands the state stream to the first writable loader whose config has fetchPersistentState set.
+    * Configs are index-aligned with cacheLoaders (not writeCacheLoaders), so that list is walked.
+    */
+   @Override
+   public void storeEntireState(ObjectInputStream is)
+   {
+      for (int idx = 0; idx < cacheLoaders.size() && idx < cacheLoaderConfigs.size(); idx++)
+      {
+         CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cacheLoaderConfigs.get(idx);
+         if (!cfg.isIgnoreModifications() && cfg.isFetchPersistentState())
+         {
+            cacheLoaders.get(idx).storeEntireState(is);
+            break;
+         }
+      }
+   }
+
+   /**
+    * Returns the number of cache loaders in the chain.
+    */
+   public int getSize()
+   {
+      return cacheLoaders.size();
+   }
+
+   /**
+    * Returns a List<CacheLoader> of individual cache loaders configured.
+    */
+   public List<CacheLoader<K,V>> getCacheLoaders()
+   {
+      return Collections.unmodifiableList(cacheLoaders);
+   }
+
+   /**
+    * Adds a cache loader to the chain (always added at the end of the chain)
+    *
+    * @param l   the cache loader to add
+    * @param cfg and its configuration
+    */
+   public void addCacheLoader(CacheLoader<K, V> l, CacheLoaderConfig.IndividualCacheLoaderConfig cfg)
+   {
+      synchronized (this)
+      {
+         cacheLoaderConfigs.add(cfg);
+         cacheLoaders.add(l);
+
+         if (!cfg.isIgnoreModifications())
+         {
+            writeCacheLoaders.add(l);
+         }
+      }
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuilder buf = new StringBuilder("ChainingCacheLoader{");
+      Iterator<CacheLoader<K, V>> i = cacheLoaders.iterator();
+      Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> c = cacheLoaderConfigs.iterator();
+      int count = 0;
+      while (i.hasNext() && c.hasNext())
+      {
+         CacheLoader<K,V> loader = i.next();
+         CacheLoaderConfig.IndividualCacheLoaderConfig cfg = c.next();
+
+         buf.append(++count);
+         buf.append(": IgnoreMods? ");
+         buf.append(cfg.isIgnoreModifications());
+         buf.append(" CLoader: ");
+         buf.append(loader);
+         buf.append("; ");
+      }
+      buf.append("}");
+      return buf.toString();
+   }
+
+   public void purgeIfNecessary() throws Exception
+   {
+      Iterator<CacheLoader<K, V>> loaders = cacheLoaders.iterator();
+      Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> configs = cacheLoaderConfigs.iterator();
+
+      while (loaders.hasNext() && configs.hasNext())
+      {
+         CacheLoader<K,V> myLoader = loaders.next();
+         CacheLoaderConfig.IndividualCacheLoaderConfig myConfig = configs.next();
+
+         if (!myConfig.isIgnoreModifications() && myConfig.isPurgeOnStartup()) myLoader.clear();
+      }
+   }
+
+   public void clear()
+   {
+      for (CacheLoader<K,V> l : writeCacheLoaders)
+         l.clear();
+   }
+
+   public List<EntryData<K, V>> getAllEntries()
+   {
+      ArrayList<EntryData<K,V>> full = new ArrayList<EntryData<K,V>>();
+      // NOTE(review): only writeCacheLoaders are asked, so loaders added with ignoreModifications contribute nothing - confirm intended
+      for (CacheLoader<K,V> l : writeCacheLoaders)
+         full.addAll(l.getAllEntries());
+      // results are concatenated in chain order; nothing here merges entries that several loaders return for one key
+      return full;
+   }
+
+}


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/loader/FileCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/FileCacheLoader.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/FileCacheLoader.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -202,7 +202,7 @@
       }
    }
 
-   public V put(Object key, V value)
+   public V put(K key, V value)
    {
       lock(key);
       try

Added: core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.io.ObjectInputStream;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Provides ignoreModifications features to all cache loaders.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ * @since 2.1.0
+ */
+public class ReadOnlyDelegatingCacheLoader<K,V> extends AbstractDelegatingCacheLoader<K,V>
+{
+   private static final Log log = LogFactory.getLog(ReadOnlyDelegatingCacheLoader.class);
+
+   public ReadOnlyDelegatingCacheLoader(CacheLoader cl)
+   {
+      super(cl);
+   }
+
+   @Override
+   public V put(K key, V value)
+   {
+      log.trace("Not delegating write operation to underlying cache loader");
+      return get(key);
+   }
+
+   @Override
+   public void put(List<Modification> modifications)
+   {
+      log.trace("Not delegating write operation to underlying cache loader");
+   }
+
+   @Override
+   public V remove(Object key)
+   {
+      log.trace("Not delegating write operation to underlying cache loader");
+      return get(key);
+   }
+
+   @Override
+   public void prepare(Object tx, List<Modification> modifications, boolean one_phase)
+   {
+      log.trace("Not delegating write operation to underlying cache loader");
+   }
+
+   @Override
+   public void commit(Object tx)
+   {
+      log.trace("Not delegating write operation to underlying cache loader");
+   }
+
+   @Override
+   public void rollback(Object tx)
+   {
+      log.trace("Not delegating write operation to underlying cache loader");
+   }
+
+   @Override
+   public void storeEntireState(ObjectInputStream is)
+   {
+      log.trace("Not delegating write operation to underlying cache loader");
+   }
+
+   @Override
+   public void clear()
+   {
+      log.trace("Not delegating write operation to underlying cache loader");
+   }
+}


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/tree/api/NodeAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/tree/api/NodeAPITest.java	2008-11-20 04:05:09 UTC (rev 7177)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/tree/api/NodeAPITest.java	2008-11-20 04:06:04 UTC (rev 7178)
@@ -1,4 +1,4 @@
-package org.jboss.cache.api;
+package org.jboss.starobrno.tree.api;
 
 import org.jboss.cache.transaction.DummyTransactionManager;
 import org.jboss.cache.transaction.DummyTransactionManagerLookup;




More information about the jbosscache-commits mailing list