[jbosscache-commits] JBoss Cache SVN: r8553 - in core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache: transaction and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Sun Aug 5 13:46:38 EDT 2012


Author: dereed
Date: 2012-08-05 13:46:37 -0400 (Sun, 05 Aug 2012)
New Revision: 8553

Modified:
   core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
   core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/transaction/GlobalTransaction.java
Log:
[JBPAPP-9615] Backport JBCACHE-1621


Modified: core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2012-08-05 17:44:53 UTC (rev 8552)
+++ core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2012-08-05 17:46:37 UTC (rev 8553)
@@ -23,6 +23,7 @@
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -94,6 +95,19 @@
    private final Set<Transaction> transactions = new ConcurrentHashSet<Transaction>();
    private final Map<Transaction, GlobalTransaction> rollbackTransactions = new ConcurrentHashMap<Transaction, GlobalTransaction>(16);
 
+   // A remote PrepareCommand "locks" the transaction by adding gtx to the set
+   // so a remote RollbackCommand for the same transaction will not be processed simultaneously.
+   // When it removes the lock, it also calls notifyAll to wake up any waiting rollbacks
+   // Not used for onePhaseCommit (REPL_ASYNC)
+   //
+   // Guarded by synchronized(prepareLockTable)
+   private final Set<GlobalTransaction> prepareLockTable = new HashSet<GlobalTransaction>();
+
+   // remote transactions where a RollbackCommand has no matching PrepareCommand yet
+   //
+   // Guarded by synchronized(prepareLockTable)
+   private final Set<GlobalTransaction> unmatchedRollbacks = new HashSet<GlobalTransaction>();
+
    private long prepares = 0;
    private long commits = 0;
    private long rollbacks = 0;
@@ -215,12 +229,27 @@
       try
       {
          if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
-         Transaction ltx = txTable.getLocalTransaction(gtx);
-         if (ltx == null)
-         {
-            log.warn("No local transaction for this remotely originating rollback.  Possibly rolling back before a prepare call was broadcast?");
-            txTable.remove(gtx);
-            return null;
+
+         Transaction ltx;
+
+         synchronized(prepareLockTable) {
+            // wait on any PrepareCommand currently running on the same GlobalTransaction
+            while (prepareLockTable.contains(gtx))
+            {
+               prepareLockTable.wait();
+            }
+
+            ltx = txTable.getLocalTransaction(gtx);
+            if (ltx == null)
+            {
+               log.warn("No local transaction for this remotely originating rollback.  Possibly rolling back before a prepare call was broadcast?");
+               txTable.remove(gtx);
+
+               // keep track of unmatched rollbacks
+               unmatchedRollbacks.add(gtx);
+
+               return null;
+            }
          }
          // disconnect if we have a current tx associated
          Transaction currentTx = txManager.getTransaction();
@@ -326,7 +355,21 @@
       Transaction ltx = txTable.getLocalTransaction(gtx);
       Transaction currentTx = txManager.getTransaction();
 
-      Object retval = null;
+      if (!command.isOnePhaseCommit())
+      {
+         synchronized(prepareLockTable) {
+            // If a rollback has already been processed for this gtx, do nothing
+            if (unmatchedRollbacks.contains(gtx))
+            {
+               unmatchedRollbacks.remove(gtx);
+               return null;
+            }
+
+            // register "lock" on transaction
+            prepareLockTable.add(gtx);
+         }
+      }
+
       boolean success = false;
       try
       {
@@ -448,14 +491,25 @@
                transactions.remove(ltx);// JBAS-298
             }
          }
+         else
+         {
+            synchronized(prepareLockTable)
+            {
+               // release lock
+               prepareLockTable.remove(gtx);
 
+               // wake up any waiting Rollbacks
+               prepareLockTable.notifyAll();
+            }
+         }
+
          txManager.suspend();// suspends ltx - could be null
          // resume whatever else we had going.
          if (currentTx != null) txManager.resume(currentTx);
          if (trace) log.trace("Finished remote prepare " + gtx);
       }
 
-      return retval;
+      return null;
    }
 
    private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
@@ -1246,6 +1300,12 @@
       return rollbacks;
    }
 
+   @ManagedAttribute(description = "number of unmatched rollbacks")
+   public long getUnmatchedRollbacks()
+   {
+      synchronized (prepareLockTable) { return unmatchedRollbacks.size(); } // plain HashSet, guarded by this monitor
+   }
+
    @ManagedAttribute(name = "numberOfSyncsRegistered", writable = false, description = "number of transaction synchronizations currently registered")
    public int getNumberOfSyncsRegistered()
    {

Modified: core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/transaction/GlobalTransaction.java
===================================================================
--- core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/transaction/GlobalTransaction.java	2012-08-05 17:44:53 UTC (rev 8552)
+++ core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/transaction/GlobalTransaction.java	2012-08-05 17:46:37 UTC (rev 8553)
@@ -23,11 +23,14 @@
 
 
 import org.jgroups.Address;
+import org.jgroups.stack.IpAddress;
 
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 
 
@@ -47,6 +50,13 @@
 
    private static AtomicLong sid = new AtomicLong(0);
 
+   // 4 random bytes filled in once by the static initializer; addr + instanceId = Unique ID for the cache even after restarting with the same IP:port
+   private static byte[] instanceId = new byte[4];
+   static
+   {
+      new Random().nextBytes(instanceId);
+   }
+
    private Address addr = null;
    private long id = -1;
    private transient boolean remote = false;
@@ -63,6 +73,18 @@
 
    private GlobalTransaction(Address addr)
    {
+      // store instanceId in IpAddress.additionalData (backwards compatible serialization); other Address types are kept as passed in
+      if(addr != null && addr instanceof IpAddress)
+      {
+         try
+         {
+            addr = (Address)((IpAddress)addr).clone(); // work on a clone so setAdditionalData below is applied to the copy
+            ((IpAddress)addr).setAdditionalData(instanceId);
+         }
+         catch ( CloneNotSupportedException e )
+         { // clone failed: addr still refers to the argument, so no instanceId is attached
+         }
+      }
       this.addr = addr;
       id = sid.getAndIncrement();
    }
@@ -77,6 +99,7 @@
       return addr;
    }
 
+   // only used during externalization
    public void setAddress(Address address)
    {
       addr = address;
@@ -87,12 +110,20 @@
       return id;
    }
 
+   public byte [] getInstanceId ()
+   {
+      if(addr == null || ! (addr instanceof IpAddress))
+         return null;
+      return ((IpAddress)addr).getAdditionalData();
+   }
+
    @Override
    public int hashCode()
    {
       if (hash_code == -1)
       {
-         hash_code = (addr != null ? addr.hashCode() : 0) + (int) id;
+         hash_code = (addr != null ? addr.hashCode() : 0) + (int) id 
+            + Arrays.hashCode(getInstanceId());
       }
       return hash_code;
    }
@@ -107,7 +138,7 @@
 
       GlobalTransaction otherGtx = (GlobalTransaction) other;
       boolean aeq = (addr == null) ? (otherGtx.addr == null) : addr.equals(otherGtx.addr);
-      return aeq && (id == otherGtx.id);
+      return aeq && (id == otherGtx.id) && Arrays.equals(getInstanceId(), otherGtx.getInstanceId());
    }
 
    @Override



More information about the jbosscache-commits mailing list