JBoss Cache SVN: r7338 - core/trunk/src/main/java/org/jboss/cache/interceptors.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-12-19 10:25:58 -0500 (Fri, 19 Dec 2008)
New Revision: 7338
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyCacheStoreInterceptor.java
Log:
JBCACHE-1454 - Memory leak in CacheStoreInterceptor
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-12-19 02:32:29 UTC (rev 7337)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-12-19 15:25:58 UTC (rev 7338)
@@ -41,7 +41,6 @@
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
import org.jboss.cache.config.CacheLoaderConfig;
-import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.interceptors.base.SkipCheckChainedInterceptor;
@@ -53,7 +52,6 @@
import org.jboss.cache.transaction.TransactionContext;
import javax.transaction.SystemException;
-import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.HashMap;
@@ -61,7 +59,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
/**
* Writes modifications back to the store on the way out: stores modifications back
@@ -73,13 +70,11 @@
public class CacheStoreInterceptor extends SkipCheckChainedInterceptor
{
private CacheLoaderConfig loaderConfig = null;
- private TransactionManager txMgr = null;
+ protected TransactionManager txMgr = null;
private HashMap txStores = new HashMap();
- private Map<GlobalTransaction, Set<Fqn>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Fqn>>();
private long cacheStores = 0;
CacheLoader loader;
private CacheLoaderManager loaderManager;
- private boolean optimistic;
private boolean statsEnabled;
public CacheStoreInterceptor()
@@ -102,7 +97,6 @@
{
// this should only happen after the CacheLoaderManager has started, since the CacheLoaderManager only creates the CacheLoader instance in its @Start method.
loader = loaderManager.getCacheLoader();
- optimistic = configuration.getNodeLockingScheme() == NodeLockingScheme.OPTIMISTIC;
this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
}
@@ -115,7 +109,9 @@
if ((!ctx.isOriginLocal() && loaderConfig.isShared()) || ctx.getOptionOverrides().isSuppressPersistence())
{
if (trace)
+ {
log.trace("Passing up method call and bypassing this interceptor since the cache loader is shared and this call originated remotely.");
+ }
return true;
}
return false;
@@ -131,40 +127,23 @@
// this is a commit call.
GlobalTransaction gtx = command.getGlobalTransaction();
if (trace) log.trace("Calling loader.commit() for gtx " + gtx);
- // sync call (a write) on the loader
- // ignore modified FQNs
- // List fqnsModified = getFqnsFromModificationList(txTable.get(globalTransaction).getCacheLoaderModifications());
try
{
loader.commit(gtx);
}
- catch (Throwable t)
+ finally
{
- preparingTxs.remove(gtx);
- throw t;
- }
- if (getStatisticsEnabled())
- {
- Integer puts = (Integer) txStores.get(gtx);
- if (puts != null)
+ if (getStatisticsEnabled())
{
- cacheStores = cacheStores + puts;
+ Integer puts = (Integer) txStores.get(gtx);
+ if (puts != null)
+ {
+ cacheStores = cacheStores + puts;
+ }
+ txStores.remove(gtx);
}
- txStores.remove(gtx);
}
- Object returnValue = invokeNextInterceptor(ctx, command);
-
- // persist additional internal state, if any, and then clean up internal resources
- // specifically for optimistic locking to store versioning info AFTER a tx has committed. Hacky.
- if (optimistic)
- {
- Set<Fqn> affectedFqns = preparingTxs.remove(gtx);
- if (affectedFqns != null && !affectedFqns.isEmpty())
- {
- storeInternalState(affectedFqns, ctx);
- }
- }
- return returnValue;
+ return invokeNextInterceptor(ctx, command);
}
else
{
@@ -184,11 +163,14 @@
{
GlobalTransaction gtx = command.getGlobalTransaction();
// this is a rollback method
- if (preparingTxs.containsKey(gtx))
+ try
{
- preparingTxs.remove(gtx);
loader.rollback(gtx);
}
+ catch (Exception e)
+ {
+ log.info("Problems rolling back transaction " + gtx + " on cache loader. Perhaps the prepare phase hasn't been initiated on this loader?", e);
+ }
if (getStatisticsEnabled()) txStores.remove(gtx);
}
else
@@ -315,39 +297,11 @@
return handlePutKeyValueCommand(ctx, command);
}
- private boolean inTransaction() throws SystemException
+ protected boolean inTransaction() throws SystemException
{
return txMgr != null && txMgr.getTransaction() != null;
}
- private void storeInternalState(Set<Fqn> affectedFqns, InvocationContext ctx) throws Exception
- {
- if (configuration.getNodeLockingScheme().isVersionedScheme())
- {
- // we need to suspend any txs here since they would be in the tx-committed state and if the loader attempts to
- // use JTA (E.g., a JDBC CL using connections from a tx aware datasource) it will fail since the tx is in an
- // illegal state to perform writes. See JBCACHE-1408.
- Transaction tx = txMgr.suspend();
- try
- {
- for (Fqn f : affectedFqns)
- {
- // NOT going to store tombstones!!
- NodeSPI n = ctx.lookUpNode(f);
- if (n != null && !n.isDeleted())
- {
- Map internalState = n.getInternalState(false);
- loader.put(f, internalState);
- }
- }
- }
- finally
- {
- txMgr.resume(tx);
- }
- }
- }
-
private void recursiveMove(Fqn fqn, Fqn newFqn) throws Exception
{
loader.put(newFqn, loader.get(fqn));
@@ -362,7 +316,7 @@
}
}
- private void prepareCacheLoader(GlobalTransaction gtx, TransactionContext transactionContext, boolean onePhase) throws Throwable
+ protected StoreModificationsBuilder prepareCacheLoader(GlobalTransaction gtx, TransactionContext transactionContext, boolean onePhase) throws Throwable
{
if (transactionContext == null)
{
@@ -372,7 +326,7 @@
if (modifications.size() == 0)
{
if (trace) log.trace("Transaction has not logged any modifications!");
- return;
+ return null;
}
if (trace) log.trace("Cache loader modification list: " + modifications);
StoreModificationsBuilder modsBuilder = new StoreModificationsBuilder(getStatisticsEnabled());
@@ -388,12 +342,12 @@
{
loader.prepare(gtx, modsBuilder.modifications, onePhase);
- preparingTxs.put(gtx, modsBuilder.affectedFqns);
- if (getStatisticsEnabled() && modsBuilder.putCount > 0)
+ if (getStatisticsEnabled() && modsBuilder.putCount > 0 && !onePhase)
{
txStores.put(gtx, modsBuilder.putCount);
}
}
+ return modsBuilder;
}
public static class StoreModificationsBuilder extends AbstractVisitor
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyCacheStoreInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyCacheStoreInterceptor.java 2008-12-19 02:32:29 UTC (rev 7337)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyCacheStoreInterceptor.java 2008-12-19 15:25:58 UTC (rev 7338)
@@ -23,15 +23,117 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.NodeSPI;
import org.jboss.cache.annotations.Compat;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
+import org.jboss.cache.commands.tx.RollbackCommand;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.transaction.GlobalTransaction;
+import javax.transaction.Transaction;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
@Compat
@Deprecated
public class LegacyCacheStoreInterceptor extends CacheStoreInterceptor
{
+ /**
+ * Used with optimistic locking to persist version information on Fqns that have changed.
+ */
+ private Map<GlobalTransaction, Set<Fqn>> preparingTxs;
+ private boolean optimistic;
+
+ @Start
+ void checkOptimistic()
+ {
+ optimistic = configuration.getNodeLockingScheme() == Configuration.NodeLockingScheme.OPTIMISTIC;
+ if (optimistic) preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Fqn>>();
+
+ }
+
@Override
protected void storeStateForPutDataMap(Fqn f, InvocationContext ctx) throws Exception
{
loader.put(f, ctx.lookUpNode(f).getDataDirect());
}
+
+ @Override
+ protected Object handleOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
+ {
+ if (inTransaction())
+ {
+ if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
+ GlobalTransaction gtx = command.getGlobalTransaction();
+ StoreModificationsBuilder smb = prepareCacheLoader(gtx, ctx.getTransactionContext(), command.isOnePhaseCommit());
+ if (smb != null && !smb.modifications.isEmpty()) preparingTxs.put(gtx, smb.affectedFqns);
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ protected Object handleRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+ {
+ try
+ {
+ return super.handleRollbackCommand(ctx, command);
+ }
+ finally
+ {
+ if (optimistic)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ preparingTxs.remove(gtx);
+ }
+ }
+ }
+
+ @Override
+ protected Object handleCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+ {
+ Object returnValue = super.handleCommitCommand(ctx, command);
+
+ // persist additional internal state, if any, and then clean up internal resources
+ // specifically for optimistic locking to store versioning info AFTER a tx has committed. Hacky.
+ // note - do NOT do this in a finally block. If the commit fails we shouldn't do this.
+ if (optimistic)
+ {
+ // if the commit fails, preparingTxs will be cleaned up in a call to handleRollbackCommand()
+ Set<Fqn> affectedFqns = preparingTxs.remove(ctx.getGlobalTransaction());
+ if (affectedFqns != null && !affectedFqns.isEmpty())
+ {
+ storeInternalState(affectedFqns, ctx);
+ }
+ }
+ return returnValue;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void storeInternalState(Set<Fqn> affectedFqns, InvocationContext ctx) throws Exception
+ {
+ // we need to suspend any txs here since they would be in the tx-committed state and if the loader attempts to
+ // use JTA (E.g., a JDBC CL using connections from a tx aware datasource) it will fail since the tx is in an
+ // illegal state to perform writes. See JBCACHE-1408.
+ Transaction tx = txMgr.suspend();
+ try
+ {
+ for (Fqn f : affectedFqns)
+ {
+ // NOT going to store tombstones!!
+ NodeSPI n = ctx.lookUpNode(f);
+ if (n != null && !n.isDeleted())
+ {
+ Map<Object, Object> internalState = n.getInternalState(false);
+ loader.put(f, internalState);
+ }
+ }
+ }
+ finally
+ {
+ txMgr.resume(tx);
+ }
+ }
}