[infinispan-commits] Infinispan SVN: r2618 - in trunk/core/src: main/java/org/infinispan/context/impl and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Oct 27 12:38:06 EDT 2010


Author: manik.surtani at jboss.com
Date: 2010-10-27 12:38:05 -0400 (Wed, 27 Oct 2010)
New Revision: 2618

Added:
   trunk/core/src/test/java/org/infinispan/lock/APIDistTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/container/entries/AbstractInternalCacheEntry.java
   trunk/core/src/main/java/org/infinispan/container/entries/CacheEntry.java
   trunk/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java
   trunk/core/src/main/java/org/infinispan/container/entries/NullMarkerEntryForRemoval.java
   trunk/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java
   trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
   trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
   trunk/core/src/main/java/org/infinispan/util/Immutables.java
   trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java
Log:
ISPN-738 - Get operation doesn't work with explicit locking under distribution mode (merged from branch 4.2.x, r2617)

Modified: trunk/core/src/main/java/org/infinispan/container/entries/AbstractInternalCacheEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/AbstractInternalCacheEntry.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/container/entries/AbstractInternalCacheEntry.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -59,6 +59,10 @@
       return false;
    }
 
+   public final boolean isLockPlaceholder() {
+      return false;
+   }
+
    public void setMaxIdle(long maxIdle) {
    }
 

Modified: trunk/core/src/main/java/org/infinispan/container/entries/CacheEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/CacheEntry.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/container/entries/CacheEntry.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -103,4 +103,9 @@
    void setRemoved(boolean removed);
 
    void setValid(boolean valid);
+
+   /**
+    * @return true if this entry is a placeholder for the sake of acquiring a lock; and false if it is a real entry. 
+    */
+   boolean isLockPlaceholder();
 }

Modified: trunk/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -18,4 +18,10 @@
     *                       to {@link org.infinispan.util.concurrent.IsolationLevel#REPEATABLE_READ}.
     */
    void copyForUpdate(DataContainer container, boolean writeSkewCheck);
+
+   /**
+    * Marks an entry as a lock placeholder
+    * @param placeholder if true, the entry is marked as a lock placeholder.  If false, the entry is un-marked as a placeholder.
+    */
+   void setLockPlaceholder(boolean placeholder);
 }

Modified: trunk/core/src/main/java/org/infinispan/container/entries/NullMarkerEntryForRemoval.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/NullMarkerEntryForRemoval.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/container/entries/NullMarkerEntryForRemoval.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -38,4 +38,14 @@
    public final boolean isValid() {
       return false;
    }
+
+   @Override
+   public boolean isLockPlaceholder() {
+      return false;
+   }
+
+   @Override
+   public void setLockPlaceholder(boolean placeholder) {
+      // a no-op
+   }
 }
\ No newline at end of file

Modified: trunk/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -27,6 +27,7 @@
 import org.infinispan.util.logging.LogFactory;
 
 import static org.infinispan.container.entries.ReadCommittedEntry.Flags.*;
+import static org.infinispan.container.entries.ReadCommittedEntry.Flags.LOCK_PLACEHOLDER;
 
 /**
  * A wrapper around a cached entry that encapsulates read committed semantics when writes are initiated, committed or
@@ -62,7 +63,8 @@
       CHANGED(1), // same as 1 << 0
       CREATED(1 << 1),
       REMOVED(1 << 2),
-      VALID(1 << 3);
+      VALID(1 << 3),
+      LOCK_PLACEHOLDER(1 << 4);      
 
       final byte mask;
 
@@ -143,6 +145,14 @@
       if (!isCreated()) oldValue = value;
    }
 
+   @Override
+   public void setLockPlaceholder(boolean placeholder) {
+      if (placeholder)
+         setFlag(LOCK_PLACEHOLDER);
+      else
+         unsetFlag(LOCK_PLACEHOLDER);
+   }
+
    @SuppressWarnings("unchecked")
    public final void commit(DataContainer container) {
       // only do stuff if there are changes.
@@ -195,6 +205,11 @@
          unsetFlag(VALID);
    }
 
+   @Override
+   public boolean isLockPlaceholder() {
+      return isFlagSet(LOCK_PLACEHOLDER);
+   }
+
    public final boolean isCreated() {
       return isFlagSet(CREATED);
    }

Modified: trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -6,6 +6,7 @@
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -28,11 +29,13 @@
       return affectedKeys == null ? Collections.emptySet() : affectedKeys;
    }
 
-   public void addAffectedKeys(Object... keys) {
-      if (affectedKeys == null) {
-         affectedKeys = new HashSet<Object>();
+   public void addAffectedKeys(Collection<Object> keys) {
+      if (keys != null && !keys.isEmpty()) {
+         if (affectedKeys == null) {
+            affectedKeys = new HashSet<Object>();
+         }
+         affectedKeys.addAll(keys);
       }
-      affectedKeys.addAll(Arrays.asList(keys));
    }
 
    @Override

Modified: trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -5,6 +5,7 @@
 import org.infinispan.transaction.xa.GlobalTransaction;
 
 import javax.transaction.Transaction;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
@@ -48,5 +49,5 @@
    /**
     * Registers a new participant with the transaction.
     */
