[jbosscache-commits] JBoss Cache SVN: r7338 - core/trunk/src/main/java/org/jboss/cache/interceptors.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri Dec 19 10:25:58 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-12-19 10:25:58 -0500 (Fri, 19 Dec 2008)
New Revision: 7338

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyCacheStoreInterceptor.java
Log:
JBCACHE-1454 - Memory leak in CacheStoreInterceptor

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java	2008-12-19 02:32:29 UTC (rev 7337)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java	2008-12-19 15:25:58 UTC (rev 7338)
@@ -41,7 +41,6 @@
 import org.jboss.cache.commands.write.RemoveKeyCommand;
 import org.jboss.cache.commands.write.RemoveNodeCommand;
 import org.jboss.cache.config.CacheLoaderConfig;
-import org.jboss.cache.config.Configuration.NodeLockingScheme;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
 import org.jboss.cache.interceptors.base.SkipCheckChainedInterceptor;
@@ -53,7 +52,6 @@
 import org.jboss.cache.transaction.TransactionContext;
 
 import javax.transaction.SystemException;
-import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -61,7 +59,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Writes modifications back to the store on the way out: stores modifications back
@@ -73,13 +70,11 @@
 public class CacheStoreInterceptor extends SkipCheckChainedInterceptor
 {
    private CacheLoaderConfig loaderConfig = null;
-   private TransactionManager txMgr = null;
+   protected TransactionManager txMgr = null;
    private HashMap txStores = new HashMap();
-   private Map<GlobalTransaction, Set<Fqn>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Fqn>>();
    private long cacheStores = 0;
    CacheLoader loader;
    private CacheLoaderManager loaderManager;
-   private boolean optimistic;
    private boolean statsEnabled;
 
    public CacheStoreInterceptor()
@@ -102,7 +97,6 @@
    {
       // this should only happen after the CacheLoaderManager has started, since the CacheLoaderManager only creates the CacheLoader instance in its @Start method.
       loader = loaderManager.getCacheLoader();
-      optimistic = configuration.getNodeLockingScheme() == NodeLockingScheme.OPTIMISTIC;
       this.setStatisticsEnabled(configuration.getExposeManagementStatistics());
    }
 
@@ -115,7 +109,9 @@
       if ((!ctx.isOriginLocal() && loaderConfig.isShared()) || ctx.getOptionOverrides().isSuppressPersistence())
       {
          if (trace)
+         {
             log.trace("Passing up method call and bypassing this interceptor since the cache loader is shared and this call originated remotely.");
+         }
          return true;
       }
       return false;
@@ -131,40 +127,23 @@
             // this is a commit call.
             GlobalTransaction gtx = command.getGlobalTransaction();
             if (trace) log.trace("Calling loader.commit() for gtx " + gtx);
-            // sync call (a write) on the loader
-            // ignore modified FQNs
-            // List fqnsModified = getFqnsFromModificationList(txTable.get(globalTransaction).getCacheLoaderModifications());
             try
             {
                loader.commit(gtx);
             }
-            catch (Throwable t)
+            finally
             {
-               preparingTxs.remove(gtx);
-               throw t;
-            }
-            if (getStatisticsEnabled())
-            {
-               Integer puts = (Integer) txStores.get(gtx);
-               if (puts != null)
+               if (getStatisticsEnabled())
                {
-                  cacheStores = cacheStores + puts;
+                  Integer puts = (Integer) txStores.get(gtx);
+                  if (puts != null)
+                  {
+                     cacheStores = cacheStores + puts;
+                  }
+                  txStores.remove(gtx);
                }
-               txStores.remove(gtx);
             }
-            Object returnValue = invokeNextInterceptor(ctx, command);
-
-            // persist additional internal state, if any, and then clean up internal resources
-            // specifically for optimistic locking to store versioning info AFTER a tx has committed.  Hacky.
-            if (optimistic)
-            {
-               Set<Fqn> affectedFqns = preparingTxs.remove(gtx);
-               if (affectedFqns != null && !affectedFqns.isEmpty())
-               {
-                  storeInternalState(affectedFqns, ctx);
-               }
-            }
-            return returnValue;
+            return invokeNextInterceptor(ctx, command);
          }
          else
          {
@@ -184,11 +163,14 @@
          {
             GlobalTransaction gtx = command.getGlobalTransaction();
             // this is a rollback method
-            if (preparingTxs.containsKey(gtx))
+            try
             {
-               preparingTxs.remove(gtx);
                loader.rollback(gtx);
             }
+            catch (Exception e)
+            {
+               log.info("Problems rolling back transaction " + gtx + " on cache loader.  Perhaps the prepare phase hasn't been initiated on this loader?", e);
+            }
             if (getStatisticsEnabled()) txStores.remove(gtx);
          }
          else
@@ -315,39 +297,11 @@
       return handlePutKeyValueCommand(ctx, command);
    }
 
-   private boolean inTransaction() throws SystemException
+   protected boolean inTransaction() throws SystemException
    {
       return txMgr != null && txMgr.getTransaction() != null;
    }
 
-   private void storeInternalState(Set<Fqn> affectedFqns, InvocationContext ctx) throws Exception
-   {
-      if (configuration.getNodeLockingScheme().isVersionedScheme())
-      {
-         // we need to suspend any txs here since they would be in the tx-committed state and if the loader attempts to
-         // use JTA (E.g., a JDBC CL using connections from a tx aware datasource) it will fail since the tx is in an
-         // illegal state to perform writes.  See JBCACHE-1408.
-         Transaction tx = txMgr.suspend();
-         try
-         {
-            for (Fqn f : affectedFqns)
-            {
-               // NOT going to store tombstones!!
-               NodeSPI n = ctx.lookUpNode(f);
-               if (n != null && !n.isDeleted())
-               {
-                  Map internalState = n.getInternalState(false);
-                  loader.put(f, internalState);
-               }
-            }
-         }
-         finally
-         {
-            txMgr.resume(tx);
-         }
-      }
-   }
-
    private void recursiveMove(Fqn fqn, Fqn newFqn) throws Exception
    {
       loader.put(newFqn, loader.get(fqn));
@@ -362,7 +316,7 @@
       }
    }
 
