[jbosscache-commits] JBoss Cache SVN: r8550 - core/trunk/src/main/java/org/jboss/cache/interceptors.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Sun Aug 5 13:23:57 EDT 2012


Author: dereed
Date: 2012-08-05 13:23:57 -0400 (Sun, 05 Aug 2012)
New Revision: 8550

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
Log:
[JBCACHE-1621] Handle RollbackCommand processed before its matching PrepareCommand.


Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2012-08-05 17:20:57 UTC (rev 8549)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2012-08-05 17:23:57 UTC (rev 8550)
@@ -23,6 +23,7 @@
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -94,6 +95,19 @@
    private final Set<Transaction> transactions = new ConcurrentHashSet<Transaction>();
    private final Map<Transaction, GlobalTransaction> rollbackTransactions = new ConcurrentHashMap<Transaction, GlobalTransaction>(16);
 
+   // A remote PrepareCommand "locks" the transaction by adding gtx to the set
+   // so a remote RollbackCommand for the same transaction will not be processed simultaneously.
+   // When it removes the lock, it also calls notifyAll to wake up any waiting rollbacks
+   // Not used for onePhaseCommit (REPL_ASYNC)
+   //
+   // Guarded by synchronized(prepareLockTable)
+   private final Set<GlobalTransaction> prepareLockTable = new HashSet<GlobalTransaction>();
+
+   // remote transactions where a RollbackCommand has no matching PrepareCommand yet
+   //
+   // Guarded by synchronized(prepareLockTable)
+   private final Set<GlobalTransaction> unmatchedRollbacks = new HashSet<GlobalTransaction>();
+
    private long prepares = 0;
    private long commits = 0;
    private long rollbacks = 0;
@@ -215,12 +229,27 @@
       try
       {
          if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
-         Transaction ltx = txTable.getLocalTransaction(gtx);
-         if (ltx == null)
-         {
-            log.warn("No local transaction for this remotely originating rollback.  Possibly rolling back before a prepare call was broadcast?");
-            txTable.remove(gtx);
-            return null;
+
+         Transaction ltx;
+
+         synchronized(prepareLockTable) {
+            // wait on any PrepareCommand currently running on the same GlobalTransaction
+            while (prepareLockTable.contains(gtx))
+            {
+               prepareLockTable.wait();
+            }
+
+            ltx = txTable.getLocalTransaction(gtx);
+            if (ltx == null)
+            {
+               log.warn("No local transaction for this remotely originating rollback.  Possibly rolling back before a prepare call was broadcast?");
+               txTable.remove(gtx);
+
+               // keep track of unmatched rollbacks
+               unmatchedRollbacks.add(gtx);
+
+               return null;
+            }
          }
          // disconnect if we have a current tx associated
          Transaction currentTx = txManager.getTransaction();
@@ -326,7 +355,21 @@
       Transaction ltx = txTable.getLocalTransaction(gtx);
       Transaction currentTx = txManager.getTransaction();
 
-      Object retval = null;
+      if (!command.isOnePhaseCommit())
+      {
+         synchronized(prepareLockTable) {
+            // If a rollback has already been processed for this gtx, do nothing
+            if (unmatchedRollbacks.contains(gtx))
+            {
+               unmatchedRollbacks.remove(gtx);
+               return null;
+            }
+
+            // register "lock" on transaction
+            prepareLockTable.add(gtx);
+         }
+      }
+
       boolean success = false;
       try
       {
@@ -448,14 +491,25 @@
                transactions.remove(ltx);// JBAS-298
             }
          }
+         else
+         {
+            synchronized(prepareLockTable)
+            {
+               // release lock
+               prepareLockTable.remove(gtx);
 
+               // wake up any waiting Rollbacks
+               prepareLockTable.notifyAll();
+            }
+         }
+
          txManager.suspend();// suspends ltx - could be null
          // resume whatever else we had going.
          if (currentTx != null) txManager.resume(currentTx);
          if (trace) log.trace("Finished remote prepare " + gtx);
       }
 
-      return retval;
+      return null;
    }
 
    private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
@@ -1246,6 +1300,12 @@
       return rollbacks;
    }
 
+   @ManagedAttribute(description = "number of unmatched rollbacks")
+   public long getUnmatchedRollbacks()
+   {
+      return unmatchedRollbacks.size();
+   }
+
    @ManagedAttribute(name = "numberOfSyncsRegistered", writable = false, description = "number of transaction synchronizations currently registered")
    public int getNumberOfSyncsRegistered()
    {



More information about the jbosscache-commits mailing list