[infinispan-commits] Infinispan SVN: r2546 - in trunk/core/src: main/java/org/infinispan/interceptors and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Oct 21 09:10:49 EDT 2010


Author: manik.surtani at jboss.com
Date: 2010-10-21 09:10:48 -0400 (Thu, 21 Oct 2010)
New Revision: 2546

Added:
   trunk/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
Log:
ISPN-711 - Locks not released when committing a transaction which contains explicit locks but no modifications

Modified: trunk/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java	2010-10-21 13:01:10 UTC (rev 2545)
+++ trunk/core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java	2010-10-21 13:10:48 UTC (rev 2546)
@@ -97,7 +97,7 @@
           * @see LockControlCommand.java 
           * https://jira.jboss.org/jira/browse/ISPN-48
           */
-         remoteTransaction.setModifications(modifications);
+         remoteTransaction.setModifications(getModifications());
       }
 
       // 2. then set it on the invocation context
@@ -121,7 +121,7 @@
    }
 
    public WriteCommand[] getModifications() {
-      return modifications;
+      return modifications == null ? new WriteCommand[]{} : modifications;
    }
 
    public boolean isOnePhaseCommit() {

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2010-10-21 13:01:10 UTC (rev 2545)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2010-10-21 13:10:48 UTC (rev 2546)
@@ -33,6 +33,7 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,14 +56,12 @@
 
 
    static final RecipientGenerator CLEAR_COMMAND_GENERATOR = new RecipientGenerator() {
-      private final Object[] EMPTY_ARRAY = {};
-
       public List<Address> generateRecipients() {
          return null;
       }
 
-      public Object[] getKeys() {
-         return EMPTY_ARRAY;
+      public Collection<Object> getKeys() {
+         return Collections.emptySet();
       }
    };
 
@@ -106,7 +105,9 @@
     *
     * @param ctx invocation context
     * @param key key to retrieve
+    *
     * @return value of a remote get, or null
+    *
     * @throws Throwable if there are problems
     */
    private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key, boolean dmWasRehashingDuringLocalLookup) throws Throwable {
@@ -152,6 +153,7 @@
     * Tests whether a key is in the L1 cache if L1 is enabled.
     *
     * @param key key to check for
+    *
     * @return true if the key is not in L1, or L1 caching is not enabled.  false the key is in L1.
     */
    private boolean isNotInL1(Object key) {
@@ -169,14 +171,14 @@
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
       // don't bother with a remote get for the PutMapCommand!
       return handleWriteCommand(ctx, command,
-                                new MultipleKeysRecipientGenerator(command.getMap().keySet()), true);
+              new MultipleKeysRecipientGenerator(command.getMap().keySet()), true);
    }
 
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
 
       return handleWriteCommand(ctx, command,
-                                new SingleKeyRecipientGenerator(command.getKey()), false);
+              new SingleKeyRecipientGenerator(command.getKey()), false);
    }
 
    @Override
@@ -187,7 +189,7 @@
    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
       return handleWriteCommand(ctx, command,
-                                new SingleKeyRecipientGenerator(command.getKey()), false);
+              new SingleKeyRecipientGenerator(command.getKey()), false);
    }
 
    @Override
@@ -201,18 +203,16 @@
             List<Address> where;
             if (toMulticast.size() == 1) {//avoid building an extra array, as most often this will be a single key
                where = toMulticast.values().iterator().next();
-               rpcManager.invokeRemotely(where, command, true, true);
             } else {
-               where = new ArrayList<Address>();
-               for (List<Address> values:  toMulticast.values()) {
-                  where.addAll(values);
-               }
-               rpcManager.invokeRemotely(where, command, true, true);
+               where = new LinkedList<Address>();
+               for (List<Address> values : toMulticast.values()) where.addAll(values);
             }
-            ((LocalTxInvocationContext)ctx).remoteLocksAcquired(where);
+            rpcManager.invokeRemotely(where, command, true, true);
+            ((LocalTxInvocationContext) ctx).remoteLocksAcquired(where);
          } else {
             rpcManager.invokeRemotely(dm.getAffectedNodes(command.getKeys()), command, true, true);
          }
+         ctx.addAffectedKeys(command.getKeys());
       }
       return invokeNextInterceptor(ctx, command);
    }