-   private void prepareCacheLoader(GlobalTransaction gtx, TransactionContext transactionContext, boolean onePhase) throws Throwable
+   protected StoreModificationsBuilder prepareCacheLoader(GlobalTransaction gtx, TransactionContext transactionContext, boolean onePhase) throws Throwable
    {
       if (transactionContext == null)
       {
@@ -372,7 +326,7 @@
       if (modifications.size() == 0)
       {
          if (trace) log.trace("Transaction has not logged any modifications!");
-         return;
+         return null;
       }
       if (trace) log.trace("Cache loader modification list: " + modifications);
       StoreModificationsBuilder modsBuilder = new StoreModificationsBuilder(getStatisticsEnabled());
@@ -388,12 +342,12 @@
       {
          loader.prepare(gtx, modsBuilder.modifications, onePhase);
 
-         preparingTxs.put(gtx, modsBuilder.affectedFqns);
-         if (getStatisticsEnabled() && modsBuilder.putCount > 0)
+         if (getStatisticsEnabled() && modsBuilder.putCount > 0 && !onePhase)
          {
             txStores.put(gtx, modsBuilder.putCount);
          }
       }
+      return modsBuilder;
    }
 
    public static class StoreModificationsBuilder extends AbstractVisitor

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyCacheStoreInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyCacheStoreInterceptor.java	2008-12-19 02:32:29 UTC (rev 7337)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyCacheStoreInterceptor.java	2008-12-19 15:25:58 UTC (rev 7338)
@@ -23,15 +23,117 @@
 
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InvocationContext;
+import org.jboss.cache.NodeSPI;
 import org.jboss.cache.annotations.Compat;
+import org.jboss.cache.commands.tx.CommitCommand;
+import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
+import org.jboss.cache.commands.tx.RollbackCommand;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.transaction.GlobalTransaction;
 
+import javax.transaction.Transaction;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 @Compat
 @Deprecated
 public class LegacyCacheStoreInterceptor extends CacheStoreInterceptor
 {
+   /**
+    * Used with optimistic locking to persist version information on Fqns that have changed.
+    */
+   private Map<GlobalTransaction, Set<Fqn>> preparingTxs;
+   private boolean optimistic;
+
+   @Start
+   void checkOptimistic()
+   {
+      optimistic = configuration.getNodeLockingScheme() == Configuration.NodeLockingScheme.OPTIMISTIC;
+      if (optimistic) preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Fqn>>();
+
+   }
+
    @Override
    protected void storeStateForPutDataMap(Fqn f, InvocationContext ctx) throws Exception
    {
       loader.put(f, ctx.lookUpNode(f).getDataDirect());
    }
+
+   @Override
+   protected Object handleOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
+   {
+      if (inTransaction())
+      {
+         if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
+         GlobalTransaction gtx = command.getGlobalTransaction();
+         StoreModificationsBuilder smb = prepareCacheLoader(gtx, ctx.getTransactionContext(), command.isOnePhaseCommit());
+         if (smb != null && !smb.modifications.isEmpty()) preparingTxs.put(gtx, smb.affectedFqns);
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   protected Object handleRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+   {
+      try
+      {
+         return super.handleRollbackCommand(ctx, command);
+      }
+      finally
+      {
+         if (optimistic)
+         {
+            GlobalTransaction gtx = ctx.getGlobalTransaction();
+            preparingTxs.remove(gtx);
+         }
+      }
+   }
+
+   @Override
+   protected Object handleCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+   {
+      Object returnValue = super.handleCommitCommand(ctx, command);
+
+      // persist additional internal state, if any, and then clean up internal resources
+      // specifically for optimistic locking to store versioning info AFTER a tx has committed.  Hacky.
+      // note - do NOT do this in a finally block.  If the commit fails we shouldn't do this.
+      if (optimistic)
+      {
+         // if the commit fails, preparingTxs will be cleaned up in a call to handleRollbackCommand()
+         Set<Fqn> affectedFqns = preparingTxs.remove(ctx.getGlobalTransaction());
+         if (affectedFqns != null && !affectedFqns.isEmpty())
+         {
+            storeInternalState(affectedFqns, ctx);
+         }
+      }
+      return returnValue;
+   }
+
+   @SuppressWarnings("unchecked")
+   private void storeInternalState(Set<Fqn> affectedFqns, InvocationContext ctx) throws Exception
+   {
+      // we need to suspend any txs here since they would be in the tx-committed state and if the loader attempts to
+      // use JTA (E.g., a JDBC CL using connections from a tx aware datasource) it will fail since the tx is in an
+      // illegal state to perform writes.  See JBCACHE-1408.
+      Transaction tx = txMgr.suspend();
+      try
+      {
+         for (Fqn f : affectedFqns)
+         {
+            // NOT going to store tombstones!!
+            NodeSPI n = ctx.lookUpNode(f);
+            if (n != null && !n.isDeleted())
+            {
+               Map<Object, Object> internalState = n.getInternalState(false);
+               loader.put(f, internalState);
+            }
+         }
+      }
+      finally
+      {
+         txMgr.resume(tx);
+      }
+   }
 }




More information about the jbosscache-commits mailing list