Author: dereed
Date: 2012-08-05 13:46:37 -0400 (Sun, 05 Aug 2012)
New Revision: 8553
Modified:
core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/transaction/GlobalTransaction.java
Log:
[JBPAPP-9615] Backport JBCACHE-1621
Modified:
core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
---
core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2012-08-05
17:44:53 UTC (rev 8552)
+++
core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2012-08-05
17:46:37 UTC (rev 8553)
@@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -94,6 +95,19 @@
private final Set<Transaction> transactions = new
ConcurrentHashSet<Transaction>();
private final Map<Transaction, GlobalTransaction> rollbackTransactions = new
ConcurrentHashMap<Transaction, GlobalTransaction>(16);
+ // A remote PrepareCommand "locks" the transaction by adding gtx to the set
+ // so a remote RollbackCommand for the same transaction will not be processed
simultaneously.
+ // When it removes the lock, it also calls notifyAll to wake up any waiting rollbacks
+ // Not used for onePhaseCommit (REPL_ASYNC)
+ //
+ // Guarded by synchronized(prepareLockTable)
+ private final Set<GlobalTransaction> prepareLockTable = new
HashSet<GlobalTransaction>();
+
+ // remote transactions where a RollbackCommand has no matching PrepareCommand yet
+ //
+ // Guarded by synchronized(prepareLockTable)
+ private final Set<GlobalTransaction> unmatchedRollbacks = new
HashSet<GlobalTransaction>();
+
private long prepares = 0;
private long commits = 0;
private long rollbacks = 0;
@@ -215,12 +229,27 @@
try
{
if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call
on command [" + command + "]");
- Transaction ltx = txTable.getLocalTransaction(gtx);
- if (ltx == null)
- {
- log.warn("No local transaction for this remotely originating rollback.
Possibly rolling back before a prepare call was broadcast?");
- txTable.remove(gtx);
- return null;
+
+ Transaction ltx;
+
+ synchronized(prepareLockTable) {
+ // wait on any PrepareCommand currently running on the same
GlobalTransaction
+ while (prepareLockTable.contains(gtx))
+ {
+ prepareLockTable.wait();
+ }
+
+ ltx = txTable.getLocalTransaction(gtx);
+ if (ltx == null)
+ {
+ log.warn("No local transaction for this remotely originating
rollback. Possibly rolling back before a prepare call was broadcast?");
+ txTable.remove(gtx);
+
+ // keep track of unmatched rollbacks
+ unmatchedRollbacks.add(gtx);
+
+ return null;
+ }
}
// disconnect if we have a current tx associated
Transaction currentTx = txManager.getTransaction();
@@ -326,7 +355,21 @@
Transaction ltx = txTable.getLocalTransaction(gtx);
Transaction currentTx = txManager.getTransaction();
- Object retval = null;
+ if (!command.isOnePhaseCommit())
+ {
+ synchronized(prepareLockTable) {
+ // If a rollback has already been processed for this gtx, do nothing
+ if (unmatchedRollbacks.contains(gtx))
+ {
+ unmatchedRollbacks.remove(gtx);
+ return null;
+ }
+
+ // register "lock" on transaction
+ prepareLockTable.add(gtx);
+ }
+ }
+
boolean success = false;
try
{
@@ -448,14 +491,25 @@
transactions.remove(ltx);// JBAS-298
}
}
+ else
+ {
+ synchronized(prepareLockTable)
+ {
+ // release lock
+ prepareLockTable.remove(gtx);
+ // wake up any waiting Rollbacks
+ prepareLockTable.notifyAll();
+ }
+ }
+
txManager.suspend();// suspends ltx - could be null
// resume whatever else we had going.
if (currentTx != null) txManager.resume(currentTx);
if (trace) log.trace("Finished remote prepare " + gtx);
}
- return retval;
+ return null;
}
private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction
tx, VisitableCommand command) throws Throwable
@@ -1246,6 +1300,12 @@
return rollbacks;
}
+ @ManagedAttribute(description = "number of unmatched rollbacks")
+ public long getUnmatchedRollbacks()
+ {
+ return unmatchedRollbacks.size();
+ }
+
@ManagedAttribute(name = "numberOfSyncsRegistered", writable = false,
description = "number of transaction synchronizations currently registered")
public int getNumberOfSyncsRegistered()
{
Modified:
core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/transaction/GlobalTransaction.java
===================================================================
---
core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/transaction/GlobalTransaction.java 2012-08-05
17:44:53 UTC (rev 8552)
+++
core/support-branches/3.2.2.GA_JBCACHE-1613_JBCACHE-1617_JBCACHE-1618_JBCACHE-1619_JBCACHE-1621/src/main/java/org/jboss/cache/transaction/GlobalTransaction.java 2012-08-05
17:46:37 UTC (rev 8553)
@@ -23,11 +23,14 @@
import org.jgroups.Address;
+import org.jgroups.stack.IpAddress;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.Arrays;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
@@ -47,6 +50,13 @@
private static AtomicLong sid = new AtomicLong(0);
+ // addr + instanceId = Unique ID for the cache even after restarting with the same
IP:port
+ private static byte[] instanceId = new byte[4];
+ static
+ {
+ new Random().nextBytes(instanceId);
+ }
+
private Address addr = null;
private long id = -1;
private transient boolean remote = false;
@@ -63,6 +73,18 @@
private GlobalTransaction(Address addr)
{
+ // store instanceId in IpAddress.additionalData (backwards compatible
serialization)
+ if(addr != null && addr instanceof IpAddress)
+ {
+ try
+ {
+ addr = (Address)((IpAddress)addr).clone();
+ ((IpAddress)addr).setAdditionalData(instanceId);
+ }
+ catch ( CloneNotSupportedException e )
+ {
+ }
+ }
this.addr = addr;
id = sid.getAndIncrement();
}
@@ -77,6 +99,7 @@
return addr;
}
+ // only used during externalization
public void setAddress(Address address)
{
addr = address;
@@ -87,12 +110,20 @@
return id;
}
+ public byte [] getInstanceId ()
+ {
+ if(addr == null || ! (addr instanceof IpAddress))
+ return null;
+ return ((IpAddress)addr).getAdditionalData();
+ }
+
@Override
public int hashCode()
{
if (hash_code == -1)
{
- hash_code = (addr != null ? addr.hashCode() : 0) + (int) id;
+ hash_code = (addr != null ? addr.hashCode() : 0) + (int) id
+ + Arrays.hashCode(getInstanceId());
}
return hash_code;
}
@@ -107,7 +138,7 @@
GlobalTransaction otherGtx = (GlobalTransaction) other;
boolean aeq = (addr == null) ? (otherGtx.addr == null) :
addr.equals(otherGtx.addr);
- return aeq && (id == otherGtx.id);
+ return aeq && (id == otherGtx.id) && Arrays.equals(getInstanceId(),
otherGtx.getInstanceId());
}
@Override