@@ -221,9 +221,9 @@
 
    @Override
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
-      if (ctx.isOriginLocal() && ctx.hasModifications()) {
+      if (shouldInvokeRemoteTxCommand(ctx)) {
          List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
-         NotifyingNotifiableFuture<Object> f = flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), null);
+         NotifyingNotifiableFuture<Object> f = flushL1Cache(recipients.size(), ctx.getLockedKeys(), null);
          rpcManager.invokeRemotely(recipients, command, configuration.isSyncCommitPhase(), true);
          if (f != null) f.get();
       }
@@ -236,11 +236,11 @@
 
       boolean sync = isSynchronous(ctx);
 
-      if (ctx.isOriginLocal() && ctx.hasModifications()) {
+      if (shouldInvokeRemoteTxCommand(ctx)) {
          List<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
          NotifyingNotifiableFuture<Object> f = null;
          if (command.isOnePhaseCommit())
-            f = flushL1Cache(recipients.size(), getKeys(ctx.getModifications()), null);
+            f = flushL1Cache(recipients.size(), ctx.getLockedKeys(), null);
          // this method will return immediately if we're the only member (because exclude_self=true)
          rpcManager.invokeRemotely(recipients, command, sync);
          if (f != null) f.get();
@@ -250,7 +250,7 @@
 
    @Override
    public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
-      if (ctx.isOriginLocal())
+      if (shouldInvokeRemoteTxCommand(ctx))
          rpcManager.invokeRemotely(dm.getAffectedNodes(ctx.getAffectedKeys()), command, configuration.isSyncRollbackPhase(), true);
       return invokeNextInterceptor(ctx, command);
    }
@@ -269,19 +269,7 @@
       return !ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && needReliableReturnValues;
    }
 
