[infinispan-commits] Infinispan SVN: r1231 - in trunk/core/src: main/java/org/infinispan/transaction/xa and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Nov 27 11:42:32 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-11-27 11:42:31 -0500 (Fri, 27 Nov 2009)
New Revision: 1231

Added:
   trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
   trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
Log:
[ISPN-292] ( Implement a stale lock cleanup mechanism )

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-11-27 16:29:36 UTC (rev 1230)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-11-27 16:42:31 UTC (rev 1231)
@@ -474,21 +474,6 @@
       synchronized (membersListLock) {
          boolean needNotification = false;
          if (newMembers != null) {
-
-            // TODO: Implement breaking stale locks for dead members.  This should be in the TxInterceptor or TransactionTable, with a listener on the cache manager.
-//            if (members != null) {
-            // we had a membership list before this event.  Check to make sure we haven't lost any members,
-            // and if so, determine what members have been removed
-            // and roll back any tx and break any locks
-//               List<org.jgroups.Address> removed = toJGroupsAddressList(members);
-//               removed.removeAll(newMembers);
-//               spi.createInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
-//                  if (root != null)
-//                  {
-            //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
-//                  }
-//            }
-
             oldMembers = members;
             // we need a defensive copy anyway            
             members = fromJGroupsAddressList(newMembers);

Modified: trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java	2009-11-27 16:29:36 UTC (rev 1230)
+++ trunk/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java	2009-11-27 16:42:31 UTC (rev 1231)
@@ -2,13 +2,20 @@
 
 import org.infinispan.CacheException;
 import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.tx.RollbackCommand;
 import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.config.Configuration;
 import org.infinispan.context.InvocationContext;
 import org.infinispan.context.InvocationContextContainer;
 import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Stop;
 import org.infinispan.interceptors.InterceptorChain;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.notifications.Listener;
 import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.infinispan.remoting.MembershipArithmetic;
 import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.logging.Log;
@@ -16,7 +23,12 @@
 
 import javax.transaction.Transaction;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Repository for {@link org.infinispan.transaction.xa.RemoteTransaction} and {@link
@@ -41,11 +53,12 @@
    private CacheNotifier notifier;
    private RpcManager rpcManager;
    private GlobalTransactionFactory gtf;
+   private ExecutorService lockBreakingService = Executors.newFixedThreadPool(1);
 
-
    @Inject
    public void initialize(CommandsFactory commandsFactory, RpcManager rpcManager, Configuration configuration,
-                          InvocationContextContainer icc, InterceptorChain invoker, CacheNotifier notifier, GlobalTransactionFactory gtf) {
+                          InvocationContextContainer icc, InterceptorChain invoker, CacheNotifier notifier,
+                          GlobalTransactionFactory gtf, CacheManager cm) {
       this.commandsFactory = commandsFactory;
       this.rpcManager = rpcManager;
       this.configuration = configuration;
@@ -53,9 +66,49 @@
       this.invoker = invoker;
       this.notifier = notifier;
       this.gtf = gtf;
+      cm.addListener(new StaleTransactionCleanup());
    }
 
+   @Stop
+   private void stop() {
+      lockBreakingService.shutdownNow();
+   }
 
+   @Listener
+   public class StaleTransactionCleanup {
+      @ViewChanged
+      public void onViewChange(ViewChangedEvent vce) {
+         final List<Address> leavers = MembershipArithmetic.getMembersLeft(vce.getOldMembers(), vce.getNewMembers());
+         if (!leavers.isEmpty()) {
+            if (trace) log.trace("Saw {0} leavers - kicking off a lock breaking task", leavers.size());
+            lockBreakingService.submit(new Runnable() {
+               public void run() {
+                  Set<GlobalTransaction> toKill = new HashSet<GlobalTransaction>();
+                  for (GlobalTransaction gt: remoteTransactions.keySet()) {
+                     if (leavers.contains(gt.getAddress())) toKill.add(gt);
+                  }
+
+                  if (trace) log.trace("Global transactions {0} pertain to leavers list {1} and need to be killed", toKill, leavers);
+
+                  for (GlobalTransaction gtx: toKill) {
+                     if (trace) log.trace("Killing {0}", gtx);
+                     RollbackCommand rc = new RollbackCommand(gtx);
+                     rc.init(invoker, icc, TransactionTable.this);
+                     try {
+                        rc.perform(null);
+                     } catch (Throwable e) {
+                        log.warn("Unable to roll back gtx " + gtx, e);
+                     } finally {
+                        removeRemoteTransaction(gtx);
+                     }
+                  }
+               }
+            });
+         }
+      }
+   }
+
+
    /**
     * Returns the {@link org.infinispan.transaction.xa.RemoteTransaction} associated with the supplied transaction id.
     * Returns null if no such association exists.
@@ -159,4 +212,6 @@
    public boolean containRemoteTx(GlobalTransaction globalTransaction) {
       return remoteTransactions.containsKey(globalTransaction);
    }
+
+
 }

Added: trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java	2009-11-27 16:42:31 UTC (rev 1231)
@@ -0,0 +1,81 @@
+package org.infinispan.tx;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.NotSupportedException;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+import java.util.List;
+
+/**
+ * Tests what happens when a member acquires locks and then dies.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "tx.StaleLockRecoveryTest")
+public class StaleLockRecoveryTest extends MultipleCacheManagersTest {
+   Cache<String, String> c1, c2;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration c = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+      c.setLockAcquisitionTimeout(500);
+      List<Cache<String, String>> caches = createClusteredCaches(2, "tx", c);
+      c1 = caches.get(0);
+      c2 = caches.get(1);
+   }
+
+   public void testStaleLock() throws SystemException, NotSupportedException {
+      c1.put("k", "v");
+      assert c1.get("k").equals("v");
+      assert c2.get("k").equals("v");
+
+      TransactionManager tm = TestingUtil.getTransactionManager(c1);
+      tm.begin();
+      c1.getAdvancedCache().lock("k");
+      tm.suspend();
+
+      // test that both c1 and c2 have locked k
+      assertLocked(c1, "k");
+      assertLocked(c2, "k");
+
+      cacheManagers.get(0).stop();
+
+      assert c2.getCacheManager().getMembers().size() == 1;
+
+      // may take a while from when the view change is seen through to when the lock is cleared
+      TestingUtil.sleepThread(1000);
+
+      assertNotLocked(c2, "k");
+   }
+
+   private void assertLocked(Cache<String, String> c, String key) throws SystemException, NotSupportedException {
+      TransactionManager tm = TestingUtil.getTransactionManager(c);
+      tm.begin();
+      try {
+         c.put(key, "dummy"); // should time out
+         assert false : "Should have been locked!";
+      } catch (Exception e) {
+
+      } finally {
+         tm.rollback();
+      }
+   }
+
+   private void assertNotLocked(Cache<String, String> c, String key) throws SystemException, NotSupportedException {
+      TransactionManager tm = TestingUtil.getTransactionManager(c);
+      tm.begin();
+      try {
+         c.put(key, "dummy"); // should NOT time out - the stale lock must have been released
+      } catch (Exception e) {
+         assert false : "Should not have been locked!";
+      } finally {
+         tm.rollback();
+      }
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/tx/StaleLockRecoveryTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the infinispan-commits mailing list