[jbosscache-commits] JBoss Cache SVN: r4727 - in core/trunk/src: main/java/org/jboss/cache/interceptors and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Nov 5 11:25:48 EST 2007


Author: manik.surtani at jboss.com
Date: 2007-11-05 11:25:48 -0500 (Mon, 05 Nov 2007)
New Revision: 4727

Modified:
   core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java
   core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
   core/trunk/src/test/java/org/jboss/cache/transaction/SimultaneousRollbackAndPutTest.java
Log:
JBCACHE-923 - reduces the likelihood of the race conditions described in that issue.

Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2007-11-03 02:27:34 UTC (rev 4726)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2007-11-05 16:25:48 UTC (rev 4727)
@@ -48,18 +48,7 @@
 import org.jboss.cache.util.concurrent.ConcurrentHashSet;
 import org.jboss.util.stream.MarshalledValueInputStream;
 import org.jboss.util.stream.MarshalledValueOutputStream;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.ChannelClosedException;
-import org.jgroups.ChannelException;
-import org.jgroups.ChannelFactory;
-import org.jgroups.ChannelNotConnectedException;
-import org.jgroups.ExtendedMembershipListener;
-import org.jgroups.ExtendedMessageListener;
-import org.jgroups.JChannel;
-import org.jgroups.Message;
-import org.jgroups.StateTransferException;
-import org.jgroups.View;
+import org.jgroups.*;
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.RpcDispatcher;
 import org.jgroups.blocks.RspFilter;
@@ -77,16 +66,7 @@
 import java.io.NotSerializableException;
 import java.io.OutputStream;
 import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Vector;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -4028,7 +4008,7 @@
       }
       catch (Throwable t)
       {
-         throw new RuntimeException(t);
+         throw new CacheException(t);
       }
       finally
       {

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java	2007-11-03 02:27:34 UTC (rev 4726)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java	2007-11-05 16:25:48 UTC (rev 4727)
@@ -23,6 +23,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.config.Configuration;
@@ -180,6 +181,22 @@
       }
    }
 