-   private Object[] getKeys(List<WriteCommand> mods) {
-      List<Object> l = new LinkedList<Object>();
-      for (WriteCommand m : mods) {
-         if (m instanceof DataCommand) {
-            l.add(((DataCommand) m).getKey());
-         } else if (m instanceof PutMapCommand) {
-            l.addAll(((PutMapCommand) m).getMap().keySet());
-         }
-      }
-      return l.toArray(new Object[l.size()]);
-   }
-
-   private NotifyingNotifiableFuture<Object> flushL1Cache(int numCallRecipients, Object[] keys, Object retval) {
+   private NotifyingNotifiableFuture<Object> flushL1Cache(int numCallRecipients, Collection<Object> keys, Object retval) {
       if (isL1CacheEnabled && numCallRecipients > 0 && rpcManager.getTransport().getMembers().size() > numCallRecipients) {
          if (trace) log.trace("Invalidating L1 caches");
          InvalidateCommand ic = cf.buildInvalidateFromL1Command(false, keys);
@@ -344,7 +332,7 @@
    }
 
    interface KeyGenerator {
-      Object[] getKeys();
+      Collection<Object> getKeys();
    }
 
    interface RecipientGenerator extends KeyGenerator {
@@ -353,12 +341,12 @@
 
    class SingleKeyRecipientGenerator implements RecipientGenerator {
       final Object key;
-      final Object[] keysArray;
+      final Set<Object> keys;
       List<Address> recipients = null;
 
       SingleKeyRecipientGenerator(Object key) {
          this.key = key;
-         keysArray = new Object[]{key};
+         keys = Collections.singleton(key);
       }
 
       public List<Address> generateRecipients() {
@@ -366,20 +354,18 @@
          return recipients;
       }
 
-      public Object[] getKeys() {
-         return keysArray;
+      public Collection<Object> getKeys() {
+         return keys;
       }
    }
 
    class MultipleKeysRecipientGenerator implements RecipientGenerator {
 
       final Collection<Object> keys;
-      final Object[] keysArray;
       List<Address> recipients = null;
 
       MultipleKeysRecipientGenerator(Collection<Object> keys) {
          this.keys = keys;
-         keysArray = keys.toArray();
       }
 
       public List<Address> generateRecipients() {
@@ -392,8 +378,8 @@
          return recipients;
       }
 
-      public Object[] getKeys() {
-         return keysArray;
+      public Collection<Object> getKeys() {
+         return keys;
       }
    }
 }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2010-10-21 13:01:10 UTC (rev 2545)
+++ trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2010-10-21 13:10:48 UTC (rev 2546)
@@ -131,7 +131,7 @@
       Object retval = invokeNextInterceptor(ctx, command);
       if (trace) log.trace("Entering InvalidationInterceptor's prepare phase");
       // fetch the modifications before the transaction is committed (and thus removed from the txTable)
-      if (ctx.hasModifications() && ctx.isOriginLocal()) {
+      if (shouldInvokeRemoteTxCommand(ctx)) {
          List<WriteCommand> mods = Arrays.asList(command.getModifications());
          Transaction runningTransaction = ctx.getRunningTransaction();
          if (runningTransaction == null) throw new IllegalStateException("we must have an associated transaction");

Modified: trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java	2010-10-21 13:01:10 UTC (rev 2545)
+++ trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java	2010-10-21 13:10:48 UTC (rev 2546)
@@ -49,7 +49,7 @@
    @Override
    public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
       if (!ctx.isInTxScope()) throw new IllegalStateException("This should not be possible!");
-      if (ctx.isOriginLocal()) {
+      if (shouldInvokeRemoteTxCommand(ctx)) {
          rpcManager.broadcastRpcCommand(command, configuration.isSyncCommitPhase(), true);
       }
       return invokeNextInterceptor(ctx, command);
@@ -58,7 +58,7 @@
    @Override
    public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
       Object retVal = invokeNextInterceptor(ctx, command);
-      if (ctx.isOriginLocal() && command.hasModifications()) {
+      if (shouldInvokeRemoteTxCommand(ctx)) {
          boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
          rpcManager.broadcastRpcCommand(command, !async, false);
       }
@@ -67,7 +67,7 @@
 
    @Override
    public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
-      if (ctx.isOriginLocal() && !configuration.isOnePhaseCommit()) {
+      if (shouldInvokeRemoteTxCommand(ctx) && !configuration.isOnePhaseCommit()) {
          rpcManager.broadcastRpcCommand(command, configuration.isSyncRollbackPhase(), true);
       }
       return invokeNextInterceptor(ctx, command);

Modified: trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java	2010-10-21 13:01:10 UTC (rev 2545)
+++ trunk/core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java	2010-10-21 13:10:48 UTC (rev 2546)
@@ -79,4 +79,10 @@
       }
       return false;
    }
+
+   protected final boolean shouldInvokeRemoteTxCommand(TxInvocationContext ctx) {
+      // just testing for empty modifications isn't enough - the Lock API may acquire locks on keys but won't
+      // register a Modification.  See ISPN-711.
+      return ctx.isOriginLocal() && (ctx.hasModifications() || !ctx.getLockedKeys().isEmpty());
+   }
 }
\ No newline at end of file

Copied: trunk/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java (from rev 2544, branches/4.2.x/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/lock/StaleLocksTransactionTest.java	2010-10-21 13:10:48 UTC (rev 2546)
@@ -0,0 +1,62 @@
+package org.infinispan.lock;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.CleanupAfterMethod;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.concurrent.locks.LockManager;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+ at Test(testName = "lock.StaleLocksTransactionTest", groups = "functional")
+ at CleanupAfterMethod
+public class StaleLocksTransactionTest extends MultipleCacheManagersTest {
+
+   Cache<String, String> c1, c2;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration cfg = TestCacheManagerFactory.getDefaultConfiguration(true, Configuration.CacheMode.DIST_SYNC);
+      cfg.setLockAcquisitionTimeout(100);
+      EmbeddedCacheManager cm1 = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+      EmbeddedCacheManager cm2 = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+      registerCacheManager(cm1, cm2);
+      c1 = cm1.getCache();
+      c2 = cm2.getCache();
+   }
+
+   public void testNoModsCommit() throws Exception {
+      doTest(false, true);
+   }
+
+   public void testModsRollback() throws Exception {
+      doTest(true, false);
+   }
+
+   public void testNoModsRollback() throws Exception {
+      doTest(false, false);
+   }
+
+   public void testModsCommit() throws Exception {
+      doTest(true, true);
+   }
+
+   private void doTest(boolean mods, boolean commit) throws Exception {
+      tm(c1).begin();
+      c1.getAdvancedCache().lock("k");
+      assert c1.get("k") == null;
+      if (mods) c1.put("k", "v");
+      if (commit)
+         tm(c1).commit();
+      else
+         tm(c1).rollback();
+
+      assertNotLocked(c1, "k");
+      assertNotLocked(c2, "k");
+   }
+}



More information about the infinispan-commits mailing list