Author: jason.greene(a)jboss.com
Date: 2008-11-19 23:06:04 -0500 (Wed, 19 Nov 2008)
New Revision: 7178
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java
Removed:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/SkipCheckChainedInterceptor.java
Modified:
core/branches/flat/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/config/CacheLoaderConfig.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractCacheLoader.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java
core/branches/flat/src/main/java/org/jboss/starobrno/loader/FileCacheLoader.java
core/branches/flat/src/test/java/org/jboss/starobrno/tree/api/NodeAPITest.java
Log:
More cacheloader work
Modified: core/branches/flat/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -21,28 +21,28 @@
*/
package org.jboss.cache.loader;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.StringTokenizer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheSPI_Legacy;
import org.jboss.cache.Fqn;
import org.jboss.cache.RegionManager;
-import org.jboss.starobrno.util.ReflectionUtil;
import org.jboss.starobrno.CacheException;
import org.jboss.starobrno.config.CacheLoaderConfig;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
import
org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig;
-import org.jboss.starobrno.config.Configuration;
-import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.factories.ComponentRegistry;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
import org.jboss.starobrno.factories.annotations.Stop;
+import org.jboss.starobrno.util.ReflectionUtil;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.StringTokenizer;
-
/**
* Manages all cache loader functionality. This class is typically initialised with an
XML DOM Element,
* represeting a cache loader configuration, or a {@link
org.jboss.cache.config.CacheLoaderConfig} object.
@@ -225,7 +225,7 @@
private CacheLoader createCacheLoader(CacheLoaderConfig.IndividualCacheLoaderConfig
cfg, CacheSPI_Legacy cache) throws Exception
{
// create loader
- CacheLoader tmpLoader = cfg.getCacheLoader() == null ?
createInstance(cfg.getClassName()) : cfg.getCacheLoader();
+ CacheLoader tmpLoader = (CacheLoader)(cfg.getCacheLoader() == null ?
createInstance(cfg.getClassName()) : cfg.getCacheLoader());
if (tmpLoader != null)
{
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/config/CacheLoaderConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/config/CacheLoaderConfig.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/config/CacheLoaderConfig.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -21,14 +21,14 @@
*/
package org.jboss.starobrno.config;
-import org.jboss.cache.loader.CacheLoader;
-import org.jboss.cache.loader.SingletonStoreCacheLoader;
-import org.jboss.starobrno.util.Util;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import org.jboss.cache.loader.SingletonStoreCacheLoader;
+import org.jboss.starobrno.loader.CacheLoader;
+import org.jboss.starobrno.util.Util;
+
/**
* Holds the configuration of the cache loader chain. ALL cache loaders should be
defined using this class, adding
* individual cache loaders to the chain by calling {@link
CacheLoaderConfig#addIndividualCacheLoaderConfig}
@@ -193,7 +193,7 @@
private boolean purgeOnStartup;
private SingletonStoreConfig singletonStoreConfig;
- private transient CacheLoader cacheLoader;
+ private transient CacheLoader<Object, Object> cacheLoader;
protected void populateFromBaseConfig(IndividualCacheLoaderConfig base)
{
@@ -271,7 +271,7 @@
* @return cache loader, if one exists
* @since 2.1.0
*/
- public CacheLoader getCacheLoader()
+ public CacheLoader<Object,Object> getCacheLoader()
{
return cacheLoader;
}
@@ -283,7 +283,7 @@
* @param cacheLoader cacheLoader to set
* @since 2.1.0
*/
- public void setCacheLoader(CacheLoader cacheLoader)
+ public void setCacheLoader(CacheLoader<Object,Object> cacheLoader)
{
this.cacheLoader = cacheLoader;
}
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -167,7 +167,7 @@
* @throws org.jboss.starobrno.lock.TimeoutException
* if we are unable to acquire the lock after a specified
timeout.
*/
- private boolean acquireLock(InvocationContext ctx, Object key) throws
InterruptedException, TimeoutException
+ public boolean acquireLock(InvocationContext ctx, Object key) throws
InterruptedException, TimeoutException
{
// don't EVER use lockManager.isLocked() since with lock striping it may be the
case that we hold the relevant
// lock which may be shared with another Fqn that we have a lock for already.
@@ -184,4 +184,9 @@
}
return false;
}
+
+ public void releaseLock(InvocationContext ctx, Object key)
+ {
+ lockManager.unlock(key, lockManager.getOwner(key));
+ }
}
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -124,22 +124,22 @@
}
// TODO: Uncomment once the CacheLoader has been moved to Starobrno
-// if (configuration.isUsingCacheLoaders())
-// {
+ if (configuration.isUsingCacheLoaders())
+ {
// if (configuration.getCacheLoaderConfig().isPassivation())
// {
//
interceptorChain.appendIntereceptor(createInterceptor(ActivationInterceptor.class));
// }
// else
-// {
-//
interceptorChain.appendIntereceptor(createInterceptor(CacheLoaderInterceptor.class));
-// }
-// }
+ {
+
interceptorChain.appendIntereceptor(createInterceptor(CacheLoaderInterceptor.class));
+ }
+ }
interceptorChain.appendIntereceptor(createInterceptor(LockingInterceptor.class));
// TODO: Uncomment once the CacheLoader has been moved to Starobrno
-// if (configuration.isUsingCacheLoaders())
-// {
+ if (configuration.isUsingCacheLoaders())
+ {
// if (configuration.getCacheLoaderConfig().isPassivation())
// {
//
@@ -147,13 +147,13 @@
//
// }
// else
-// {
+ {
+
+
interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
+
+ }
+ }
//
-//
interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
-//
-// }
-// }
-//
// if (configuration.isUsingBuddyReplication())
// {
//
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -0,0 +1,228 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.starobrno.commands.read.GetKeyValueCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.container.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntryCreator;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.interceptors.base.JmxStatsCommandInterceptor;
+import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
+import org.jboss.starobrno.jmx.annotations.ManagedOperation;
+import org.jboss.starobrno.loader.CacheLoader;
+import org.jboss.starobrno.loader.CacheLoaderManager;
+import org.jboss.starobrno.notifications.Notifier;
+
+/**
+ * Loads nodes that don't exist at the time of the call into memory from the
CacheLoader
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class CacheLoaderInterceptor extends JmxStatsCommandInterceptor
+{
+ private long cacheLoads = 0;
+ private long cacheMisses = 0;
+ private CacheLoaderManager clm;
+
+ protected TransactionTable txTable = null;
+ protected CacheLoader<Object, Object> loader;
+ protected DataContainer<Object, Object> dataContainer;
+ protected Notifier notifier;
+ protected MVCCEntryCreator creator;
+
+ protected boolean isActivation = false;
+// protected boolean usingVersionedInvalidation = false;
+
+
+ /**
+ * True if CacheStoreInterceptor is in place.
+ * This allows us to skip loading keys for remove(Fqn, key) and put(Fqn, key).
+ * It also affects removal of node data and listing children.
+ */
+ protected boolean useCacheStore = true;
+
+ @Inject
+ protected void injectDependencies(TransactionTable txTable, CacheLoaderManager clm,
Configuration configuration,
+ DataContainer<Object, Object> dataContainer,
MVCCEntryCreator creator, Notifier notifier)
+ {
+ this.txTable = txTable;
+ this.clm = clm;
+// CacheMode mode = configuration.getCacheMode();
+// usingVersionedInvalidation = mode.isInvalidation();
+ this.dataContainer = dataContainer;
+ this.notifier = notifier;
+ this.creator = creator;
+ }
+
+ @Start
+ protected void startInterceptor()
+ {
+ loader = clm.getCacheLoader();
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ {
+ if (command.getKey() != null)
+ {
+ loadIfNeeded(ctx, command.getKey());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+
+ @Override
+ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand
command) throws Throwable
+ {
+ if (command.getKey() != null)
+ {
+ loadIfNeeded(ctx, command.getKey());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws
Throwable
+ {
+ if (command.getKey() != null)
+ {
+ loadIfNeeded(ctx, command.getKey());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
throws Throwable
+ {
+ if (command.getKey() != null)
+ {
+ loadIfNeeded(ctx, command.getKey());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ private void loadIfNeeded(InvocationContext ctx, Object key) throws Throwable
+ {
+ if (dataContainer.containsKey(key) || !loader.exists(key))
+ return;
+
+ // Obtain a temporary lock to verify the key is not being concurrently added
+ boolean release = creator.acquireLock(ctx, key);
+ if (dataContainer.containsKey(key))
+ {
+ if (release)
+ creator.releaseLock(ctx, key);
+ return;
+ }
+
+ // Reuse the lock and create a new entry for loading
+ MVCCEntry n = creator.wrapEntryForWriting(ctx, key, false, true);
+ n = loadEntry(ctx, key, n);
+ }
+
+ /**
+ * Loads a node from disk; if it exists creates parent TreeNodes.
+ * If it doesn't exist on disk but in memory, clears the
+ * uninitialized flag, otherwise returns null.
+ */
+ private MVCCEntry loadEntry(InvocationContext ctx, Object key, MVCCEntry entry) throws
Exception
+ {
+ if (trace) log.trace("loading entry " + key + " entry is " +
entry);
+
+ Object value = loader.get(key);
+ boolean nodeExists = (value != null);
+ if (trace) log.trace("nodeExists " + nodeExists);
+
+ if (getStatisticsEnabled())
+ {
+ if (nodeExists)
+ {
+ cacheLoads++;
+ }
+ else
+ {
+ cacheMisses++;
+ }
+ }
+
+ if (value != null)
+ {
+ if (trace) log.trace("Entry is not null, loading");
+// notifier.notifyNodeLoaded(fqn, true, Collections.emptyMap(), ctx);
+// if (isActivation)
+// {
+// notifier.notifyNodeActivated(fqn, true, Collections.emptyMap(), ctx);
+// }
+
+ entry.setValue(value);
+ entry.setValid(true);
+
+// notifier.notifyNodeLoaded(fqn, false, nodeData, ctx);
+// if (isActivation)
+// {
+// notifier.notifyNodeActivated(fqn, false, nodeData, ctx);
+// }
+ }
+
+ return entry;
+ }
+
+ @ManagedAttribute(description = "number of cache loader node loads")
+ public long getCacheLoaderLoads()
+ {
+ return cacheLoads;
+ }
+
+ @ManagedAttribute(description = "number of cache loader node misses")
+ public long getCacheLoaderMisses()
+ {
+ return cacheMisses;
+ }
+
+ @ManagedOperation
+ public void resetStatistics()
+ {
+ cacheLoads = 0;
+ cacheMisses = 0;
+ }
+
+ @ManagedOperation
+ public Map<String, Object> dumpStatistics()
+ {
+ Map<String, Object> retval = new HashMap<String, Object>();
+ retval.put("CacheLoaderLoads", cacheLoads);
+ retval.put("CacheLoaderMisses", cacheMisses);
+ return retval;
+ }
+}
\ No newline at end of file
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheLoaderInterceptor.java
___________________________________________________________________
Name: svn:executable
+ *
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -0,0 +1,363 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.jmx.annotations.ManagedAttribute;
+import org.jboss.cache.jmx.annotations.ManagedOperation;
+import org.jboss.starobrno.commands.AbstractVisitor;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.tx.RollbackCommand;
+import org.jboss.starobrno.commands.write.ClearCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.PutMapCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.config.CacheLoaderConfig;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.interceptors.base.JmxStatsCommandInterceptor;
+import org.jboss.starobrno.loader.CacheLoader;
+import org.jboss.starobrno.loader.CacheLoaderManager;
+import org.jboss.starobrno.loader.Modification;
+import org.jboss.starobrno.loader.Modification.ModificationType;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+
+/**
+ * Writes modifications back to the store on the way out: stores modifications back
+ * through the CacheLoader, either after each method call (no TXs), or at TX commit.
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class CacheStoreInterceptor extends JmxStatsCommandInterceptor
+{
+ private CacheLoaderConfig loaderConfig = null;
+ private TransactionManager txMgr = null;
+ private HashMap<GlobalTransaction, Integer> txStores = new
HashMap<GlobalTransaction, Integer>();
+ private Map<GlobalTransaction, Set<Object>> preparingTxs = new
ConcurrentHashMap<GlobalTransaction, Set<Object>>();
+ private long cacheStores = 0;
+ CacheLoader<Object, Object> loader;
+ private CacheLoaderManager loaderManager;
+ private boolean statsEnabled;
+
+ public CacheStoreInterceptor()
+ {
+ log = LogFactory.getLog(getClass());
+ trace = log.isTraceEnabled();
+ }
+
+ @Inject
+ protected void init(CacheLoaderManager loaderManager, TransactionManager txManager,
CacheLoaderConfig clConfig)
+ {
+ // never inject a CacheLoader at this stage - only a CacheLoaderManager, since the
CacheLoaderManager only creates a CacheLoader instance when it @Starts.
+ this.loaderManager = loaderManager;
+ this.loaderConfig = clConfig;
+ txMgr = txManager;
+ }
+
+ @Start
+ protected void start()
+ {
+ // this should only happen after the CacheLoaderManager has started, since the
CacheLoaderManager only creates the CacheLoader instance in its @Start method.
+ loader = loaderManager.getCacheLoader();
+ this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
+ }
+
+ /**
+ * if this is a shared cache loader and the call is of remote origin, pass up the
chain
+ */
+ public final boolean skip(InvocationContext ctx, VisitableCommand command)
+ {
+ if ((!ctx.isOriginLocal() && loaderConfig.isShared()) ||
ctx.getOptionOverrides().isSuppressPersistence())
+ {
+ if (trace)
+ log.trace("Passing up method call and bypassing this interceptor since
the cache loader is shared and this call originated remotely.");
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
+ {
+ if (!skip(ctx, command) && inTransaction())
+ {
+ if (ctx.getTransactionContext().hasAnyModifications())
+ {
+ // this is a commit call.
+ GlobalTransaction gtx = command.getGlobalTransaction();
+ if (trace) log.trace("Calling loader.commit() for gtx " + gtx);
+ // sync call (a write) on the loader
+ // ignore modified FQNs
+ // List fqnsModified =
getFqnsFromModificationList(txTable.get(globalTransaction).getCacheLoaderModifications());
+ try
+ {
+ loader.commit(gtx);
+ }
+ catch (Throwable t)
+ {
+ preparingTxs.remove(gtx);
+ throw t;
+ }
+ if (getStatisticsEnabled())
+ {
+ Integer puts = (Integer) txStores.get(gtx);
+ if (puts != null)
+ {
+ cacheStores = cacheStores + puts;
+ }
+ txStores.remove(gtx);
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+ else
+ {
+ if (trace) log.trace("Commit called with no modifications;
ignoring.");
+ }
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command)
throws Throwable
+ {
+ if (!skip(ctx, command) && inTransaction())
+ {
+ if (trace) log.trace("transactional so don't put stuff in the cloader
yet.");
+ if (ctx.getTransactionContext().hasAnyModifications())
+ {
+ GlobalTransaction gtx = command.getGlobalTransaction();
+ // this is a rollback method
+ if (preparingTxs.containsKey(gtx))
+ {
+ preparingTxs.remove(gtx);
+ loader.rollback(gtx);
+ }
+ if (getStatisticsEnabled()) txStores.remove(gtx);
+ }
+ else
+ {
+ if (trace) log.trace("Rollback called with no modifications;
ignoring.");
+ }
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
+ {
+ if (!skip(ctx, command) && inTransaction())
+ {
+ if (trace) log.trace("transactional so don't put stuff in the cloader
yet.");
+ prepareCacheLoader(command.getGlobalTransaction(), ctx.getTransactionContext(),
command.isOnePhaseCommit());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws
Throwable
+ {
+ if (!skip(ctx, command) && !inTransaction())
+ {
+ Object returnValue = loader.remove(command.getKey());
+ invokeNextInterceptor(ctx, command);
+ return returnValue;
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ {
+ Object returnValue = invokeNextInterceptor(ctx, command);
+ if (skip(ctx, command) || inTransaction())
+ return returnValue;
+
+ returnValue = loader.put(command.getKey(), command.getValue());
+ if (getStatisticsEnabled()) cacheStores++;
+
+ return returnValue;
+ }
+
+ @Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws
Throwable
+ {
+ Object returnValue = invokeNextInterceptor(ctx, command);
+ if (skip(ctx, command) || inTransaction())
+ return returnValue;
+
+ // Perhaps this should be optimized
+ Map<Object, Object> map = command.getMap();
+ List<Modification> modifications = toModifications(map);
+ loader.put(modifications);
+
+ if (getStatisticsEnabled()) cacheStores++;
+
+ return returnValue;
+ }
+
+ private static List<Modification> toModifications(Map<Object, Object>
map)
+ {
+ List<Modification> modifications = new
ArrayList<Modification>(map.size());
+ for (Map.Entry<Object, Object> entry : map.entrySet())
+ modifications.add(new Modification(ModificationType.PUT, entry.getKey(),
entry.getValue()));
+ return modifications;
+ }
+
+ private boolean inTransaction() throws SystemException
+ {
+ return txMgr != null && txMgr.getTransaction() != null;
+ }
+
+ private void prepareCacheLoader(GlobalTransaction gtx, TransactionContext
transactionContext, boolean onePhase) throws Throwable
+ {
+ if (transactionContext == null)
+ {
+ throw new Exception("transactionContext for transaction " + gtx +
" not found in transaction table");
+ }
+ List<VisitableCommand> modifications =
transactionContext.getModifications();
+ if (modifications.size() == 0)
+ {
+ if (trace) log.trace("Transaction has not logged any
modifications!");
+ return;
+ }
+ if (trace) log.trace("Cache loader modification list: " +
modifications);
+ StoreModificationsBuilder modsBuilder = new
StoreModificationsBuilder(getStatisticsEnabled());
+ for (VisitableCommand cacheCommand : modifications)
+ {
+ cacheCommand.acceptVisitor(null, modsBuilder);
+ }
+ if (trace)
+ {
+ log.trace("Converted method calls to cache loader modifications. List
size: " + modsBuilder.modifications.size());
+ }
+ if (modsBuilder.modifications.size() > 0)
+ {
+ loader.prepare(gtx, modsBuilder.modifications, onePhase);
+
+ preparingTxs.put(gtx, modsBuilder.affectedKeys);
+ if (getStatisticsEnabled() && modsBuilder.putCount > 0)
+ {
+ txStores.put(gtx, modsBuilder.putCount);
+ }
+ }
+ }
+
+ public static class StoreModificationsBuilder extends AbstractVisitor
+ {
+
+ boolean generateStatistics;
+
+ int putCount;
+
+ Set<Object> affectedKeys = new HashSet<Object>();
+
+ List<Modification> modifications = new ArrayList<Modification>();
+
+ public StoreModificationsBuilder(boolean generateStatistics)
+ {
+ this.generateStatistics = generateStatistics;
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
+ {
+ if (generateStatistics) putCount++;
+ modifications.add(new Modification(Modification.ModificationType.PUT,
command.getKey(), command.getValue()));
+ affectedKeys.add(command.getKey());
+ return null;
+ }
+
+ @Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command)
throws Throwable
+ {
+ Map<Object, Object> map = command.getMap();
+ if (generateStatistics) putCount += map.size();
+ affectedKeys.addAll(map.keySet());
+ modifications.addAll(toModifications(map));
+ return null;
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command)
throws Throwable
+ {
+ modifications.add(new Modification(Modification.ModificationType.REMOVE,
command.getKey(), null));
+ affectedKeys.add(command.getKey());
+ return null;
+ }
+
+ @Override
+ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws
Throwable
+ {
+ modifications.add(new Modification(Modification.ModificationType.CLEAR, null,
null));
+ return null;
+ }
+ }
+
+ @ManagedOperation
+ public void resetStatistics()
+ {
+ cacheStores = 0;
+ }
+
+ @ManagedOperation
+ public Map<String, Object> dumpStatistics()
+ {
+ Map<String, Object> retval = new HashMap<String, Object>();
+ retval.put("CacheLoaderStores", cacheStores);
+ return retval;
+ }
+
+ @ManagedAttribute
+ public boolean getStatisticsEnabled()
+ {
+ return statsEnabled;
+ }
+
+ @ManagedAttribute
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ this.statsEnabled = enabled;
+ }
+
+ @ManagedAttribute(description = "number of cache loader stores")
+ public long getCacheLoaderStores()
+ {
+ return cacheStores;
+ }
+
+}
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/CacheStoreInterceptor.java
___________________________________________________________________
Name: svn:executable
+ *
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/SkipCheckChainedInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/SkipCheckChainedInterceptor.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/SkipCheckChainedInterceptor.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -1,77 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.starobrno.interceptors.base;
-
-import org.jboss.starobrno.commands.VisitableCommand;
-import org.jboss.starobrno.context.InvocationContext;
-
-/**
- * This interceptor will call {@link #skipInterception(org.jboss.cache.InvocationContext
,org.jboss.cache.commands.VisitableCommand)} before invoking each visit method
- * (and the {@link #handleDefault(org.jboss.cache.InvocationContext ,
org.jboss.cache.commands.VisitableCommand)} method). If
- * {@link #skipInterception(org.jboss.cache.InvocationContext
,org.jboss.cache.commands.VisitableCommand)} returns <tt>false</tt>, the
invocation will be skipped
- * and passed up the interceptor chain instead.
- * <p/>
- * Instead of overriding visitXXX() methods, implementations should override their
handleXXX() counterparts defined in this class
- * instead, as well as the {@link #skipInterception(org.jboss.cache.InvocationContext
,org.jboss.cache.commands.VisitableCommand)} method.
- * Also, instead of overriding {@link #handleDefault(org.jboss.cache.InvocationContext ,
org.jboss.cache.commands.VisitableCommand)}, implementors
- * should override {@link #handleAll(org.jboss.cache.InvocationContext ,
org.jboss.cache.commands.VisitableCommand)}.
- *
- * @author Mircea.Markus(a)jboss.com
- * @since 2.2
- */
-public abstract class SkipCheckChainedInterceptor extends CommandInterceptor
-{
-
- // TODO!!!
-
- @Override
- public final Object handleDefault(InvocationContext ctx, VisitableCommand command)
throws Throwable
- {
- if (skipInterception(ctx, command))
- {
- return invokeNextInterceptor(ctx, command);
- }
- return handleAll(ctx, command);
- }
-
- /**
- * Default implementation, which just passes the call up the interceptor chain
- *
- * @param ctx invocation context
- * @param command command
- * @return return value
- * @throws Throwable in the event of problems
- */
- protected Object handleAll(InvocationContext ctx, VisitableCommand command) throws
Throwable
- {
- return invokeNextInterceptor(ctx, command);
- }
-
- /**
- * Tests whether the command should be intercepted or not. This is invoked before any
of the handleXXX() methods.
- *
- * @param ctx invocation context
- * @param command command
- * @return true if the invocation should skip the current interceptor and move on to
the next in the chain, false otherwise.
- */
- protected abstract boolean skipInterception(InvocationContext ctx, VisitableCommand
command);
-}
\ No newline at end of file
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractCacheLoader.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractCacheLoader.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -124,7 +124,7 @@
switch (m.getType())
{
case PUT:
- put(m.getKey(), (V)m.getValue());
+ put((K)m.getKey(), (V)m.getValue());
break;
case REMOVE:
remove(m.getKey());
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -0,0 +1,155 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.jboss.starobrno.marshall.EntryData;
+
+/**
+ * AbstractDelegatingCacheLoader provides standard functionality for a cache loader that
simply delegates each
+ * operation defined in the cache loader interface to the underlying cache loader,
basically acting as a proxy to the
+ * real cache loader.
+ * <p/>
+ * Any cache loader implementation that extends this class would be required to override
any of the methods in
+ * order to provide a different or added behaviour.
+ *
+ * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
+ */
+public abstract class AbstractDelegatingCacheLoader<K,V> extends
AbstractCacheLoader<K,V>
+{
+ private CacheLoader<K,V> cacheLoader;
+
+ public AbstractDelegatingCacheLoader(CacheLoader<K,V> cacheLoader)
+ {
+ this.cacheLoader = cacheLoader;
+ }
+
+ public void clear()
+ {
+ cacheLoader.clear();
+ }
+
+ public void commit(Object tx)
+ {
+ cacheLoader.commit(tx);
+ }
+
+ public void create()
+ {
+ cacheLoader.create();
+ }
+
+ public void destroy()
+ {
+ cacheLoader.destroy();
+ }
+
+ public boolean exists(Object key)
+ {
+ return cacheLoader.exists(key);
+ }
+
+ public V get(Object key)
+ {
+ return cacheLoader.get(key);
+ }
+
+ public List<EntryData<K, V>> getAllEntries()
+ {
+ return cacheLoader.getAllEntries();
+ }
+
+ public IndividualCacheLoaderConfig getConfig()
+ {
+ return cacheLoader.getConfig();
+ }
+
+ public void loadEntireState(ObjectOutputStream os)
+ {
+ cacheLoader.loadEntireState(os);
+ }
+
+ public void prepare(Object tx, List<Modification> modifications, boolean
one_phase)
+ {
+ cacheLoader.prepare(tx, modifications, one_phase);
+ }
+
+ public V put(K key, V value)
+ {
+ return cacheLoader.put(key, value);
+ }
+
+ public void put(List<Modification> modifications)
+ {
+ cacheLoader.put(modifications);
+ }
+
+ public V remove(Object key)
+ {
+ return cacheLoader.remove(key);
+ }
+
+ public void rollback(Object tx)
+ {
+ cacheLoader.rollback(tx);
+ }
+
+ public void setCache(CacheSPI<K, V> c)
+ {
+ cacheLoader.setCache(c);
+ }
+
+ public void setConfig(IndividualCacheLoaderConfig config)
+ {
+ cacheLoader.setConfig(config);
+ }
+
+ public void start()
+ {
+ cacheLoader.start();
+ }
+
+ public void stop()
+ {
+ cacheLoader.stop();
+ }
+
+ public void storeEntireState(ObjectInputStream is)
+ {
+ cacheLoader.storeEntireState(is);
+ }
+
+ public CacheLoader<K,V> getCacheLoader()
+ {
+ return cacheLoader;
+ }
+
+ public void setCacheLoader(CacheLoader<K,V> cacheLoader)
+ {
+ this.cacheLoader = cacheLoader;
+ }
+}
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AbstractDelegatingCacheLoader.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -0,0 +1,341 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.util.Immutables;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+
+/**
+ * The AsyncCacheLoader is a delegating cache loader that extends
+ * AbstractDelegatingCacheLoader overriding methods to that should not
+ * just delegate the operation to the underlying cache loader.
+ * <p/>
+ * Read operations are done synchronously, while write (CRUD - Create, Remove,
+ * Update, Delete) operations are done asynchronously. There is no provision
+ * for exception handling at the moment for problems encountered with the
+ * underlying CacheLoader during a CRUD operation, and the exception is just
+ * logged.
+ * <p/>
+ * When configuring the CacheLoader, use the following attribute:
+ * <p/>
+ * <code>
+ * <attribute
name="CacheLoaderAsynchronous">true</attribute>
+ * </code>
+ * <p/>
+ * to define whether cache loader operations are to be asynchronous. If not
+ * specified, a cache loader operation is assumed synchronous.
+ * <p/>
+ * <p/>
+ * The following additional parameters are available:
+ * <dl>
+ * <dt>cache.async.batchSize</dt>
+ * <dd>Number of modifications to commit in one transaction, default is
+ * 100. The minimum batch size is 1.</dd>
+ * <dt>cache.async.pollWait</dt>
+ * <dd>How long to wait before processing an incomplete batch, in
+ * milliseconds. Default is 100. Set this to 0 to not wait before processing
+ * available records.</dd>
+ * <dt>cache.async.returnOld</dt>
+ * <dd>If <code>true</code>, this loader returns the old values from
{@link
+ * #put} and {@link #remove} methods. Otherwise, these methods always return
+ * null. Default is true. <code>false</code> improves the performance of
these
+ * operations.</dd>
+ * <dt>cache.async.queueSize</dt>
+ * <dd>Maximum number of entries to enqueue for asynchronous processing.
+ * Lowering this size may help prevent out-of-memory conditions. It also may
+ * help to prevent less records lost in the case of JVM failure. Default is
+ * 10,000 operations.</dd>
+ * <dt>cache.async.put</dt>
+ * <dd>If set to false, all {@link #put} operations will be processed
+ * synchronously, and then only the {@link #remove} operations will be
+ * processed asynchronously. This mode may be useful for processing
+ * expiration of messages within a separate thread and keeping other
+ * operations synchronous for reliability.
+ * </dd>
+ * <dt>cache.async.threadPoolSize</dt>
+ * <dd>The size of the async processor thread pool. Defaults to
<tt>1</tt>. This
+ * property is new in JBoss Cache 3.0.</dd>
+ * </dl>
+ * For increased performance for many smaller transactions, use higher values
+ * for <code>cache.async.batchSize</code> and
+ * <code>cache.async.pollWait</code>. For larger sized records, use a
smaller
+ * value for <code>cache.async.queueSize</code>.
+ *
+ * @author Manik Surtani (manik.surtani(a)jboss.com)
+ */
+public class AsyncCacheLoader<K,V> extends
AbstractDelegatingCacheLoader<K,V>
+{
+
+ private static final Log log = LogFactory.getLog(AsyncCacheLoader.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ private static AtomicInteger threadId = new AtomicInteger(0);
+
+ /**
+ * Default limit on entries to process asynchronously.
+ */
+ private static final int DEFAULT_QUEUE_SIZE = 10000;
+
+ private AsyncCacheLoaderConfig config;
+ private ExecutorService executor;
+ private AtomicBoolean stopped = new AtomicBoolean(true);
+ private BlockingQueue<Modification> queue = new
ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE);
+ private List<Future> processorFutures;
+
+ public AsyncCacheLoader()
+ {
+ super(null);
+ }
+
+ public AsyncCacheLoader(CacheLoader<K,V> cacheLoader)
+ {
+ super(cacheLoader);
+ }
+
+ @Override
+ public void setConfig(IndividualCacheLoaderConfig base)
+ {
+ if (base instanceof AsyncCacheLoaderConfig)
+ {
+ config = (AsyncCacheLoaderConfig) base;
+ }
+ else
+ {
+ config = new AsyncCacheLoaderConfig(base);
+ }
+
+ if (config.getQueueSize() > 0)
+ {
+ queue = new ArrayBlockingQueue<Modification>(config.getQueueSize());
+ }
+
+ super.setConfig(base);
+ }
+
+ @Override
+ public V put(K key, V value)
+ {
+ if (config.getUseAsyncPut())
+ {
+ V oldValue = get(key);
+ Modification mod = new Modification(Modification.ModificationType.PUT, key,
value);
+ enqueue(mod);
+ return oldValue;
+ }
+ else
+ {
+ return super.put(key, value);
+ }
+ }
+
+ @Override
+ public void put(List<Modification> modifications)
+ {
+ if (config.getUseAsyncPut())
+ {
+ for (Modification modification : modifications)
+ {
+ enqueue(modification);
+ }
+ }
+ else
+ {
+ super.put(modifications);
+ }
+ }
+
+ @Override
+ public V remove(Object key)
+ {
+ V oldValue = get(key);
+ Modification mod = new Modification(Modification.ModificationType.REMOVE, key,
null);
+ enqueue(mod);
+ return oldValue;
+ }
+
+ @Override
+ public void start()
+ {
+ if (log.isInfoEnabled()) log.info("Async cache loader starting: " +
this);
+ stopped.set(false);
+ super.start();
+ executor = Executors.newFixedThreadPool(config.getThreadPoolSize(), new
ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(r, "AsyncCacheLoader-" +
threadId.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ processorFutures = new ArrayList<Future>(config.getThreadPoolSize());
+ for (int i = 0; i < config.getThreadPoolSize(); i++)
processorFutures.add(executor.submit(new AsyncProcessor()));
+ }
+
+ @Override
+ public void stop()
+ {
+ stopped.set(true);
+ if (executor != null)
+ {
+ for (Future f : processorFutures) f.cancel(true);
+ executor.shutdown();
+ try
+ {
+ boolean terminated = executor.isTerminated();
+ while (!terminated)
+ {
+ terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ executor = null;
+ super.stop();
+ }
+
+ private void enqueue(final Modification mod)
+ {
+ if (stopped.get())
+ {
+ throw new CacheException("AsyncCacheLoader stopped; no longer accepting
more entries.");
+ }
+ if (trace) log.trace("Enqueuing modification " + mod);
+ try
+ {
+ queue.put(mod);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Processes (by batch if possible) a queue of {@link Modification}s.
+ *
+ * @author manik surtani
+ */
+ private class AsyncProcessor implements Runnable
+ {
+ // Modifications to invoke as a single put
+ private final List<Modification> mods = new
ArrayList<Modification>(config.getBatchSize());
+
+ public void run()
+ {
+ while (!Thread.interrupted())
+ {
+ try
+ {
+ run0();
+ }
+ catch (InterruptedException e)
+ {
+ break;
+ }
+ }
+
+ try
+ {
+ if (trace) log.trace("process remaining batch " + mods.size());
+ put(mods);
+ if (trace) log.trace("process remaining queued " + queue.size());
+ while (!queue.isEmpty())
+ {
+ run0();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.trace("remaining interrupted");
+ }
+ }
+
+ private void run0() throws InterruptedException
+ {
+ log.trace("Checking for modifications");
+ int i = queue.drainTo(mods, config.getBatchSize());
+ if (i == 0)
+ {
+ Modification m = queue.take();
+ mods.add(m);
+ }
+
+ if (trace)
+ {
+ log.trace("Calling put(List) with " + mods.size() + "
modifications");
+ }
+ put(mods);
+ mods.clear();
+ }
+
+ private void put(List<Modification> mods)
+ {
+ try
+ {
+ AsyncCacheLoader.super.put(mods);
+ }
+ catch (Exception e)
+ {
+ if (log.isWarnEnabled()) log.warn("Failed to process async
modifications: " + e);
+ if (log.isDebugEnabled()) log.debug("Exception: ", e);
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return super.toString() +
+ " delegate=[" + super.getCacheLoader() + "]" +
+ " stopped=" + stopped +
+ " batchSize=" + config.getBatchSize() +
+ " returnOld=" + config.getReturnOld() +
+ " asyncPut=" + config.getUseAsyncPut() +
+ " threadPoolSize=" + config.getThreadPoolSize() +
+ " queue.remainingCapacity()=" + queue.remainingCapacity() +
+ " queue.peek()=" + queue.peek();
+ }
+
+}
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoader.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -0,0 +1,174 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+
+import java.util.Properties;
+
+public class AsyncCacheLoaderConfig extends IndividualCacheLoaderConfig
+{
+ /**
+ * The serialVersionUID
+ */
+ private static final long serialVersionUID = 5038037589485991681L;
+
+ private int batchSize = 100;
+ private boolean returnOld = true;
+ private int queueSize = 0;
+ private boolean useAsyncPut = true;
+ private int threadPoolSize = 1;
+
+ /**
+ * Default constructor.
+ */
+ public AsyncCacheLoaderConfig()
+ {
+ setClassName(AsyncCacheLoader.class.getName());
+ }
+
+ /**
+ * For use by {@link AsyncCacheLoader}.
+ *
+ * @param base generic config object created by XML parsing.
+ */
+ AsyncCacheLoaderConfig(IndividualCacheLoaderConfig base)
+ {
+ setClassName(AsyncCacheLoader.class.getName());
+ populateFromBaseConfig(base);
+ }
+
+ public int getThreadPoolSize()
+ {
+ return threadPoolSize;
+ }
+
+ public void setThreadPoolSize(int threadPoolSize)
+ {
+ testImmutability("threadPoolSize");
+ this.threadPoolSize = threadPoolSize;
+ }
+
+ public int getBatchSize()
+ {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize)
+ {
+ testImmutability("batchSize");
+ this.batchSize = batchSize;
+ }
+
+ public int getQueueSize()
+ {
+ return queueSize;
+ }
+
+ public void setQueueSize(int queueSize)
+ {
+ testImmutability("queueSize");
+ this.queueSize = queueSize;
+ }
+
+ public boolean getReturnOld()
+ {
+ return returnOld;
+ }
+
+ public void setReturnOld(boolean returnOld)
+ {
+ testImmutability("returnOld");
+ this.returnOld = returnOld;
+ }
+
+ public boolean getUseAsyncPut()
+ {
+ return useAsyncPut;
+ }
+
+ public void setUseAsyncPut(boolean useAsyncPut)
+ {
+ testImmutability("useAsyncPut");
+ this.useAsyncPut = useAsyncPut;
+ }
+
+ @Override
+ public void setProperties(Properties props)
+ {
+ super.setProperties(props);
+ String s;
+
+ s = props.getProperty("cache.async.batchSize");
+ if (s != null) batchSize = Integer.parseInt(s);
+ if (batchSize <= 0) throw new IllegalArgumentException("Invalid batch size:
" + batchSize);
+
+ s = props.getProperty("cache.async.threadPoolSize");
+ if (s != null) threadPoolSize = Integer.parseInt(s);
+ if (threadPoolSize <= 0) throw new IllegalArgumentException("Invalid thread
pool size: " + threadPoolSize);
+
+
+ s = props.getProperty("cache.async.returnOld");
+ if (s != null) returnOld = Boolean.valueOf(s);
+
+ s = props.getProperty("cache.async.queueSize");
+ if (s != null) queueSize = Integer.parseInt(s);
+
+ s = props.getProperty("cache.async.put");
+ if (s != null) useAsyncPut = Boolean.valueOf(s);
+
+
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof AsyncCacheLoaderConfig &&
equalsExcludingProperties(obj))
+ {
+ AsyncCacheLoaderConfig other = (AsyncCacheLoaderConfig) obj;
+ return (batchSize == other.batchSize)
+ && (queueSize == other.queueSize)
+ && (returnOld == other.returnOld)
+ && (useAsyncPut == other.useAsyncPut);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = hashCodeExcludingProperties();
+ result = 31 * result + batchSize;
+ result = 31 * result + queueSize;
+ result = 31 * result + (returnOld ? 0 : 1);
+ result = 31 * result + (useAsyncPut ? 0 : 1);
+ return result;
+ }
+
+ @Override
+ public AsyncCacheLoaderConfig clone() throws CloneNotSupportedException
+ {
+ return (AsyncCacheLoaderConfig) super.clone();
+ }
+
+
+}
\ No newline at end of file
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/AsyncCacheLoaderConfig.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoader.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -115,7 +115,7 @@
* node does not exist, all parent nodes from the root down are created
* automatically. Returns the old value.
*/
- V put(Object key, V value) ;
+ V put(K key, V value) ;
/**
* Removes everything from this cache-loader
@@ -139,6 +139,14 @@
/**
+ * Applies all modifications to the backend store.
+ * Changes may be applied in a single operation.
+ *
+ * @param modifications A List<Modification> of modifications
+ */
+ void put(List<Modification> modifications);
+
+ /**
* Prepares a list of modifications. For example, for a DB-based CacheLoader:
* <ol>
* <li>Create a local (JDBC) transaction
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -0,0 +1,447 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.config.CacheLoaderConfig;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.config.ConfigurationException;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import
org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.factories.annotations.Stop;
+import org.jboss.starobrno.util.ReflectionUtil;
+
+/**
+ * Manages all cache loader functionality. This class is typically initialised with an
XML DOM Element,
+ * represeting a cache loader configuration, or a {@link
org.jboss.cache.config.CacheLoaderConfig} object.
+ * <p/>
+ * Usage:
+ * <p/>
+ * <code>
+ * CacheLoaderManager manager = new CacheLoaderManager();
+ * manager.setConfig(myXmlSnippet, myTreeCache);
+ * CacheLoader loader = manager.getCacheLoader();
+ * </code>
+ * <p/>
+ * The XML configuration passed in would typically look like:
+ * <p/>
+ * <code><![CDATA[
+ * <p/>
+ * <config>
+ * <passivation>false</passivation>
+ * <preload>/</preload>
+ * <p/>
+ * <cacheloader>
+ * <class>org.jboss.cache.loader.FileCacheLoader</class>
+ * <async>true</async>
+ * <fetchPersistentState>false</fetchPersistentState>
+ * <ignoreModifications>false</ignoreModifications>
+ * <properties>
+ * location=/tmp/file
+ * </properties>
+ * </cacheloader>
+ * </config>
+ * ]]>
+ * </code>
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
+ * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
+ */
+public class CacheLoaderManager
+{
+ private static final Log log = LogFactory.getLog(CacheLoaderManager.class);
+ private CacheLoaderConfig config;
+ private CacheSPI<Object,Object> cache;
+ private CacheLoader<Object,Object> loader;
+ private boolean fetchPersistentState;
+ private Configuration configuration;
+ private ComponentRegistry registry;
+
+ @Inject
+ public void injectDependencies(CacheSPI<Object, Object> cache, Configuration
configuration, ComponentRegistry registry)
+ {
+ // TODO: Inject CacheSPI once we have the cache loaders not relying on a tree
structure
+ this.config = configuration.getCacheLoaderConfig();
+// this.cache = cache;
+ this.configuration = configuration;
+ this.registry = registry;
+
+ if (config != null)
+ {
+ try
+ {
+ loader = createCacheLoader();
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to create cache loaders", e);
+ }
+ }
+ }
+
+ /**
+ * Sets a configuration object and creates a cacheloader accordingly. Primarily used
for testing.
+ *
+ * @param config
+ * @param cache
+ * @throws CacheException
+ */
+ public void setConfig(CacheLoaderConfig config, CacheSPI<Object,Object> cache,
Configuration configuration) throws CacheException
+ {
+ this.config = config == null ? configuration.getCacheLoaderConfig() : config;
+ this.cache = cache;
+ this.configuration = configuration;
+
+ if (config != null)
+ {
+ try
+ {
+ loader = createCacheLoader();
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to create cache loaders", e);
+ }
+ }
+ }
+
+ /**
+ * Creates the cache loader based on a cache loader config passed in.
+ *
+ * @return a configured cacheloader
+ * @throws IllegalAccessException
+ * @throws InstantiationException
+ * @throws ClassNotFoundException
+ */
+ private CacheLoader<Object, Object> createCacheLoader() throws Exception
+ {
+ CacheLoader<Object,Object> tmpLoader;
+ // if we only have a single cache loader configured in the chaining cacheloader
then
+ // don't use a chaining cache loader at all.
+
+ ArrayList<IndividualCacheLoaderConfig> finalConfigs =
+ new ArrayList<IndividualCacheLoaderConfig>();
+
+ // also if we are using passivation then just directly use the first cache loader.
+ if (config.useChainingCacheLoader())
+ {
+ // create chaining cache loader.
+ ChainingCacheLoader<Object,Object> ccl = new
ChainingCacheLoader<Object,Object>();
+ tmpLoader = ccl;
+ Iterator<IndividualCacheLoaderConfig> it =
config.getIndividualCacheLoaderConfigs().iterator();
+
+ // only one cache loader may have fetchPersistentState to true.
+ int numLoadersWithFetchPersistentState = 0;
+ while (it.hasNext())
+ {
+ CacheLoaderConfig.IndividualCacheLoaderConfig cfg =
(CacheLoaderConfig.IndividualCacheLoaderConfig) it.next();
+ if (cfg.isFetchPersistentState())
+ {
+ numLoadersWithFetchPersistentState++;
+ fetchPersistentState = true;
+ }
+ if (numLoadersWithFetchPersistentState > 1)
+ {
+ throw new Exception("Invalid cache loader configuration!! Only ONE
cache loader may have fetchPersistentState set to true. Cache will not start!");
+ }
+
+ assertNotSingletonAndShared(cfg);
+
+ CacheLoader<Object,Object> l = createCacheLoader(cfg, cache);
+ cfg = l.getConfig();
+ finalConfigs.add(cfg);
+ // Only loaders that deal w/ state transfer factor into
+ // whether the overall chain supports ExtendedCacheLoader
+ ccl.addCacheLoader(l, cfg);
+
+ }
+ }
+ else
+ {
+ CacheLoaderConfig.IndividualCacheLoaderConfig cfg =
config.getIndividualCacheLoaderConfigs().get(0);
+ tmpLoader = createCacheLoader(cfg, cache);
+ finalConfigs.add(tmpLoader.getConfig() == null ? cfg : tmpLoader.getConfig());
+ fetchPersistentState = cfg.isFetchPersistentState();
+ assertNotSingletonAndShared(cfg);
+ }
+
+ // Update the config with those actually used by the loaders
+ ReflectionUtil.setValue(config, "accessible", true);
+ config.setIndividualCacheLoaderConfigs(finalConfigs);
+
+ return tmpLoader;
+ }
+
+ private void assertNotSingletonAndShared(IndividualCacheLoaderConfig cfg)
+ {
+ SingletonStoreConfig ssc = cfg.getSingletonStoreConfig();
+ if (ssc != null && ssc.isSingletonStoreEnabled() &&
config.isShared())
+ throw new ConfigurationException("Invalid cache loader configuration!! If
a cache loader is configured as a singleton, the cache loader cannot be shared in a
cluster!");
+ }
+
+ /**
+ * Creates the cache loader based on the configuration.
+ *
+ * @param cfg
+ * @param cache
+ * @return a cache loader
+ * @throws Exception
+ */
+ @SuppressWarnings("deprecation")
+ private CacheLoader<Object, Object>
createCacheLoader(CacheLoaderConfig.IndividualCacheLoaderConfig cfg, CacheSPI<Object,
Object> cache) throws Exception
+ {
+ // create loader
+ CacheLoader<Object, Object> tmpLoader = cfg.getCacheLoader() == null ?
createInstance(cfg.getClassName()) : cfg.getCacheLoader();
+
+ if (tmpLoader != null)
+ {
+ // async?
+ if (cfg.isAsync())
+ {
+ CacheLoader<Object, Object> asyncDecorator;
+ asyncDecorator = new AsyncCacheLoader<Object, Object>(tmpLoader);
+ tmpLoader = asyncDecorator;
+ }
+
+ if (cfg.isIgnoreModifications())
+ {
+ AbstractDelegatingCacheLoader<Object, Object> readOnlyDecorator;
+ readOnlyDecorator = new ReadOnlyDelegatingCacheLoader<Object,
Object>(tmpLoader);
+ tmpLoader = readOnlyDecorator;
+ }
+
+ // singleton?
+ SingletonStoreConfig ssc = cfg.getSingletonStoreConfig();
+ if (ssc != null && ssc.isSingletonStoreEnabled())
+ {
+ Object decorator = createInstance(ssc.getSingletonStoreClass());
+
+ /* class providing singleton store functionality must extend
AbstractDelegatingCacheLoader so that
+ * underlying cacheloader can be set. */
+ if (decorator instanceof AbstractDelegatingCacheLoader)
+ {
+ @SuppressWarnings("unchecked")
+ AbstractDelegatingCacheLoader<Object, Object> singletonDecorator =
(AbstractDelegatingCacheLoader<Object, Object>) decorator;
+ /* set the cache loader to where calls will be delegated by the class
providing the singleton
+ * store functionality. */
+ singletonDecorator.setCacheLoader(tmpLoader);
+ tmpLoader = singletonDecorator;
+ }
+ else
+ {
+ throw new Exception("Invalid cache loader configuration!! Singleton
store implementation class must extend
org.jboss.cache.loader.AbstractDelegatingCacheLoader");
+ }
+ }
+
+ // load props
+ tmpLoader.setConfig(cfg);
+
+ setCacheInLoader(cache, tmpLoader);
+ // we should not be creating/starting the cache loader here - this should be
done in the separate
+ // startCacheLoader() method.
+ // tmpLoader.create();
+ // tmpLoader.start();
+ }
+ return tmpLoader;
+ }
+
+ /**
+ * Sets the cache instance associated with the given cache loader. This method was
created for testing purpouses
+ * so that it can be overriden in the mock version of the CacheLoaderManager.
+ *
+ * @param c instance of cache to be set in cache loader
+ * @param loader cache loader to which assign the cache instance
+ */
+ protected void setCacheInLoader(CacheSPI<Object,Object> c,
CacheLoader<Object,Object> loader)
+ {
+ loader.setCache(c);
+ }
+
+ @SuppressWarnings("unchecked")
+ private CacheLoader<Object, Object> createInstance(String className) throws
ClassNotFoundException, IllegalAccessException, InstantiationException
+ {
+ if (log.isTraceEnabled()) log.trace("instantiating class " + className);
+ Class<?> cl =
Thread.currentThread().getContextClassLoader().loadClass(className);
+ return (CacheLoader<Object, Object>) cl.newInstance();
+ }
+
+ /**
+ * Performs a preload on the cache based on the cache loader preload configs used when
configuring the cache.
+ *
+ * @throws Exception
+ */
+ @Start(priority = 50)
+ public void preloadCache() throws CacheException
+ {
+ if (loader != null)
+ {
+ if (config.getPreload() == null || config.getPreload().equals(""))
return;
+ if (log.isDebugEnabled()) log.debug("preloading transient state from cache
loader " + loader);
+ StringTokenizer st = new StringTokenizer(config.getPreload(), ",");
+ long start, stop, total;
+ start = System.currentTimeMillis();
+ while (st.hasMoreTokens())
+ {
+ String tok = st.nextToken().trim();
+ if (log.isTraceEnabled()) log.trace("preloading " + tok);
+ preload(tok);
+ }
+
+ stop = System.currentTimeMillis();
+ total = stop - start;
+ if (log.isDebugEnabled())
+ {
+ log.debug("preloading transient state from cache loader was successful
(in " + total + " milliseconds)");
+ }
+ }
+ }
+
+ /**
+ * Preloads a specific Fqn into the cache from the configured cacheloader
+ *
+ * @param fqn fqn to preload
+ * @param preloadParents whether we preload parents
+ * @param preloadChildren whether we preload children
+ * @throws CacheException if we are unable to preload
+ */
+ public void preload(String key) throws CacheException
+ {
+
+ cache.getInvocationContext().getOptionOverrides().setSkipDataGravitation(true);
+ cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+ // 1. Load the attributes first
+ // but this will go down the entire damn chain!! :S
+ cache.get(key);
+ }
+
+ /**
+ * Returns the configuration element of the cache loaders
+ */
+ public CacheLoaderConfig getCacheLoaderConfig()
+ {
+ return config;
+ }
+
+ /**
+ * Returns the cache loader
+ */
+ public CacheLoader<Object, Object> getCacheLoader()
+ {
+ return loader;
+ }
+
+ /**
+ * Tests if we're using passivation
+ */
+ public boolean isPassivation()
+ {
+ return config.isPassivation();
+ }
+
+ /**
+ * Returns true if at least one of the configured cache loaders has set
fetchPersistentState to true.
+ */
+ public boolean isFetchPersistentState()
+ {
+ return fetchPersistentState;
+ }
+
+ @Stop
+ public void stopCacheLoader()
+ {
+ if (loader != null)
+ {
+ // stop the cache loader
+ loader.stop();
+ // destroy the cache loader
+ loader.destroy();
+ }
+ }
+
+ @Start
+ public void startCacheLoader() throws CacheException
+ {
+ if (config == null) config = configuration.getCacheLoaderConfig();
+
+ if (config != null && loader == null)
+ {
+ try
+ {
+ loader = createCacheLoader();
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to create cache loaders", e);
+ }
+ }
+
+
+ if (loader != null)
+ {
+ try
+ {
+ // wire any deps.
+ registry.wireDependencies(loader);
+
+ // create the cache loader
+ loader.create();
+ // start the cache loader
+ loader.start();
+
+ purgeLoaders(false);
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to start cache loaders", e);
+ }
+ fetchPersistentState = fetchPersistentState || (loader.getConfig() != null
&& loader.getConfig().isFetchPersistentState());
+ fetchPersistentState = fetchPersistentState || (config != null &&
config.isFetchPersistentState());
+ }
+ }
+
+ public void purgeLoaders(boolean force) throws Exception
+ {
+ if ((loader instanceof ChainingCacheLoader) && !force)
+ {
+ ((ChainingCacheLoader<?,?>) loader).purgeIfNecessary();
+ }
+ else
+ {
+ CacheLoaderConfig.IndividualCacheLoaderConfig first =
getCacheLoaderConfig().getFirstCacheLoaderConfig();
+ if (force ||
+ (first != null && first.isPurgeOnStartup()))
+ {
+ loader.clear();
+ }
+ }
+ }
+}
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/CacheLoaderManager.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -0,0 +1,389 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.jboss.starobrno.config.CacheLoaderConfig;
+import org.jboss.starobrno.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.marshall.EntryData;
+
+/**
+ * This decorator is used whenever more than one cache loader is configured. READ
operations are directed to
+ * each of the cache loaders (in the order which they were configured) until a non-null
(or non-empty in the case
+ * of retrieving collection objects) result is achieved.
+ * <p/>
+ * WRITE operations are propagated to ALL registered cacheloaders that specified set
ignoreModifications to false.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
+ */
+public class ChainingCacheLoader<K,V> extends AbstractCacheLoader<K,V>
+{
+
+ private final List<CacheLoader<K,V>> cacheLoaders = new
ArrayList<CacheLoader<K,V>>(2);
+ private final List<CacheLoader<K,V>> writeCacheLoaders = new
ArrayList<CacheLoader<K,V>>(2);
+ private final List<CacheLoaderConfig.IndividualCacheLoaderConfig>
cacheLoaderConfigs = new
ArrayList<CacheLoaderConfig.IndividualCacheLoaderConfig>(2);
+ private ComponentRegistry registry;
+
+ /**
+ * Sets the configuration. Will be called before {@link #create()} and {@link
#start()}
+ *
+ * @param config ignored
+ */
+ public void setConfig(IndividualCacheLoaderConfig config)
+ {
+ // don't do much here?
+ }
+
+ public IndividualCacheLoaderConfig getConfig()
+ {
+ return null;
+ }
+
+ @Inject
+ public void injectDependencies(ComponentRegistry registry)
+ {
+ this.registry = registry;
+ }
+
+
+ public V get(Object key)
+ {
+ V answer = null;
+ for (CacheLoader<K,V> l : cacheLoaders)
+ {
+ answer = l.get(key);
+ if (answer != null) break;
+ }
+ return answer;
+ }
+
+ /**
+ * Checks whether the CacheLoader has a node with Fqn
+ *
+ * @param name
+ * @return True if node exists, false otherwise
+ */
+ public boolean exists(Object key)
+ {
+ boolean answer = false;
+ for (CacheLoader<K,V> l : cacheLoaders)
+ {
+ answer = l.exists(key);
+ if (answer) break;
+ }
+ return answer;
+ }
+
+ /**
+ * Inserts key and value into the attributes hashmap of the given node. If the node
does not exist, all
+ * parent nodes from the root down are created automatically. Returns the old value
+ */
+ public V put(K key, V value)
+ {
+ V answer = null;
+ boolean isFirst = true;
+ for (CacheLoader<K,V> l : writeCacheLoaders)
+ {
+ V tAnswer = l.put(key, value);
+ if (isFirst)
+ {
+ answer = tAnswer;
+ isFirst = false;
+ }
+
+ }
+ return answer;
+ }
+
+ /**
+ * Inserts all modifications to the backend store. Overwrite whatever is already in
+ * the datastore.
+ *
+ * @param modifications A List<Modification> of modifications
+ * @throws Exception
+ */
+ @Override
+ public void put(List<Modification> modifications)
+ {
+ for (CacheLoader<K,V> l : writeCacheLoaders)
+ {
+ l.put(modifications);
+ }
+ }
+
+ /**
+ * Removes the given key and value. No-op if key doesn't exist.
+ * Returns the first response from the loader chain.
+ */
+ public V remove(Object key)
+ {
+ V answer = null;
+ boolean isFirst = true;
+ for (CacheLoader<K,V> l : writeCacheLoaders)
+ {
+ V tAnswer = l.remove(key);
+ if (isFirst)
+ {
+ answer = tAnswer;
+ isFirst = false;
+ }
+ }
+ return answer;
+ }
+
+ /**
+ * Prepare the modifications. For example, for a DB-based CacheLoader:
+ * <ol>
+ * <li>Create a local (JDBC) transaction
+ * <li>Associate the local transaction with <code>tx</code> (tx is
the key)
+ * <li>Execute the coresponding SQL statements against the DB (statements
derived from modifications)
+ * </ol>
+ * For non-transactional CacheLoader (e.g. file-based), this could be a null
operation
+ *
+ * @param tx The transaction, just used as a hashmap key
+ * @param modifications List<Modification>, a list of all modifications within
the given transaction
+ * @param one_phase Persist immediately and (for example) commit the local JDBC
transaction as well. When true,
+ * we won't get a {@link #commit(Object)} or {@link
#rollback(Object)} method call later
+ * @throws Exception
+ */
+ @Override
+ public void prepare(Object tx, List<Modification> modifications, boolean
one_phase)
+ {
+ for (CacheLoader<K,V> l : writeCacheLoaders)
+ {
+ l.prepare(tx, modifications, one_phase);
+ }
+ }
+
+ /**
+ * Commit the transaction. A DB-based CacheLoader would look up the local JDBC
transaction asociated
+ * with <code>tx</code> and commit that transaction<br/>
+ * Non-transactional CacheLoaders could simply write the data that was previously
saved transiently under the
+ * given <code>tx</code> key, to (for example) a file system (note this
only holds if the previous prepare() did
+ * not define one_phase=true
+ *
+ * @param tx
+ */
+ @Override
+ public void commit(Object tx)
+ {
+ for (CacheLoader<K,V> l : writeCacheLoaders)
+ {
+ l.commit(tx);
+ }
+ }
+
+ /**
+ * Roll the transaction back. A DB-based CacheLoader would look up the local JDBC
transaction asociated
+ * with <code>tx</code> and roll back that transaction
+ *
+ * @param tx
+ */
+ @Override
+ public void rollback(Object tx)
+ {
+ for (CacheLoader<K,V> l : writeCacheLoaders)
+ {
+ l.rollback(tx);
+ }
+ }
+
+
+ /**
+ * Creates individual cache loaders.
+ *
+ * @throws Exception
+ */
+ @Override
+ public void create()
+ {
+ Iterator<CacheLoader<K, V>> it = cacheLoaders.iterator();
+ Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> cfgIt =
cacheLoaderConfigs.iterator();
+ while (it.hasNext() && cfgIt.hasNext())
+ {
+ CacheLoader<K,V> cl = it.next();
+ CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cfgIt.next();
+ cl.setConfig(cfg);
+ registry.wireDependencies(cl);
+ cl.create();
+ }
+ }
+
+ @Override
+ public void start()
+ {
+ for (CacheLoader<K,V> cacheLoader : cacheLoaders)
+ {
+ cacheLoader.start();
+ }
+ }
+
+ @Override
+ public void stop()
+ {
+ for (CacheLoader<K,V> cacheLoader : cacheLoaders)
+ {
+ cacheLoader.stop();
+ }
+ }
+
+ @Override
+ public void destroy()
+ {
+ for (CacheLoader<K,V> cacheLoader : cacheLoaders)
+ {
+ cacheLoader.destroy();
+ }
+ }
+ @Override
+ public void loadEntireState(ObjectOutputStream os)
+ {
+ Iterator<CacheLoader<K, V>> i = cacheLoaders.iterator();
+ Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> cfgs =
cacheLoaderConfigs.iterator();
+ while (i.hasNext() && cfgs.hasNext())
+ {
+ CacheLoader<K,V> l = i.next();
+ CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cfgs.next();
+ if (cfg.isFetchPersistentState())
+ {
+ l.loadEntireState(os);
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void storeEntireState(ObjectInputStream is)
+ {
+ Iterator<CacheLoader<K, V>> i = writeCacheLoaders.iterator();
+ Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> cfgs =
cacheLoaderConfigs.iterator();
+ while (i.hasNext())
+ {
+ CacheLoader<K,V> l = i.next();
+ CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cfgs.next();
+ if (cfg.isFetchPersistentState())
+ {
+ l.storeEntireState(is);
+ break;
+ }
+ }
+
+ }
+
+ /**
+ * Returns the number of cache loaders in the chain.
+ */
+ public int getSize()
+ {
+ return cacheLoaders.size();
+ }
+
+ /**
+ * Returns a List<CacheLoader> of individual cache loaders configured.
+ */
+ public List<CacheLoader<K,V>> getCacheLoaders()
+ {
+ return Collections.unmodifiableList(cacheLoaders);
+ }
+
+ /**
+ * Adds a cache loader to the chain (always added at the end of the chain)
+ *
+ * @param l the cache loader to add
+ * @param cfg and its configuration
+ */
+ public void addCacheLoader(CacheLoader<K, V> l,
CacheLoaderConfig.IndividualCacheLoaderConfig cfg)
+ {
+ synchronized (this)
+ {
+ cacheLoaderConfigs.add(cfg);
+ cacheLoaders.add(l);
+
+ if (!cfg.isIgnoreModifications())
+ {
+ writeCacheLoaders.add(l);
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("ChainingCacheLoader{");
+ Iterator<CacheLoader<K, V>> i = cacheLoaders.iterator();
+ Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> c =
cacheLoaderConfigs.iterator();
+ int count = 0;
+ while (i.hasNext() && c.hasNext())
+ {
+ CacheLoader<K,V> loader = i.next();
+ CacheLoaderConfig.IndividualCacheLoaderConfig cfg = c.next();
+
+ buf.append(++count);
+ buf.append(": IgnoreMods? ");
+ buf.append(cfg.isIgnoreModifications());
+ buf.append(" CLoader: ");
+ buf.append(loader);
+ buf.append("; ");
+ }
+ buf.append("}");
+ return buf.toString();
+ }
+
+ public void purgeIfNecessary() throws Exception
+ {
+ Iterator<CacheLoader<K, V>> loaders = cacheLoaders.iterator();
+ Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> configs =
cacheLoaderConfigs.iterator();
+
+ while (loaders.hasNext() && configs.hasNext())
+ {
+ CacheLoader<K,V> myLoader = loaders.next();
+ CacheLoaderConfig.IndividualCacheLoaderConfig myConfig = configs.next();
+
+ if (!myConfig.isIgnoreModifications() && myConfig.isPurgeOnStartup())
myLoader.clear();
+ }
+ }
+
+ public void clear()
+ {
+ for (CacheLoader<K,V> l : writeCacheLoaders)
+ l.clear();
+ }
+
+ public List<EntryData<K, V>> getAllEntries()
+ {
+ ArrayList<EntryData<K,V>> full = new
ArrayList<EntryData<K,V>>();
+
+ for (CacheLoader<K,V> l : writeCacheLoaders)
+ full.addAll(l.getAllEntries());
+
+ return full;
+ }
+
+}
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ChainingCacheLoader.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/FileCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/loader/FileCacheLoader.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/FileCacheLoader.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -202,7 +202,7 @@
}
}
- public V put(Object key, V value)
+ public V put(K key, V value)
{
lock(key);
try
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.loader;
+
+import java.io.ObjectInputStream;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Provides ignoreModifications features to all cache loaders.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
+ * @since 2.1.0
+ */
+public class ReadOnlyDelegatingCacheLoader<K,V> extends
AbstractDelegatingCacheLoader<K,V>
+{
+ private static final Log log =
LogFactory.getLog(ReadOnlyDelegatingCacheLoader.class);
+
+ public ReadOnlyDelegatingCacheLoader(CacheLoader cl)
+ {
+ super(cl);
+ }
+
+ @Override
+ public V put(K key, V value)
+ {
+ log.trace("Not delegating write operation to underlying cache loader");
+ return get(key);
+ }
+
+ @Override
+ public void put(List<Modification> modifications)
+ {
+ log.trace("Not delegating write operation to underlying cache loader");
+ }
+
+ @Override
+ public V remove(Object key)
+ {
+ log.trace("Not delegating write operation to underlying cache loader");
+ return get(key);
+ }
+
+ @Override
+ public void prepare(Object tx, List<Modification> modifications, boolean
one_phase)
+ {
+ log.trace("Not delegating write operation to underlying cache loader");
+ }
+
+ @Override
+ public void commit(Object tx)
+ {
+ log.trace("Not delegating write operation to underlying cache loader");
+ }
+
+ @Override
+ public void rollback(Object tx)
+ {
+ log.trace("Not delegating write operation to underlying cache loader");
+ }
+
+ @Override
+ public void storeEntireState(ObjectInputStream is)
+ {
+ log.trace("Not delegating write operation to underlying cache loader");
+ }
+
+ @Override
+ public void clear()
+ {
+ log.trace("Not delegating write operation to underlying cache loader");
+ }
+}
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/loader/ReadOnlyDelegatingCacheLoader.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/tree/api/NodeAPITest.java
===================================================================
---
core/branches/flat/src/test/java/org/jboss/starobrno/tree/api/NodeAPITest.java 2008-11-20
04:05:09 UTC (rev 7177)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/tree/api/NodeAPITest.java 2008-11-20
04:06:04 UTC (rev 7178)
@@ -1,4 +1,4 @@
-package org.jboss.cache.api;
+package org.jboss.starobrno.tree.api;
import org.jboss.cache.transaction.DummyTransactionManager;
import org.jboss.cache.transaction.DummyTransactionManagerLookup;