Author: dereed
Date: 2012-08-05 13:23:57 -0400 (Sun, 05 Aug 2012)
New Revision: 8550
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
Log:
[JBCACHE-1621] Handle RollbackCommand processed before its matching PrepareCommand.
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2012-08-05 17:20:57 UTC (rev 8549)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2012-08-05 17:23:57 UTC (rev 8550)
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -94,6 +95,19 @@
private final Set<Transaction> transactions = new
ConcurrentHashSet<Transaction>();
private final Map<Transaction, GlobalTransaction> rollbackTransactions = new
ConcurrentHashMap<Transaction, GlobalTransaction>(16);
+ // A remote PrepareCommand "locks" the transaction by adding gtx to the set
+ // so a remote RollbackCommand for the same transaction will not be processed simultaneously.
+ // When it removes the lock, it also calls notifyAll to wake up any waiting rollbacks
+ // Not used for onePhaseCommit (REPL_ASYNC)
+ //
+ // Guarded by synchronized(prepareLockTable)
+ private final Set<GlobalTransaction> prepareLockTable = new
HashSet<GlobalTransaction>();
+
+ // remote transactions where a RollbackCommand has no matching PrepareCommand yet
+ //
+ // Guarded by synchronized(prepareLockTable)
+ private final Set<GlobalTransaction> unmatchedRollbacks = new
HashSet<GlobalTransaction>();
+
private long prepares = 0;
private long commits = 0;
private long rollbacks = 0;
@@ -215,12 +229,27 @@
try
{
if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call
on command [" + command + "]");
- Transaction ltx = txTable.getLocalTransaction(gtx);
- if (ltx == null)
- {
- log.warn("No local transaction for this remotely originating rollback.
Possibly rolling back before a prepare call was broadcast?");
- txTable.remove(gtx);
- return null;
+
+ Transaction ltx;
+
+ synchronized(prepareLockTable) {
+ // wait on any PrepareCommand currently running on the same GlobalTransaction
+ while (prepareLockTable.contains(gtx))
+ {
+ prepareLockTable.wait();
+ }
+
+ ltx = txTable.getLocalTransaction(gtx);
+ if (ltx == null)
+ {
+ log.warn("No local transaction for this remotely originating
rollback. Possibly rolling back before a prepare call was broadcast?");
+ txTable.remove(gtx);
+
+ // keep track of unmatched rollbacks
+ unmatchedRollbacks.add(gtx);
+
+ return null;
+ }
}
// disconnect if we have a current tx associated
Transaction currentTx = txManager.getTransaction();
@@ -326,7 +355,21 @@
Transaction ltx = txTable.getLocalTransaction(gtx);
Transaction currentTx = txManager.getTransaction();
- Object retval = null;
+ if (!command.isOnePhaseCommit())
+ {
+ synchronized(prepareLockTable) {
+ // If a rollback has already been processed for this gtx, do nothing
+ if (unmatchedRollbacks.contains(gtx))
+ {
+ unmatchedRollbacks.remove(gtx);
+ return null;
+ }
+
+ // register "lock" on transaction
+ prepareLockTable.add(gtx);
+ }
+ }
+
boolean success = false;
try
{
@@ -448,14 +491,25 @@
transactions.remove(ltx);// JBAS-298
}
}
+ else
+ {
+ synchronized(prepareLockTable)
+ {
+ // release lock
+ prepareLockTable.remove(gtx);
+ // wake up any waiting Rollbacks
+ prepareLockTable.notifyAll();
+ }
+ }
+
txManager.suspend();// suspends ltx - could be null
// resume whatever else we had going.
if (currentTx != null) txManager.resume(currentTx);
if (trace) log.trace("Finished remote prepare " + gtx);
}
- return retval;
+ return null;
}
private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction
tx, VisitableCommand command) throws Throwable
@@ -1246,6 +1300,12 @@
return rollbacks;
}
+ @ManagedAttribute(description = "number of unmatched rollbacks")
+ public long getUnmatchedRollbacks()
+ {
+ return unmatchedRollbacks.size();
+ }
+
@ManagedAttribute(name = "numberOfSyncsRegistered", writable = false,
description = "number of transaction synchronizations currently registered")
public int getNumberOfSyncsRegistered()
{