+   /**
+    * Tests whether the caller is in a valid transaction.  If not, will throw a CacheException.
+    */
+   protected void assertTransactionValid(InvocationContext ctx)
+   {
+      Transaction tx = ctx.getTransaction();
+      if (!isValid(tx)) try
+      {
+         throw new CacheException("Invalid transaction " + tx + ", status = " + (tx == null ? null : tx.getStatus()));
+      }
+      catch (SystemException e)
+      {
+         throw new CacheException("Exception trying to analyse status of transaction " + tx, e);
+      }
+   }
+
    public String toString()
    {
       return getClass().getName()

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java	2007-11-03 02:27:34 UTC (rev 4726)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java	2007-11-05 16:25:48 UTC (rev 4727)
@@ -6,6 +6,7 @@
  */
 package org.jboss.cache.interceptors;
 
+import org.jboss.cache.CacheException;
 import org.jboss.cache.CacheImpl;
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
@@ -271,6 +272,12 @@
       Object child_name;
       Thread currentThread = Thread.currentThread();
       GlobalTransaction gtx = ctx.getGlobalTransaction();
+      // if the tx associated with the current thread is rolling back, barf! JBCACHE-923
+      if (gtx != null)
+      {
+         assertTransactionValid(ctx);
+      }
+
       Object owner = (gtx != null) ? gtx : currentThread;
       int treeNodeSize;
 
@@ -428,7 +435,16 @@
       {
          // add the lock to the list of locks maintained for this transaction
          // (needed for release of locks on commit or rollback)
-         cache.getTransactionTable().addLock(gtx, lock);
+         try
+         {
+            cache.getTransactionTable().addLock(gtx, lock);
+         }
+         catch (CacheException e)
+         {
+            // may happen, if the transaction entry does not exist
+            lock.release(gtx);
+            throw e;
+         }
       }
       else
       {
@@ -518,9 +534,13 @@
 
    private void cleanup(GlobalTransaction gtx)
    {
+      if (log.isTraceEnabled()) log.trace("Cleaning up locks for gtx " + gtx);
       TransactionEntry entry = tx_table.get(gtx);
       // Let's do it in stack style, LIFO
-      entry.releaseAllLocksLIFO(gtx);
+      if (entry != null)
+         entry.releaseAllLocksLIFO(gtx);
+      else
+         log.error("No transaction entry present!!", new Throwable());
 
 /*
       Transaction ltx = entry.getTransaction();

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2007-11-03 02:27:34 UTC (rev 4726)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2007-11-05 16:25:48 UTC (rev 4727)
@@ -7,7 +7,6 @@
 package org.jboss.cache.interceptors;
 
 import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.ReplicationException;
 import org.jboss.cache.config.Configuration;
@@ -243,7 +242,7 @@
 
          setTransactionalContext(ltx, gtx, ctx);
          // register a sync handler for this tx.
-         registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, cache), ctx);
+         registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx);
 
          if (configuration.isNodeLockingOptimistic())
          {
@@ -895,7 +894,7 @@
                log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
             }
             // see the comment in the LocalSyncHandler for the last isOriginLocal param.
-            LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, cache, !ctx.isOriginLocal());
+            LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, !ctx.isOriginLocal());
             registerHandler(tx, myHandler, ctx);
          }
       }
@@ -998,17 +997,16 @@
    {
       Transaction tx = null;
       GlobalTransaction gtx = null;
-      CacheSPI cache = null;
+//      CacheSPI cache = null;
       List modifications = null;
       TransactionEntry entry = null;
       protected InvocationContext ctx; // the context for this call.
 
 
-      RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache)
+      RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx)
       {
          this.gtx = gtx;
          this.tx = tx;
-         this.cache = cache;
       }
 
       public void beforeCompletion()
@@ -1057,7 +1055,7 @@
                modifications = entry.getModifications();
                ctx.setOptionOverrides(entry.getOption());
             }
-            transactions.remove(tx);
+            if (tx != null) transactions.remove(tx);
 
             switch (status)
             {
@@ -1101,7 +1099,6 @@
       {
          this.tx = null;
          this.gtx = null;
-         this.cache = null;
          this.modifications = null;
          this.entry = null;
       }
@@ -1141,12 +1138,11 @@
        *
        * @param gtx
        * @param tx
-       * @param cache
        * @param remoteLocal
        */
-      LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache, boolean remoteLocal)
+      LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, boolean remoteLocal)
       {
-         super(gtx, tx, cache);
+         super(gtx, tx);
          this.remoteLocal = remoteLocal;
       }
 
@@ -1188,7 +1184,7 @@
                   }
                   break;
                default:
-                  throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unbale to start transaction");
+                  throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unable to start transaction");
             }
          }
          catch (Throwable t)
@@ -1221,6 +1217,8 @@
          if (ctx == null) ctx = cache.getInvocationContext();
          ctx.setLocalRollbackOnly(localRollbackOnly);
          ctx.setOptionOverrides(transactionalOptions);
+         ctx.setTransaction(tx);
+         ctx.setGlobalTransaction(gtx);
          try
          {
             super.afterCompletion(status);

Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java	2007-11-03 02:27:34 UTC (rev 4726)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java	2007-11-05 16:25:48 UTC (rev 4727)
@@ -8,6 +8,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
 import org.jboss.cache.lock.NodeLock;
 import org.jboss.cache.marshall.MethodCall;
 
@@ -210,8 +211,7 @@
       TransactionEntry entry = get(gtx);
       if (entry == null)
       {
-         log.error("transaction entry not found for (gtx=" + gtx + ")");
-         return;
+         throw new CacheException("Unable to record lock for transaction " + gtx + " since no transaction entry exists!");
       }
       entry.addLock(l);
    }

Modified: core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java	2007-11-03 02:27:34 UTC (rev 4726)
+++ core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java	2007-11-05 16:25:48 UTC (rev 4727)
@@ -416,4 +416,29 @@
          }
       }
    }
