[infinispan-commits] Infinispan SVN: r2617 - in branches/4.2.x/core/src: main/java/org/infinispan/context/impl and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Oct 27 12:35:02 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-10-27 12:35:00 -0400 (Wed, 27 Oct 2010)
New Revision: 2617
Added:
branches/4.2.x/core/src/test/java/org/infinispan/lock/APIDistTest.java
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/container/entries/AbstractInternalCacheEntry.java
branches/4.2.x/core/src/main/java/org/infinispan/container/entries/CacheEntry.java
branches/4.2.x/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java
branches/4.2.x/core/src/main/java/org/infinispan/container/entries/NullMarkerEntryForRemoval.java
branches/4.2.x/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java
branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
branches/4.2.x/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
branches/4.2.x/core/src/main/java/org/infinispan/util/Immutables.java
branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java
Log:
ISPN-738 - Get operation doesn't work with explict locking under distribution mode
Modified: branches/4.2.x/core/src/main/java/org/infinispan/container/entries/AbstractInternalCacheEntry.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/container/entries/AbstractInternalCacheEntry.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/container/entries/AbstractInternalCacheEntry.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -59,6 +59,10 @@
return false;
}
+ public final boolean isLockPlaceholder() {
+ return false;
+ }
+
public void setMaxIdle(long maxIdle) {
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/container/entries/CacheEntry.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/container/entries/CacheEntry.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/container/entries/CacheEntry.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -103,4 +103,9 @@
void setRemoved(boolean removed);
void setValid(boolean valid);
+
+ /**
+ * @return true if this entry is a placeholder for the sake of acquiring a lock; and false if it is a real entry.
+ */
+ boolean isLockPlaceholder();
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/container/entries/MVCCEntry.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -18,4 +18,10 @@
* to {@link org.infinispan.util.concurrent.IsolationLevel#REPEATABLE_READ}.
*/
void copyForUpdate(DataContainer container, boolean writeSkewCheck);
+
+ /**
+ * Marks an entry as a lock placeholder
+ * @param placeholder if true, the entry is marked as a lock placeholder. If false, the entry is un-marked as a placeholder.
+ */
+ void setLockPlaceholder(boolean placeholder);
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/container/entries/NullMarkerEntryForRemoval.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/container/entries/NullMarkerEntryForRemoval.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/container/entries/NullMarkerEntryForRemoval.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -38,4 +38,14 @@
public final boolean isValid() {
return false;
}
+
+ @Override
+ public boolean isLockPlaceholder() {
+ return false;
+ }
+
+ @Override
+ public void setLockPlaceholder(boolean placeholder) {
+ // a no-op
+ }
}
\ No newline at end of file
Modified: branches/4.2.x/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/container/entries/ReadCommittedEntry.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -27,6 +27,7 @@
import org.infinispan.util.logging.LogFactory;
import static org.infinispan.container.entries.ReadCommittedEntry.Flags.*;
+import static org.infinispan.container.entries.ReadCommittedEntry.Flags.LOCK_PLACEHOLDER;
/**
* A wrapper around a cached entry that encapsulates read committed semantics when writes are initiated, committed or
@@ -62,7 +63,8 @@
CHANGED(1), // same as 1 << 0
CREATED(1 << 1),
REMOVED(1 << 2),
- VALID(1 << 3);
+ VALID(1 << 3),
+ LOCK_PLACEHOLDER(1 << 4);
final byte mask;
@@ -143,6 +145,14 @@
if (!isCreated()) oldValue = value;
}
+ @Override
+ public void setLockPlaceholder(boolean placeholder) {
+ if (placeholder)
+ setFlag(LOCK_PLACEHOLDER);
+ else
+ unsetFlag(LOCK_PLACEHOLDER);
+ }
+
@SuppressWarnings("unchecked")
public final void commit(DataContainer container) {
// only do stuff if there are changes.
@@ -195,6 +205,11 @@
unsetFlag(VALID);
}
+ @Override
+ public boolean isLockPlaceholder() {
+ return isFlagSet(LOCK_PLACEHOLDER);
+ }
+
public final boolean isCreated() {
return isFlagSet(CREATED);
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/AbstractTxInvocationContext.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -6,6 +6,7 @@
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -28,11 +29,13 @@
return affectedKeys == null ? Collections.emptySet() : affectedKeys;
}
- public void addAffectedKeys(Object... keys) {
- if (affectedKeys == null) {
- affectedKeys = new HashSet<Object>();
+ public void addAffectedKeys(Collection<Object> keys) {
+ if (keys != null && !keys.isEmpty()) {
+ if (affectedKeys == null) {
+ affectedKeys = new HashSet<Object>();
+ }
+ affectedKeys.addAll(keys);
}
- affectedKeys.addAll(Arrays.asList(keys));
}
@Override
Modified: branches/4.2.x/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/context/impl/TxInvocationContext.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -5,6 +5,7 @@
import org.infinispan.transaction.xa.GlobalTransaction;
import javax.transaction.Transaction;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -48,5 +49,5 @@
/**
* Registers a new participant with the transaction.
*/
- void addAffectedKeys(Object... keys);
+ void addAffectedKeys(Collection<Object> keys);
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -16,7 +16,9 @@
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.DataContainer;
import org.infinispan.container.EntryFactory;
+import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
@@ -91,11 +93,17 @@
Object returnValue = invokeNextInterceptor(ctx, command);
// need to check in the context as well since a null retval is not necessarily an indication of the entry not being
// available. It could just have been removed in the same tx beforehand.
- if (!ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && returnValue == null && ctx.lookupEntry(command.getKey()) == null)
+ if (needsRemoteGet(ctx, command.getKey(), returnValue == null))
returnValue = remoteGetAndStoreInL1(ctx, command.getKey(), isStillRehashingOnJoin);
return returnValue;
}
+ private boolean needsRemoteGet(InvocationContext ctx, Object key, boolean retvalCheck) {
+ CacheEntry entry;
+ return retvalCheck && !ctx.hasFlag(Flag.SKIP_REMOTE_LOOKUP) && ((entry = ctx.lookupEntry(key)) == null || entry.isLockPlaceholder());
+ }
+
+
/**
* This method retrieves an entry from a remote cache and optionally stores it in L1 (if L1 is enabled).
* <p/>
@@ -135,12 +143,22 @@
InternalCacheEntry ice = dm.retrieveFromRemoteSource(key);
if (ice != null) {
- if (storeInL1 && isL1CacheEnabled) {
- if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
- long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
- PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
- entryFactory.wrapEntryForWriting(ctx, key, true, false, ctx.hasLockedKey(key), false, false);
- invokeNextInterceptor(ctx, put);
+ if (storeInL1) {
+ if (isL1CacheEnabled) {
+ if (trace) log.trace("Caching remotely retrieved entry for key {0} in L1", key);
+ long lifespan = ice.getLifespan() < 0 ? configuration.getL1Lifespan() : Math.min(ice.getLifespan(), configuration.getL1Lifespan());
+ PutKeyValueCommand put = cf.buildPutKeyValueCommand(ice.getKey(), ice.getValue(), lifespan, -1);
+ entryFactory.wrapEntryForWriting(ctx, key, true, false, ctx.hasLockedKey(key), false, false);
+ invokeNextInterceptor(ctx, put);
+ } else {
+ CacheEntry ce = ctx.lookupEntry(key);
+ if (ce == null || ce.isNull() || ce.isLockPlaceholder()) {
+ if (ce != null && ce.isChanged())
+ ce.setValue(ice.getValue());
+ else
+ ctx.putLookedUpEntry(key, ice);
+ }
+ }
} else {
if (trace) log.trace("Not caching remotely retrieved entry for key {0} in L1", key);
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/interceptors/LockingInterceptor.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -37,8 +37,10 @@
import org.infinispan.container.EntryFactory;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
@@ -157,7 +159,7 @@
if (goRemoteFirst) {
Object result = invokeNextInterceptor(ctx, c);
try {
- lockKeysForRemoteTx(ctx, c);
+ lockKeysForLockCommand(ctx, c);
result = true;
} catch (Throwable e) {
result = false;
@@ -168,7 +170,7 @@
}
return result;
} else {
- lockKeysForRemoteTx(ctx, c);
+ lockKeysForLockCommand(ctx, c);
if (shouldInvokeOnCluster || c.isExplicit()) {
invokeNextInterceptor(ctx, c);
return true;
@@ -188,9 +190,13 @@
}
}
- private void lockKeysForRemoteTx(TxInvocationContext ctx, LockControlCommand c) throws InterruptedException {
+ private void lockKeysForLockCommand(TxInvocationContext ctx, LockControlCommand c) throws InterruptedException {
for (Object key : c.getKeys()) {
- entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
+ MVCCEntry e = entryFactory.wrapEntryForWriting(ctx, key, true, false, false, false, false);
+ if (e.isCreated()) {
+ // mark as temporary entry just for the sake of a lock command
+ e.setLockPlaceholder(true);
+ }
}
}
@@ -302,7 +308,7 @@
// should never resize.
List<Object> keysToRemove = new ArrayList<Object>(lookedUpEntries.size());
for (Map.Entry<Object, CacheEntry> e : lookedUpEntries.entrySet()) {
- if (!lockManager.possiblyLocked(e.getValue())) keysToRemove.add(e.getKey());
+ if (!lockManager.possiblyLocked(e.getValue()) && !possiblyLockedInContext(ctx, e.getKey())) keysToRemove.add(e.getKey());
}
if (!keysToRemove.isEmpty()) {
@@ -316,6 +322,12 @@
}
}
+ private boolean possiblyLockedInContext(InvocationContext ctx, Object key) {
+ if (ctx instanceof LocalTxInvocationContext) {
+ return ((LocalTxInvocationContext) ctx).getAffectedKeys().contains(key);
+ } else return false;
+ }
+
private void cleanupLocks(InvocationContext ctx, boolean commit) {
if (commit) {
Object owner = ctx.getLockOwner();
Modified: branches/4.2.x/core/src/main/java/org/infinispan/util/Immutables.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/util/Immutables.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/util/Immutables.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -556,6 +556,11 @@
throw new UnsupportedOperationException();
}
+ @Override
+ public boolean isLockPlaceholder() {
+ return entry.isLockPlaceholder();
+ }
+
public InternalCacheEntry clone() {
return new ImmutableInternalCacheEntry(entry.clone());
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java 2010-10-27 15:08:45 UTC (rev 2616)
+++ branches/4.2.x/core/src/main/java/org/infinispan/util/concurrent/locks/LockManagerImpl.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -145,7 +145,7 @@
}
public final boolean possiblyLocked(CacheEntry entry) {
- return entry == null || entry.isChanged() || entry.isNull();
+ return entry == null || entry.isChanged() || entry.isNull() || entry.isLockPlaceholder();
}
public void releaseLocks(InvocationContext ctx) {
Copied: branches/4.2.x/core/src/test/java/org/infinispan/lock/APIDistTest.java (from rev 2615, branches/4.2.x/core/src/test/java/org/infinispan/lock/APITest.java)
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/lock/APIDistTest.java (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/lock/APIDistTest.java 2010-10-27 16:35:00 UTC (rev 2617)
@@ -0,0 +1,136 @@
+package org.infinispan.lock;
+
+import org.infinispan.Cache;
+import org.infinispan.affinity.KeyAffinityService;
+import org.infinispan.affinity.KeyAffinityServiceFactory;
+import org.infinispan.affinity.KeyAffinityServiceImpl;
+import org.infinispan.affinity.KeyGenerator;
+import org.infinispan.config.Configuration;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.fwk.CleanupAfterMethod;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.WithinThreadExecutor;
+import org.testng.annotations.Test;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.Executors;
+
+import static org.infinispan.context.Flag.FAIL_SILENTLY;
+
+
+ at Test(testName = "lock.APITest", groups = "functional")
+ at CleanupAfterMethod
+public class APIDistTest extends MultipleCacheManagersTest {
+ EmbeddedCacheManager cm1, cm2;
+ String key; // guaranteed to be mapped to cache2
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration cfg = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC, true);
+ cfg.setL1CacheEnabled(false); // no L1 enabled
+ cfg.setLockAcquisitionTimeout(100);
+ cfg.setNumOwners(1);
+ cfg.setSyncCommitPhase(true);
+ cfg.setSyncRollbackPhase(true);
+ cm1 = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+ cm2 = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+ registerCacheManager(cm1, cm2);
+ cm1.getCache();
+ Cache<String, String> c = cm2.getCache();
+
+ // lets generate a key such that it is always mapped to cache2.
+ KeyAffinityService<String> service = KeyAffinityServiceFactory.newKeyAffinityService(c,
+ Executors.newCachedThreadPool(), new KeyGenerator<String>() {
+ final Random r = new Random();
+
+ @Override
+ public String getKey() {
+ return Integer.toHexString(r.nextInt(2000));
+ }
+ }, 2);
+
+ key = service.getKeyForAddress(c.getAdvancedCache().getRpcManager().getAddress());
+ }
+
+ public void testLockAndGet() throws SystemException, NotSupportedException {
+ Cache<String, String> cache1 = cache(0), cache2 = cache(1);
+
+ cache1.put(key, "v");
+
+ assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+ assert "v".equals(cache2.get(key)) : "Could not find key " + key + " on cache2";
+
+ tm(0).begin();
+ cache1.getAdvancedCache().lock(key);
+ assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+ tm(0).rollback();
+ }
+
+ public void testLockAndGetAndPut() throws SystemException, NotSupportedException, RollbackException, HeuristicRollbackException, HeuristicMixedException, InterruptedException {
+ Cache<String, String> cache1 = cache(0), cache2 = cache(1);
+
+ cache1.put(key, "v");
+
+ assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+ assert "v".equals(cache2.get(key)) : "Could not find key " + key + " on cache2";
+
+ tm(0).begin();
+ cache1.getAdvancedCache().lock(key);
+ assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+ String old = cache1.put(key, "new_value");
+ assert "v".equals(old) : "Expected v, was " + old;
+ tm(0).commit();
+
+ String val;
+ assert "new_value".equals(val = cache1.get(key)) : "Could not find key " + key + " on cache1: expected new_value, was " + val;
+ assert "new_value".equals(val = cache2.get(key)) : "Could not find key " + key + " on cache2: expected new_value, was " + val;
+ }
+
+ public void testLockAndPutRetval() throws SystemException, NotSupportedException, RollbackException, HeuristicRollbackException, HeuristicMixedException, InterruptedException {
+ Cache<String, String> cache1 = cache(0), cache2 = cache(1);
+
+ cache1.put(key, "v");
+
+ assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+ assert "v".equals(cache2.get(key)) : "Could not find key " + key + " on cache2";
+
+ tm(0).begin();
+ cache1.getAdvancedCache().lock(key);
+ String old = cache1.put(key, "new_value");
+ assert "v".equals(old) : "Expected v, was " + old;
+ tm(0).commit();
+
+ String val;
+ assert "new_value".equals(val = cache1.get(key)) : "Could not find key " + key + " on cache1: expected new_value, was " + val;
+ assert "new_value".equals(val = cache2.get(key)) : "Could not find key " + key + " on cache2: expected new_value, was " + val;
+ }
+
+ public void testLockAndRemoveRetval() throws SystemException, NotSupportedException, RollbackException, HeuristicRollbackException, HeuristicMixedException, InterruptedException {
+ Cache<String, String> cache1 = cache(0), cache2 = cache(1);
+
+ cache1.put(key, "v");
+
+ assert "v".equals(cache1.get(key)) : "Could not find key " + key + " on cache1";
+ assert "v".equals(cache2.get(key)) : "Could not find key " + key + " on cache2";
+
+ tm(0).begin();
+ cache1.getAdvancedCache().lock(key);
+ String old = cache1.remove(key);
+ assert "v".equals(old) : "Expected v, was " + old;
+ tm(0).commit();
+
+ String val;
+ assert (null == (val = cache1.get(key))) : "Could not find key " + key + " on cache1: expected null, was " + val;
+ assert (null == (val = cache2.get(key))) : "Could not find key " + key + " on cache2: expected null, was " + val;
+ }
+
+}
More information about the infinispan-commits
mailing list