Author: manik.surtani(a)jboss.com
Date: 2007-11-05 11:25:48 -0500 (Mon, 05 Nov 2007)
New Revision: 4727
Modified:
core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java
core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
core/trunk/src/test/java/org/jboss/cache/transaction/SimultaneousRollbackAndPutTest.java
Log:
JBCACHE-923 - reduces the likelihood of the race conditions described in that issue.
Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-11-03 02:27:34 UTC (rev
4726)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-11-05 16:25:48 UTC (rev
4727)
@@ -48,18 +48,7 @@
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import org.jboss.util.stream.MarshalledValueInputStream;
import org.jboss.util.stream.MarshalledValueOutputStream;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.ChannelClosedException;
-import org.jgroups.ChannelException;
-import org.jgroups.ChannelFactory;
-import org.jgroups.ChannelNotConnectedException;
-import org.jgroups.ExtendedMembershipListener;
-import org.jgroups.ExtendedMessageListener;
-import org.jgroups.JChannel;
-import org.jgroups.Message;
-import org.jgroups.StateTransferException;
-import org.jgroups.View;
+import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RspFilter;
@@ -77,16 +66,7 @@
import java.io.NotSerializableException;
import java.io.OutputStream;
import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Vector;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -4028,7 +4008,7 @@
}
catch (Throwable t)
{
- throw new RuntimeException(t);
+ throw new CacheException(t);
}
finally
{
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java 2007-11-03
02:27:34 UTC (rev 4726)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java 2007-11-05
16:25:48 UTC (rev 4727)
@@ -23,6 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.config.Configuration;
@@ -180,6 +181,22 @@
}
}
+ /**
+ * Tests whether the caller is in a valid transaction. If not, will throw a
CacheException.
+ */
+ protected void assertTransactionValid(InvocationContext ctx)
+ {
+ Transaction tx = ctx.getTransaction();
+ if (!isValid(tx)) try
+ {
+ throw new CacheException("Invalid transaction " + tx + ", status
= " + (tx == null ? null : tx.getStatus()));
+ }
+ catch (SystemException e)
+ {
+ throw new CacheException("Exception trying to analyse status of transaction
" + tx, e);
+ }
+ }
+
public String toString()
{
return getClass().getName()
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java 2007-11-03
02:27:34 UTC (rev 4726)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java 2007-11-05
16:25:48 UTC (rev 4727)
@@ -6,6 +6,7 @@
*/
package org.jboss.cache.interceptors;
+import org.jboss.cache.CacheException;
import org.jboss.cache.CacheImpl;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
@@ -271,6 +272,12 @@
Object child_name;
Thread currentThread = Thread.currentThread();
GlobalTransaction gtx = ctx.getGlobalTransaction();
+ // if the tx associated with the current thread is rolling back, barf! JBCACHE-923
+ if (gtx != null)
+ {
+ assertTransactionValid(ctx);
+ }
+
Object owner = (gtx != null) ? gtx : currentThread;
int treeNodeSize;
@@ -428,7 +435,16 @@
{
// add the lock to the list of locks maintained for this transaction
// (needed for release of locks on commit or rollback)
- cache.getTransactionTable().addLock(gtx, lock);
+ try
+ {
+ cache.getTransactionTable().addLock(gtx, lock);
+ }
+ catch (CacheException e)
+ {
+ // may happen, if the transaction entry does not exist
+ lock.release(gtx);
+ throw e;
+ }
}
else
{
@@ -518,9 +534,13 @@
private void cleanup(GlobalTransaction gtx)
{
+ if (log.isTraceEnabled()) log.trace("Cleaning up locks for gtx " + gtx);
TransactionEntry entry = tx_table.get(gtx);
// Let's do it in stack style, LIFO
- entry.releaseAllLocksLIFO(gtx);
+ if (entry != null)
+ entry.releaseAllLocksLIFO(gtx);
+ else
+ log.error("No transaction entry present!!", new Throwable());
/*
Transaction ltx = entry.getTransaction();
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2007-11-03
02:27:34 UTC (rev 4726)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2007-11-05
16:25:48 UTC (rev 4727)
@@ -7,7 +7,6 @@
package org.jboss.cache.interceptors;
import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.ReplicationException;
import org.jboss.cache.config.Configuration;
@@ -243,7 +242,7 @@
setTransactionalContext(ltx, gtx, ctx);
// register a sync handler for this tx.
- registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, cache), ctx);
+ registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx);
if (configuration.isNodeLockingOptimistic())
{
@@ -895,7 +894,7 @@
log.trace("Registering sync handler for tx " + tx + ", gtx
" + gtx);
}
// see the comment in the LocalSyncHandler for the last isOriginLocal param.
- LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx,
tx, cache, !ctx.isOriginLocal());
+ LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx,
tx, !ctx.isOriginLocal());
registerHandler(tx, myHandler, ctx);
}
}
@@ -998,17 +997,16 @@
{
Transaction tx = null;
GlobalTransaction gtx = null;
- CacheSPI cache = null;
+// CacheSPI cache = null;
List modifications = null;
TransactionEntry entry = null;
protected InvocationContext ctx; // the context for this call.
- RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI
cache)
+ RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx)
{
this.gtx = gtx;
this.tx = tx;
- this.cache = cache;
}
public void beforeCompletion()
@@ -1057,7 +1055,7 @@
modifications = entry.getModifications();
ctx.setOptionOverrides(entry.getOption());
}
- transactions.remove(tx);
+ if (tx != null) transactions.remove(tx);
switch (status)
{
@@ -1101,7 +1099,6 @@
{
this.tx = null;
this.gtx = null;
- this.cache = null;
this.modifications = null;
this.entry = null;
}
@@ -1141,12 +1138,11 @@
*
* @param gtx
* @param tx
- * @param cache
* @param remoteLocal
*/
- LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache,
boolean remoteLocal)
+ LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, boolean
remoteLocal)
{
- super(gtx, tx, cache);
+ super(gtx, tx);
this.remoteLocal = remoteLocal;
}
@@ -1188,7 +1184,7 @@
}
break;
default:
- throw new CacheException("transaction " + tx + " in
status " + tx.getStatus() + " unbale to start transaction");
+ throw new CacheException("transaction " + tx + " in
status " + tx.getStatus() + " unable to start transaction");
}
}
catch (Throwable t)
@@ -1221,6 +1217,8 @@
if (ctx == null) ctx = cache.getInvocationContext();
ctx.setLocalRollbackOnly(localRollbackOnly);
ctx.setOptionOverrides(transactionalOptions);
+ ctx.setTransaction(tx);
+ ctx.setGlobalTransaction(gtx);
try
{
super.afterCompletion(status);
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java 2007-11-03
02:27:34 UTC (rev 4726)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java 2007-11-05
16:25:48 UTC (rev 4727)
@@ -8,6 +8,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
import org.jboss.cache.lock.NodeLock;
import org.jboss.cache.marshall.MethodCall;
@@ -210,8 +211,7 @@
TransactionEntry entry = get(gtx);
if (entry == null)
{
- log.error("transaction entry not found for (gtx=" + gtx +
")");
- return;
+ throw new CacheException("Unable to record lock for transaction " +
gtx + " since no transaction entry exists!");
}
entry.addLock(l);
}
Modified: core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2007-11-03 02:27:34 UTC
(rev 4726)
+++ core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2007-11-05 16:25:48 UTC
(rev 4727)
@@ -416,4 +416,29 @@
}
}
}
+
+ /**
+ * Clears any transactions associated with the current thread in the caches'
transaction managers.
+ */
+ public static void killTransactions(Cache... caches)
+ {
+ for (Cache c: caches)
+ {
+ if (c != null && c.getCacheStatus() == CacheStatus.STARTED)
+ {
+ CacheImpl ci = (CacheImpl) c;
+ if (ci.getTransactionManager() != null)
+ {
+ try
+ {
+ ci.getTransactionManager().rollback();
+ }
+ catch (Exception e)
+ {
+ // don't care
+ }
+ }
+ }
+ }
+ }
}
Modified:
core/trunk/src/test/java/org/jboss/cache/transaction/SimultaneousRollbackAndPutTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/transaction/SimultaneousRollbackAndPutTest.java 2007-11-03
02:27:34 UTC (rev 4726)
+++
core/trunk/src/test/java/org/jboss/cache/transaction/SimultaneousRollbackAndPutTest.java 2007-11-05
16:25:48 UTC (rev 4727)
@@ -1,9 +1,14 @@
package org.jboss.cache.transaction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.jboss.cache.Cache;
+import org.jboss.cache.CacheException;
import org.jboss.cache.CacheImpl;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
+import org.jboss.cache.misc.TestingUtil;
+import org.jboss.cache.util.CachePrinter;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
@@ -20,12 +25,13 @@
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
*/
-@Test(groups = {"functional", "transaction"}, enabled = false) //
Known issue - disabled because of JBCACHE-923
+@Test(groups = {"functional", "transaction"}, enabled = true) //
Known issue - disabled because of JBCACHE-923
public class SimultaneousRollbackAndPutTest
{
private Cache cache;
private TransactionManager tm;
private Fqn A = Fqn.fromString("/a"), B = Fqn.fromString("/b");
+ private Log log = LogFactory.getLog(SimultaneousRollbackAndPutTest.class);
@BeforeTest(alwaysRun = true)
protected void setUp() throws Exception
@@ -40,18 +46,30 @@
@AfterTest(alwaysRun = true)
protected void tearDown()
{
- cache.stop();
+ TestingUtil.killCaches(cache);
}
@AfterMethod(alwaysRun = true)
- protected void resetCache()
+ protected void resetCache() throws Exception
{
- cache.removeNode(B);
- cache.getRoot().getChild(A).clearData();
- cache.put(A, "k", "v");
+ try
+ {
+ cache.removeNode(B);
+ cache.getRoot().getChild(A).clearData();
+ cache.put(A, "k", "v");
+ // make sure we clean up any txs associated with the thread
+ TestingUtil.killTransactions(cache);
+ }
+ catch (Exception e)
+ {
+ // restart the cache
+ tearDown();
+ setUp();
+ }
+
}
- @Test(invocationCount = 200, alwaysRun = false, enabled = false)
+ @Test(invocationCount = 100, alwaysRun = false, enabled = false, description =
"This is to do with a flaw in the way pessimistic locking deals with transactions.
See JBCACHE-923")
public void testStaleLocks() throws Exception
{
// scenario:
@@ -63,7 +81,7 @@
cache.put(B, "k", "v");
// now the container should attempt to rollback the tx in a separate thread.
- new Thread()
+ Thread rollbackThread = new Thread("RollbackThread")
{
public void run()
{
@@ -76,19 +94,34 @@
exceptions.add(e);
}
}
- }.start();
+ };
+ rollbackThread.start();
- // now try and put stuff in the main thread again
- cache.put(A, "k2", "v2");
try
{
+ // now try and put stuff in the main thread again
+ cache.put(A, "k2", "v2");
tm.commit();
+// assert false : "Should never reach here";
}
catch (RollbackException expected)
{
// this is expected.
}
+ catch (CacheException ce)
+ {
+ // also expected at times
+ }
+ rollbackThread.join();
+
+ if (((CacheImpl) cache).getNumberOfLocksHeld() > 0)
+ {
+ log.fatal("***********");
+ log.fatal(CachePrinter.printCacheLockingInfo(cache));
+ log.fatal("***********");
+ }
+
assert 0 == ((CacheImpl) cache).getNumberOfLocksHeld();
if (exceptions.size() > 0) throw ((Exception) exceptions.get(0));