Author: manik.surtani(a)jboss.com
Date: 2008-03-07 12:05:23 -0500 (Fri, 07 Mar 2008)
New Revision: 5400
Modified:
core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OrderedSynchronizationHandler.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/transaction/OptimisticTransactionEntry.java
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
core/trunk/src/test/java/org/jboss/cache/transaction/AbortionTest.java
core/trunk/src/test/java/org/jboss/cache/transaction/InvocationContextCleanupTest.java
core/trunk/src/test/java/org/jboss/cache/transaction/NotifyingTransactionManager.java
Log:
JBCACHE-1304 - OrderedSynchronizationHandler thread safety and scoping issues
Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2008-03-07 15:48:54 UTC (rev
5399)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2008-03-07 17:05:23 UTC (rev
5400)
@@ -2115,8 +2115,16 @@
Address addr = rpcManager.getLocalAddress();
gtx = GlobalTransaction.create(addr);
transactionTable.put(tx, gtx);
- TransactionEntry ent = configuration.isNodeLockingOptimistic() ? new
OptimisticTransactionEntry() : new TransactionEntry();
- ent.setTransaction(tx);
+ TransactionEntry ent = null;
+ try
+ {
+ ent = configuration.isNodeLockingOptimistic() ? new
OptimisticTransactionEntry(tx) : new TransactionEntry(tx);
+ }
+ catch (Exception e)
+ {
+ throw new CacheException("Unable to create a transaction entry!",
e);
+ }
+
transactionTable.put(gtx, ent);
if (trace)
{
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OrderedSynchronizationHandler.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OrderedSynchronizationHandler.java 2008-03-07
15:48:54 UTC (rev 5399)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OrderedSynchronizationHandler.java 2008-03-07
17:05:23 UTC (rev 5400)
@@ -7,9 +7,7 @@
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
-import java.util.HashMap;
import java.util.LinkedList;
-import java.util.Map;
/**
* Maintains a list of Synchronization handlers. Reason is that we have to
@@ -26,36 +24,15 @@
private Transaction tx = null;
private LinkedList<Synchronization> handlers = new
LinkedList<Synchronization>();
- /**
- * Map<Transaction,OrderedSynchronizationHandler>
- */
- static Map instances = new HashMap();
-
static Log log = LogFactory.getLog(OrderedSynchronizationHandler.class);
- private OrderedSynchronizationHandler(Transaction tx)
+ public OrderedSynchronizationHandler(Transaction tx) throws SystemException,
RollbackException
{
this.tx = tx;
+ tx.registerSynchronization(this);
}
- /**
- * Creates a new instance of OrderedSynchronizationHandler, or fetches an existing
instance. Key is the local
- * transaction (tx). This instance registers with the TransactionManager
automatically
- *
- * @param tx
- */
- public static OrderedSynchronizationHandler getInstance(Transaction tx) throws
SystemException, RollbackException
- {
- OrderedSynchronizationHandler retval = (OrderedSynchronizationHandler)
instances.get(tx);
- if (retval != null) return retval;
- retval = new OrderedSynchronizationHandler(tx);
- tx.registerSynchronization(retval);
- instances.put(tx, retval);
- return retval;
- }
-
-
public void registerAtHead(Synchronization handler)
{
register(handler, true);
@@ -101,9 +78,6 @@
}
}
- // finally unregister us from the hashmap
- instances.remove(tx);
-
// throw the exception so the TM can deal with it.
if (exceptionInAfterCompletion != null) throw exceptionInAfterCompletion;
}
@@ -114,13 +88,13 @@
sb.append("tx=" + getTxAsString() + ", handlers=" + handlers);
return sb.toString();
}
-
+
private String getTxAsString()
{
// JBCACHE-1114 -- don't call toString() on tx or it can lead to stack
overflow
if (tx == null)
return null;
-
+
return tx.getClass().getName() + "@" + System.identityHashCode(tx);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-03-07
15:48:54 UTC (rev 5399)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-03-07
17:05:23 UTC (rev 5400)
@@ -383,20 +383,24 @@
// entry for TX in txTable, the modifications
// below will need this entry to add their modifications
// under the GlobalTx key
+ TransactionEntry entry;
if (txTable.get(gtx) == null)
{
// create a new transaction entry
- TransactionEntry entry = configuration.isNodeLockingOptimistic() ? new
OptimisticTransactionEntry() : new TransactionEntry();
- entry.setTransaction(ltx);
+ entry = configuration.isNodeLockingOptimistic() ? new
OptimisticTransactionEntry(ltx) : new TransactionEntry(ltx);
log.debug("creating new tx entry");
txTable.put(gtx, entry);
if (trace) log.trace("TxTable contents: " + txTable);
}
+ else
+ {
+ entry = txTable.get(gtx);
+ }
setTransactionalContext(ltx, gtx, ctx);
// register a sync handler for this tx.
- registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx);
+ registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx, entry);
if (configuration.isNodeLockingOptimistic())
{
@@ -1054,7 +1058,7 @@
}
// see the comment in the LocalSyncHandler for the last isOriginLocal param.
LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx,
tx, !ctx.isOriginLocal());
- registerHandler(tx, myHandler, ctx);
+ registerHandler(tx, myHandler, ctx, txTable.get(gtx));
}
}
else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null)
@@ -1076,9 +1080,9 @@
* @param handler
* @throws Exception
*/
- private void registerHandler(Transaction tx, Synchronization handler,
InvocationContext ctx) throws Exception
+ private void registerHandler(Transaction tx, Synchronization handler,
InvocationContext ctx, TransactionEntry entry) throws Exception
{
- OrderedSynchronizationHandler orderedHandler =
OrderedSynchronizationHandler.getInstance(tx);
+ OrderedSynchronizationHandler orderedHandler =
entry.getOrderedSynchronizationHandler();
//OrderedSynchronizationHandler.getInstance(tx);
if (trace) log.trace("registering for TX completion:
SynchronizationHandler(" + handler + ")");
Modified:
core/trunk/src/main/java/org/jboss/cache/transaction/OptimisticTransactionEntry.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/transaction/OptimisticTransactionEntry.java 2008-03-07
15:48:54 UTC (rev 5399)
+++
core/trunk/src/main/java/org/jboss/cache/transaction/OptimisticTransactionEntry.java 2008-03-07
17:05:23 UTC (rev 5400)
@@ -9,6 +9,10 @@
import org.jboss.cache.optimistic.TransactionWorkspace;
import org.jboss.cache.optimistic.TransactionWorkspaceImpl;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
/**
* Subclasses the {@link TransactionEntry} class to add a {@link TransactionWorkspace}.
Used with optimistic locking
* where each call is assigned a trasnaction and a transaction workspace.
@@ -21,6 +25,11 @@
{
private TransactionWorkspace transactionWorkSpace = new TransactionWorkspaceImpl();
+ public OptimisticTransactionEntry(Transaction tx) throws SystemException,
RollbackException
+ {
+ super(tx);
+ }
+
public String toString()
{
StringBuffer sb = new StringBuffer(super.toString());
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java 2008-03-07
15:48:54 UTC (rev 5399)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java 2008-03-07
17:05:23 UTC (rev 5400)
@@ -15,11 +15,14 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.Modification;
import org.jboss.cache.config.Option;
+import org.jboss.cache.interceptors.OrderedSynchronizationHandler;
import org.jboss.cache.invocation.CacheInvocationDelegate;
import org.jboss.cache.lock.IdentityLock;
import org.jboss.cache.lock.NodeLock;
import org.jboss.cache.marshall.MethodCall;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
import javax.transaction.Transaction;
import java.util.ArrayList;
import java.util.Collection;
@@ -57,6 +60,7 @@
*/
private Transaction ltx = null;
private Option option;
+ private OrderedSynchronizationHandler orderedSynchronizationHandler;
private boolean forceAsyncReplication = false;
private boolean forceSyncReplication = false;
@@ -97,6 +101,12 @@
*/
private List<Fqn> removedNodes = new LinkedList<Fqn>();
+ public TransactionEntry(Transaction tx) throws SystemException, RollbackException
+ {
+ ltx = tx;
+ orderedSynchronizationHandler = new OrderedSynchronizationHandler(tx);
+ }
+
/**
* Adds a modification to the modification list.
*/
@@ -412,4 +422,14 @@
{
return this.option;
}
+
+ public OrderedSynchronizationHandler getOrderedSynchronizationHandler()
+ {
+ return orderedSynchronizationHandler;
+ }
+
+ public void setOrderedSynchronizationHandler(OrderedSynchronizationHandler
orderedSynchronizationHandler)
+ {
+ this.orderedSynchronizationHandler = orderedSynchronizationHandler;
+ }
}
Modified: core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java 2008-03-07
15:48:54 UTC (rev 5399)
+++ core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java 2008-03-07
17:05:23 UTC (rev 5400)
@@ -1,8 +1,8 @@
package org.jboss.cache.loader;
import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
-import org.jboss.cache.CacheSPI;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.interceptors.OrderedSynchronizationHandler;
@@ -11,6 +11,7 @@
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.NodeCreated;
import org.jboss.cache.notifications.event.Event;
+import org.jboss.cache.transaction.GlobalTransaction;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -184,7 +185,15 @@
{
int oldStartCount = START_COUNT;
cache.getTransactionManager().begin();
-
OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
+
+ cache.put(FQN, "key", "value");
+ cache.put(FQN, "key2", "value2");
+
+ GlobalTransaction gtx =
cache.getTransactionTable().get(cache.getTransactionManager().getTransaction());
+ OrderedSynchronizationHandler osh =
cache.getTransactionTable().get(gtx).getOrderedSynchronizationHandler();
+
+//
OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
+ osh.registerAtTail(
new Synchronization()
{
@@ -201,8 +210,6 @@
}
);
- cache.put(FQN, "key", "value");
- cache.put(FQN, "key2", "value2");
cache.getTransactionManager().commit();
Map m = new HashMap();
Modified: core/trunk/src/test/java/org/jboss/cache/transaction/AbortionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/AbortionTest.java 2008-03-07
15:48:54 UTC (rev 5399)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/AbortionTest.java 2008-03-07
17:05:23 UTC (rev 5400)
@@ -34,25 +34,16 @@
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception
{
- System.out.println("********* START: SET UP *************");
cache1 = initCache(false);
TestingUtil.sleepThread(1500); // to ensure cache1 is the coordinator
cache2 = initCache(false);
cache3 = initCache(true);
- System.out.println("********* END: SET UP *************");
}
@AfterMethod(alwaysRun = true)
public void tearDown() throws Exception
{
- System.out.println("********* START: TEAR DOWN *************");
- destroyCache(cache3);
- destroyCache(cache2);
- destroyCache(cache1);
- cache1 = null;
- cache2 = null;
- cache3 = null;
- System.out.println("********* END: TEAR DOWN *************");
+ TestingUtil.killCaches(cache3, cache2, cache1);
}
private CacheSPI initCache(boolean notifying)
@@ -121,6 +112,7 @@
TransactionManager mgr2 = cache2.getTransactionManager();
assertTrue(cache3.getTransactionManager() instanceof NotifyingTransactionManager);
NotifyingTransactionManager mgr3 = (NotifyingTransactionManager)
cache3.getTransactionManager();
+ mgr3.setCache(cache3);
assertSame(mgr1, mgr2);
assertNotSame(mgr1, mgr3);
@@ -160,8 +152,10 @@
this.abortBeforeCompletion = abortBeforeCompletion;
}
- public void notify(Transaction tx) throws SystemException, RollbackException
+ public void notify(Transaction tx, TransactionEntry entry) throws SystemException,
RollbackException
{
+ OrderedSynchronizationHandler osh = entry.getOrderedSynchronizationHandler();
+
final Transaction finalTx = tx;
System.out.println("Notify called.");
// add an aborting sync handler.
@@ -197,7 +191,6 @@
}
};
- OrderedSynchronizationHandler osh =
OrderedSynchronizationHandler.getInstance(tx);
osh.registerAtHead(abort);
System.out.println("Added sync handler.");
}
Modified:
core/trunk/src/test/java/org/jboss/cache/transaction/InvocationContextCleanupTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/transaction/InvocationContextCleanupTest.java 2008-03-07
15:48:54 UTC (rev 5399)
+++
core/trunk/src/test/java/org/jboss/cache/transaction/InvocationContextCleanupTest.java 2008-03-07
17:05:23 UTC (rev 5400)
@@ -81,10 +81,13 @@
mgr.begin();
- OrderedSynchronizationHandler orderedHandler =
OrderedSynchronizationHandler.getInstance(mgr.getTransaction());
+ cache0.put("/test", "x", "y");
+
+ GlobalTransaction gtx = cache0.getTransactionTable().get(mgr.getTransaction());
+ OrderedSynchronizationHandler orderedHandler =
cache0.getTransactionTable().get(gtx).getOrderedSynchronizationHandler();
+// OrderedSynchronizationHandler orderedHandler =
OrderedSynchronizationHandler.getInstance(mgr.getTransaction());
orderedHandler.registerAtTail(new DummySynchronization(cache0, mgr));
- cache0.put("/test", "x", "y");
try
{
mgr.commit();
Modified:
core/trunk/src/test/java/org/jboss/cache/transaction/NotifyingTransactionManager.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/transaction/NotifyingTransactionManager.java 2008-03-07
15:48:54 UTC (rev 5399)
+++
core/trunk/src/test/java/org/jboss/cache/transaction/NotifyingTransactionManager.java 2008-03-07
17:05:23 UTC (rev 5400)
@@ -7,7 +7,10 @@
package org.jboss.cache.transaction;
-import javax.transaction.NotSupportedException;
+import org.jboss.cache.CacheSPI;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
@@ -24,18 +27,35 @@
private static final long serialVersionUID = -2994163352889758708L;
private Notification notification;
+ private CacheSPI cache;
- public void begin() throws SystemException, NotSupportedException
+ @Override
+ public void commit() throws HeuristicMixedException, SystemException,
HeuristicRollbackException, RollbackException
{
- super.begin();
+ notifyListeners();
+ super.commit();
+ }
+
+ @Override
+ public void rollback() throws SystemException
+ {
+ notifyListeners();
+ super.rollback();
+ }
+
+ private void notifyListeners()
+ {
try
{
- System.out.println("Calling notification.notify()");
- notification.notify(getTransaction());
+ log.debug("Calling notification.notify()");
+ TransactionTable txTable = cache.getTransactionTable();
+ Transaction tx = getTransaction();
+ GlobalTransaction gtx = txTable.get(tx);
+ notification.notify(tx, txTable.get(gtx));
}
- catch (RollbackException e)
+ catch (Exception e)
{
- e.printStackTrace();
+ log.debug(e);
}
}
@@ -46,15 +66,23 @@
public interface Notification
{
- public void notify(Transaction tx) throws SystemException, RollbackException;
+ public void notify(Transaction tx, TransactionEntry entry) throws SystemException,
RollbackException;
}
-
+ public CacheSPI getCache()
+ {
+ return cache;
+ }
+
+ public void setCache(CacheSPI cache)
+ {
+ this.cache = cache;
+ }
+
public Notification getNotification()
{
return notification;
}
-
public void setNotification(Notification notification)
{