-   void addAffectedKeys(Object... keys);
+   void addAffectedKeys(Collection<Object> keys);
 }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -16,7 +16,9 @@
 import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.EntryFactory;
+import org.infinispan.container.entries.CacheEntry;
 import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.MVCCEntry;
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
 import org.infinispan.context.impl.LocalTxInvocationContext;
@@ -91,11 +93,17 @@
       Object returnValue = invokeNextInterceptor(ctx, command);
       // need to check in the context as well since a null retval is not necessarily an indication of the entry not being
       // available.  It could just have been removed in the same tx beforehand.
-      if (!ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && returnValue == null && ctx.lookupEntry(command.getKey()) == null)
+      if (needsRemoteGet(ctx, command.getKey(), returnValue == null))
          returnValue = remoteGetAndStoreInL1(ctx, command.getKey(), isStillRehashingOnJoin);
       return returnValue;
    }
 
+   private boolean needsRemoteGet(InvocationContext ctx, Object key, boolean retvalCheck) {
+      CacheEntry entry;
+      return retvalCheck && !ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && ((entry = ctx.lookupEntry(key)) == null || entry.isLockPlaceholder());
+   }
+
+
    /**
     * This method retrieves an entry from a remote cache and optionally stores it in L1 (if L1 is enabled).
     * <p/>
@@ -135,12 +143,22 @@
       InternalCacheEntry ice = dm.retrieveFromRemoteSource(key);
 
       if (ice != null) {
-         if (storeInL1 && isL1CacheEnabled) {
-            if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
-            long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
-            PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
-            entryFactory.wrapEntryForWriting(ctx, key, true, false, ctx.hasLockedKey(key), false, false);
-            invokeNextInterceptor(ctx, put);
+         if (storeInL1) {
+            if (isL1CacheEnabled) {
+               if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
+               long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
+               PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
+               entryFactory.wrapEntryForWriting(ctx, key, true, false, ctx.hasLockedKey(key), false, false);
+               invokeNextInterceptor(ctx, put);
+            } else {
+               CacheEntry ce = ctx.lookupEntry(key);
+               if (ce == null || ce.isNull() || ce.isLockPlaceholder()) {
+                  if (ce != null && ce.isChanged())
+                     ce.setValue(ice.getValue());
+                  else
+                     ctx.putLookedUpEntry(key, ice);
+               }
+            }
          } else {
             if (trace) log.trace("Not caching remotely retrieved entry for key {0} in L1", key);
          }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -37,8 +37,10 @@
 import org.infinispan.container.EntryFactory;
 import org.infinispan.container.entries.CacheEntry;
 import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.MVCCEntry;
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.LocalTxInvocationContext;
 import org.infinispan.context.impl.TxInvocationContext;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
@@ -157,7 +159,7 @@
          if (goRemoteFirst) {
             Object result = invokeNextInterceptor(ctx, c);
             try {
-               lockKeysForRemoteTx(ctx, c);
+               lockKeysForLockCommand(ctx, c);
                result = true;
             } catch (Throwable e) {
                result = false;
@@ -168,7 +170,7 @@
             }
             return result;
          } else {
-            lockKeysForRemoteTx(ctx, c);
+            lockKeysForLockCommand(ctx, c);
             if (shouldInvokeOnCluster || c.isExplicit()) {
                invokeNextInterceptor(ctx, c);
                return true;
@@ -188,9 +190,13 @@
       }
    }
 
-   private void lockKeysForRemoteTx(TxInvocationContext ctx, LockControlCommand c) throws InterruptedException {
+   private void lockKeysForLockCommand(TxInvocationContext ctx, LockControlCommand c) throws InterruptedException {
       for (Object key : c.getKeys()) {
-         entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
+         MVCCEntry e = entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
+         if (e.isCreated()) {
+            // mark as temporary entry just for the sake of a lock command
+            e.setLockPlaceholder(true);
+         }
       }
    }
 
@@ -302,7 +308,7 @@
                // should never resize.
                List<Object> keysToRemove = new ArrayList<Object>(lookedUpEntries.size());
                for (Map.Entry<Object, CacheEntry> e : lookedUpEntries.entrySet()) {
-                  if (!lockManager.possiblyLocked(e.getValue())) keysToRemove.add(e.getKey());
+                  if (!lockManager.possiblyLocked(e.getValue()) && !possiblyLockedInContext(ctx, e.getKey())) keysToRemove.add(e.getKey());
                }
 
                if (!keysToRemove.isEmpty()) {
@@ -316,6 +322,12 @@
       }
    }
 
+   private boolean possiblyLockedInContext(InvocationContext ctx, Object key) {
+      if (ctx instanceof LocalTxInvocationContext) {
+         return ((LocalTxInvocationContext) ctx).getAffectedKeys().contains(key);
+      } else return false;
+   }
+
    private void cleanupLocks(InvocationContext ctx, boolean commit) {
       if (commit) {
          Object owner = ctx.getLockOwner();

Modified: trunk/core/src/main/java/org/infinispan/util/Immutables.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/Immutables.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/util/Immutables.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -556,6 +556,11 @@
          throw new UnsupportedOperationException();
       }
 
+      @Override
+      public boolean isLockPlaceholder() {
+         return entry.isLockPlaceholder();
+      }
+
       public InternalCacheEntry clone() {
          return new ImmutableInternalCacheEntry(entry.clone());
       }

Modified: trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java	2010-10-27 16:35:00 UTC (rev 2617)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -145,7 +145,7 @@
    }
 
    public final boolean possiblyLocked(CacheEntry entry) {
-      return entry == null || entry.isChanged() || entry.isNull();
+      return entry == null || entry.isChanged() || entry.isNull() || entry.isLockPlaceholder();
    }
 
    public void releaseLocks(InvocationContext ctx) {

Copied: trunk/core/src/test/java/org/infinispan/lock/APIDistTest.java (from rev 2617, branches/4.2.x/core/src/test/java/org/infinispan/lock/APIDistTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/lock/APIDistTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/lock/APIDistTest.java	2010-10-27 16:38:05 UTC (rev 2618)
@@ -0,0 +1,136 @@
+package org.infinispan.lock;
+
+import org.infinispan.Cache;
+import org.infinispan.affinity.KeyAffinityService;
+import org.infinispan.affinity.KeyAffinityServiceFactory;
+import org.infinispan.affinity.KeyAffinityServiceImpl;
+import org.infinispan.affinity.KeyGenerator;
+import org.infinispan.config.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.fwk.CleanupAfterMethod;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.WithinThreadExecutor;
+import org.testng.annotations.Test;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.Executors;
+
+import static org.infinispan.context.Flag.FAIL_SILENTLY;
+
+
+@Test(testName = "lock.APITest", groups = "functional")
+@CleanupAfterMethod
+public class APIDistTest extends MultipleCacheManagersTest {
+   EmbeddedCacheManager cm1, cm2;
+   String key; // guaranteed to be mapped to cache2
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration cfg = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC, true);
+      cfg.setL1CacheEnabled(false); // no L1 enabled
+      cfg.setLockAcquisitionTimeout(100);
+      cfg.setNumOwners(1);
+      cfg.setSyncCommitPhase(true);
+      cfg.setSyncRollbackPhase(true);
+      cm1 = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+      cm2 = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+      registerCacheManager(cm1, cm2);
+      cm1.getCache();
+      Cache<String, String> c = cm2.getCache();
+
+      // lets generate a key such that it is always mapped to cache2.
+      KeyAffinityService<String> service = KeyAffinityServiceFactory.newKeyAffinityService(c,
+              Executors.newCachedThreadPool(), new KeyGenerator<String>() {
+                 final Random r = new Random();
+
+                 @Override
+                 public String getKey() {
+                    return Integer.toHexString(r.nextInt(2000));
+                 }
+              }, 2);
+
+      key = service.getKeyForAddress(c.getAdvancedCache().getRpcManager().getAddress());
+   }
+
+   public void testLockAndGet() throws SystemException, NotSupportedException {
+      Cache<String, String> cache1 = cache(0), cache2 = cache(1);
+
+      cache1.put(key, "v");
+
+      assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+      assert "v".equals(cache2.get(key)) : "Could not find key " + key + " on cache2";
+
+      tm(0).begin();
+      cache1.getAdvancedCache().lock(key);
+      assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+      tm(0).rollback();
+   }
+
+   public void testLockAndGetAndPut() throws SystemException, NotSupportedException, RollbackException, HeuristicRollbackException, HeuristicMixedException, InterruptedException {
+      Cache<String, String> cache1 = cache(0), cache2 = cache(1);
+
+      cache1.put(key, "v");
+
+      assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+      assert "v".equals(cache2.get(key)) : "Could not find key " + key + " on cache2";
+
+      tm(0).begin();
+      cache1.getAdvancedCache().lock(key);
+      assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+      String old = cache1.put(key, "new_value");
+      assert "v".equals(old) : "Expected v, was " + old;
+      tm(0).commit();
+
+      String val;
+      assert "new_value".equals(val = cache1.get(key)) : "Could not find key " + key + " on cache1: expected new_value, was " + val;
+      assert "new_value".equals(val = cache2.get(key)) : "Could not find key " + key + " on cache2: expected new_value, was " + val;
+   }
+
+   public void testLockAndPutRetval() throws SystemException, NotSupportedException, RollbackException, HeuristicRollbackException, HeuristicMixedException, InterruptedException {
+      Cache<String, String> cache1 = cache(0), cache2 = cache(1);
+
+      cache1.put(key, "v");
+
+      assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+      assert "v".equals(cache2.get(key)) : "Could not find key " + key + " on cache2";
+
+      tm(0).begin();
+      cache1.getAdvancedCache().lock(key);
+      String old = cache1.put(key, "new_value");
+      assert "v".equals(old) : "Expected v, was " + old;
+      tm(0).commit();
+
+      String val;
+      assert "new_value".equals(val = cache1.get(key)) : "Could not find key " + key + " on cache1: expected new_value, was " + val;
+      assert "new_value".equals(val = cache2.get(key)) : "Could not find key " + key + " on cache2: expected new_value, was " + val;
+   }
+
+   public void testLockAndRemoveRetval() throws SystemException, NotSupportedException, RollbackException, HeuristicRollbackException, HeuristicMixedException, InterruptedException {
+      Cache<String, String> cache1 = cache(0), cache2 = cache(1);
+
+      cache1.put(key, "v");
+
+      assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+      assert "v".equals(cache2.get(key)) : "Could not find key " + key + " on cache2";
+
+      tm(0).begin();
+      cache1.getAdvancedCache().lock(key);
+      String old = cache1.remove(key);
+      assert "v".equals(old) : "Expected v, was " + old;
+      tm(0).commit();
+
+      String val;
+      assert (null == (val = cache1.get(key))) : "Could not find key " + key + " on cache1: expected null, was " + val;
+      assert (null == (val = cache2.get(key))) : "Could not find key " + key + " on cache2: expected null, was " + val;
+   }
+
+}



More information about the infinispan-commits mailing list