[infinispan-commits] Infinispan SVN: r1231 - in trunk/core/src: main/java/org/infinispan/transaction/xa and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Nov 27 11:42:32 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-11-27 11:42:31 -0500 (Fri, 27 Nov 2009)
New Revision: 1231
Added:
trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java
Modified:
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
Log:
[ISPN-292] ( Implement a stale lock cleanup mechanism )
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-11-27 16:29:36 UTC (rev 1230)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-11-27 16:42:31 UTC (rev 1231)
@@ -474,21 +474,6 @@
synchronized (membersListLock) {
boolean needNotification = false;
if (newMembers != null) {
-
- // TODO: Implement breaking stale locks for dead members. This should be in the TxInterceptor or TransactionTable, with a listener on the cache manager.
-// if (members != null) {
- // we had a membership list before this event. Check to make sure we haven't lost any members,
- // and if so, determine what members have been removed
- // and roll back any tx and break any locks
-// List<org.jgroups.Address> removed = toJGroupsAddressList(members);
-// removed.removeAll(newMembers);
-// spi.createInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
-// if (root != null)
-// {
- //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
-// }
-// }
-
oldMembers = members;
// we need a defensive copy anyway
members = fromJGroupsAddressList(newMembers);
Modified: trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java 2009-11-27 16:29:36 UTC (rev 1230)
+++ trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java 2009-11-27 16:42:31 UTC (rev 1231)
@@ -2,13 +2,20 @@
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Stop;
import org.infinispan.interceptors.InterceptorChain;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.MembershipArithmetic;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
@@ -16,7 +23,12 @@
import javax.transaction.Transaction;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* Repository for {@link org.infinispan.transaction.xa.RemoteTransaction} and {@link
@@ -41,11 +53,12 @@
private CacheNotifier notifier;
private RpcManager rpcManager;
private GlobalTransactionFactory gtf;
+ private ExecutorService lockBreakingService = Executors.newFixedThreadPool(1);
-
@Inject
public void initialize(CommandsFactory commandsFactory, RpcManager rpcManager, Configuration configuration,
- InvocationContextContainer icc, InterceptorChain invoker, CacheNotifier notifier, GlobalTransactionFactory gtf) {
+ InvocationContextContainer icc, InterceptorChain invoker, CacheNotifier notifier,
+ GlobalTransactionFactory gtf, CacheManager cm) {
this.commandsFactory = commandsFactory;
this.rpcManager = rpcManager;
this.configuration = configuration;
@@ -53,9 +66,49 @@
this.invoker = invoker;
this.notifier = notifier;
this.gtf = gtf;
+ cm.addListener(new StaleTransactionCleanup());
}
+ @Stop
+ private void stop() {
+ lockBreakingService.shutdownNow();
+ }
+ @Listener
+ public class StaleTransactionCleanup {
+ @ViewChanged
+ public void onViewChange(ViewChangedEvent vce) {
+ final List<Address> leavers = MembershipArithmetic.getMembersLeft(vce.getOldMembers(), vce.getNewMembers());
+ if (!leavers.isEmpty()) {
+ if (trace) log.trace("Saw {0} leavers - kicking off a lock breaking task", leavers.size());
+ lockBreakingService.submit(new Runnable() {
+ public void run() {
+ Set<GlobalTransaction> toKill = new HashSet<GlobalTransaction>();
+ for (GlobalTransaction gt: remoteTransactions.keySet()) {
+ if (leavers.contains(gt.getAddress())) toKill.add(gt);
+ }
+
+ if (trace) log.trace("Global transactions {0} pertain to leavers list {1} and need to be killed", toKill, leavers);
+
+ for (GlobalTransaction gtx: toKill) {
+ if (trace) log.trace("Killing {0}", gtx);
+ RollbackCommand rc = new RollbackCommand(gtx);
+ rc.init(invoker, icc, TransactionTable.this);
+ try {
+ rc.perform(null);
+ } catch (Throwable e) {
+ log.warn("Unable to roll back gtx " + gtx, e);
+ } finally {
+ removeRemoteTransaction(gtx);
+ }
+ }
+ }
+ });
+ }
+ }
+ }
+
+
/**
* Returns the {@link org.infinispan.transaction.xa.RemoteTransaction} associated with the supplied transaction id.
* Returns null if no such association exists.
@@ -159,4 +212,6 @@
public boolean containRemoteTx(GlobalTransaction globalTransaction) {
return remoteTransactions.containsKey(globalTransaction);
}
+
+
}
Added: trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java 2009-11-27 16:42:31 UTC (rev 1231)
@@ -0,0 +1,81 @@
+package org.infinispan.tx;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.NotSupportedException;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import java.util.List;
+
+/**
+ * Tests what happens when a member acquires locks and then dies.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "tx.StaleLockRecoveryTest")
+public class StaleLockRecoveryTest extends MultipleCacheManagersTest {
+ Cache<String, String> c1, c2;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration c = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+ c.setLockAcquisitionTimeout(500);
+ List<Cache<String, String>> caches = createClusteredCaches(2, "tx", c);
+ c1 = caches.get(0);
+ c2 = caches.get(1);
+ }
+
+ public void testStaleLock() throws SystemException, NotSupportedException {
+ c1.put("k", "v");
+ assert c1.get("k").equals("v");
+ assert c2.get("k").equals("v");
+
+ TransactionManager tm = TestingUtil.getTransactionManager(c1);
+ tm.begin();
+ c1.getAdvancedCache().lock("k");
+ tm.suspend();
+
+ // test that both c1 and c2 have locked k
+ assertLocked(c1, "k");
+ assertLocked(c2, "k");
+
+ cacheManagers.get(0).stop();
+
+ assert c2.getCacheManager().getMembers().size() == 1;
+
+ // may take a while from when the view change is seen through to when the lock is cleared
+ TestingUtil.sleepThread(1000);
+
+ assertNotLocked(c2, "k");
+ }
+
+ private void assertLocked(Cache<String, String> c, String key) throws SystemException, NotSupportedException {
+ TransactionManager tm = TestingUtil.getTransactionManager(c);
+ tm.begin();
+ try {
+ c.put(key, "dummy"); // should time out
+ assert false : "Should have been locked!";
+ } catch (Exception e) {
+
+ } finally {
+ tm.rollback();
+ }
+ }
+
+ private void assertNotLocked(Cache<String, String> c, String key) throws SystemException, NotSupportedException {
+ TransactionManager tm = TestingUtil.getTransactionManager(c);
+ tm.begin();
+ try {
+ c.put(key, "dummy"); // should not time out
+ } catch (Exception e) {
+ assert false : "Should not have been locked!";
+ } finally {
+ tm.rollback();
+ }
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list