+
+   /**
+    * Clears any transactions associated with the current thread in the caches' transaction managers.
+    */
+   public static void killTransactions(Cache... caches)
+   {
+      for (Cache c: caches)
+      {
+         if (c != null && c.getCacheStatus() == CacheStatus.STARTED)
+         {
+            CacheImpl ci = (CacheImpl) c;
+            if (ci.getTransactionManager() != null)
+            {
+               try
+               {
+                  ci.getTransactionManager().rollback();
+               }
+               catch (Exception e)
+               {
+                  // don't care
+               }
+            }
+         }
+      }
+   }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/transaction/SimultaneousRollbackAndPutTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/SimultaneousRollbackAndPutTest.java	2007-11-03 02:27:34 UTC (rev 4726)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/SimultaneousRollbackAndPutTest.java	2007-11-05 16:25:48 UTC (rev 4727)
@@ -1,9 +1,14 @@
 package org.jboss.cache.transaction;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.Cache;
+import org.jboss.cache.CacheException;
 import org.jboss.cache.CacheImpl;
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.misc.TestingUtil;
+import org.jboss.cache.util.CachePrinter;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeTest;
@@ -20,12 +25,13 @@
  *
  * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
  */
-@Test(groups = {"functional", "transaction"}, enabled = false) // Known issue - disabled because of JBCACHE-923
+@Test(groups = {"functional", "transaction"}, enabled = true) // Known issue - disabled because of JBCACHE-923
 public class SimultaneousRollbackAndPutTest
 {
    private Cache cache;
    private TransactionManager tm;
    private Fqn A = Fqn.fromString("/a"), B = Fqn.fromString("/b");
+   private Log log = LogFactory.getLog(SimultaneousRollbackAndPutTest.class);
 
    @BeforeTest(alwaysRun = true)
    protected void setUp() throws Exception
@@ -40,18 +46,30 @@
    @AfterTest(alwaysRun = true)
    protected void tearDown()
    {
-      cache.stop();
+      TestingUtil.killCaches(cache);
    }
 
    @AfterMethod(alwaysRun = true)
-   protected void resetCache()
+   protected void resetCache() throws Exception
    {
-      cache.removeNode(B);
-      cache.getRoot().getChild(A).clearData();
-      cache.put(A, "k", "v");
+      try
+      {
+         cache.removeNode(B);
+         cache.getRoot().getChild(A).clearData();
+         cache.put(A, "k", "v");
+         // make sure we clean up any txs associated with the thread
+         TestingUtil.killTransactions(cache);
+      }
+      catch (Exception e)
+      {
+         // restart the cache
+         tearDown();
+         setUp();
+      }
+
    }
 
-   @Test(invocationCount = 200, alwaysRun = false, enabled = false)
+   @Test(invocationCount = 100, alwaysRun = false, enabled = false, description = "This is to do with a flaw in the way pessimistic locking deals with transactions.  See JBCACHE-923")
    public void testStaleLocks() throws Exception
    {
       // scenario:
@@ -63,7 +81,7 @@
       cache.put(B, "k", "v");
 
       // now the container should attempt to rollback the tx in a separate thread.
-      new Thread()
+      Thread rollbackThread = new Thread("RollbackThread")
       {
          public void run()
          {
@@ -76,19 +94,34 @@
                exceptions.add(e);
             }
          }
-      }.start();
+      };
+      rollbackThread.start();
 
-      // now try and put stuff in the main thread again
-      cache.put(A, "k2", "v2");
       try
       {
+         // now try and put stuff in the main thread again
+         cache.put(A, "k2", "v2");         
          tm.commit();
+//         assert false : "Should never reach here";
       }
       catch (RollbackException expected)
       {
          // this is expected.
       }
+      catch (CacheException ce)
+      {
+         // also expected at times
+      }
 
+      rollbackThread.join();
+
+      if (((CacheImpl) cache).getNumberOfLocksHeld() > 0)
+      {
+         log.fatal("***********");
+         log.fatal(CachePrinter.printCacheLockingInfo(cache));
+         log.fatal("***********");
+      }
+
       assert 0 == ((CacheImpl) cache).getNumberOfLocksHeld();
 
       if (exceptions.size() > 0) throw ((Exception) exceptions.get(0));




More information about the jbosscache-commits mailing list