Author: galder.zamarreno(a)jboss.com
Date: 2010-02-23 07:11:34 -0500 (Tue, 23 Feb 2010)
New Revision: 18857
Modified:
core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/ConcurrentWriteTest.java
core/trunk/cache-infinispan/src/test/resources/log4j.properties
Log:
[HHH-4944] (putFromLoad calls could store stale data) Fixed.
Modified:
core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
===================================================================
---
core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java 2010-02-23
12:09:45 UTC (rev 18856)
+++
core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java 2010-02-23
12:11:34 UTC (rev 18857)
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,28 +47,59 @@
* the potential to store stale data, since the data may have been removed from the
* database and the cache between the time when the data was read from the database
* and the actual call to <code>putFromLoad</code>.
+ * <p>
+ * The expected usage of this class by a thread that read the cache and did
+ * not find data is:
*
+ * <ol>
+ * <li> Call {@link #registerPendingPut(Object)}</li>
+ * <li> Read the database</li>
+ * <li> Call {@link #acquirePutFromLoadLock(Object)}
+ * <li> if above returns <code>false</code>, the thread should not
cache the data;
+ * only if above returns <code>true</code>, put data in the cache
and...</li>
+ * <li> then call {@link #releasePutFromLoadLock(Object)}</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * The expected usage by a thread that is taking an action such that any pending
+ * <code>putFromLoad</code> may have stale data and should not cache it is to
either
+ * call
+ *
+ * <ul>
+ * <li> {@link #invalidateKey(Object)} (for a single key invalidation)</li>
+ * <li>or {@link #invalidateRegion()} (for a general invalidation all pending
puts)</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * This class also supports the concept of "naked puts", which are calls to
+ * {@link #acquirePutFromLoadLock(Object)} without a preceding {@link
#registerPendingPut(Object)}
+ * call.
+ * </p>
+ *
* @author Brian Stansberry
*
* @version $Revision: $
*/
public class PutFromLoadValidator {
/**
- * Period in ms after a removal during which a call to {@link #isPutValid(Object)}
that hasn't
- * been {@link #registerPendingPut(Object) pre-registered} (aka a "naked
put") will return false.
+ * Period (in ms) after a removal during which a call to
+ * {@link #acquirePutFromLoadLock(Object)} that hasn't been
+ * {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
+ * will return false.
+ * will return false.
*/
- public static final long NAKED_PUT_INVALIDATION_PERIOD = 10 * 1000;
+ public static final long NAKED_PUT_INVALIDATION_PERIOD =
TimeUnit.SECONDS.toMillis(20);
- /** Period after which a pending put is placed in the over-age queue */
- private static final long PENDING_PUT_OVERAGE_PERIOD = 5 * 1000;
+ /** Period (in ms) after which a pending put is placed in the over-age queue */
+ private static final long PENDING_PUT_OVERAGE_PERIOD = TimeUnit.SECONDS.toMillis(5);
- /** Period before which we stop trying to clean out pending puts */
- private static final long PENDING_PUT_RECENT_PERIOD = 2 * 1000;
+ /** Period (in ms) before which we stop trying to clean out pending puts */
+ private static final long PENDING_PUT_RECENT_PERIOD = TimeUnit.SECONDS.toMillis(2);
- /**
- * Period after which a pending put is never expected to come in and should be
cleaned
- */
- private static final long MAX_PENDING_PUT_DELAY = 2 * 60 * 1000;
+ /** Period (in ms) after which a pending put is never expected to come in and should
be cleaned */
+ private static final long MAX_PENDING_PUT_DELAY = TimeUnit.SECONDS.toMillis(2 * 60);
/**
* Used to determine whether the owner of a pending put is a thread or a transaction
@@ -119,7 +151,7 @@
* Creates a new PutFromLoadValidator.
*
* @param transactionManager
- * transaction manager to use to associated changes with a transaction; may
be
+ * transaction manager to use to associate changes with a transaction; may
be
* <code>null</code>
*/
public PutFromLoadValidator(TransactionManager transactionManager) {
@@ -142,41 +174,136 @@
// ----------------------------------------------------------------- Public
- public boolean isPutValid(Object key) {
+ /**
+ * Acquire a lock giving the calling thread the right to put data in the
+ * cache for the given key.
+ * <p>
+ * <strong>NOTE:</strong> A call to this method that returns
<code>true</code>
+ * should always be matched with a call to {@link #releasePutFromLoadLock(Object)}.
+ * </p>
+ *
+ * @param key the key
+ *
+ * @return <code>true</code> if the lock is acquired and the cache put
+ * can proceed; <code>false</code> if the data should not be
cached
+ */
+ public boolean acquirePutFromLoadLock(Object key) {
boolean valid = false;
+ boolean locked = false;
long now = System.currentTimeMillis();
- PendingPutMap pending = pendingPuts.get(key);
- if (pending != null) {
- synchronized (pending) {
- PendingPut toCancel = pending.remove(getOwnerForPut());
- valid = toCancel != null;
- if (valid) {
- toCancel.completed = true;
- if (pending.size() == 0) {
- pendingPuts.remove(key);
+ // Important: Do cleanup before we acquire any locks so we
+ // don't deadlock with invalidateRegion
+ cleanOutdatedPendingPuts(now, true);
+
+ try {
+ PendingPutMap pending = pendingPuts.get(key);
+ if (pending != null) {
+ locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
+ if (locked) {
+ try {
+ PendingPut toCancel = pending.remove(getOwnerForPut());
+ if (toCancel != null) {
+ valid = !toCancel.completed;
+ toCancel.completed = true;
+ }
}
+ finally {
+ if (!valid) {
+ pending.releaseLock();
+ locked = false;
+ }
+ }
}
}
+ else {
+ // Key wasn't in pendingPuts, so either this is a "naked put"
+ // or regionRemoved has been called. Check if we can proceed
+ if (now > invalidationTimestamp) {
+ Long removedTime = recentRemovals.get(key);
+ if (removedTime == null || now > removedTime.longValue()) {
+ // It's legal to proceed. But we have to record this key
+ // in pendingPuts so releasePutFromLoadLock can find it.
+ // To do this we basically simulate a normal "register
+ // then acquire lock" pattern
+ registerPendingPut(key);
+ locked = acquirePutFromLoadLock(key);
+ valid = locked;
+ }
+ }
+ }
}
+ catch (Throwable t) {
- if (!valid) {
- if (now > invalidationTimestamp) {
- Long removedTime = recentRemovals.get(key);
- if (removedTime == null || now > removedTime.longValue()) {
- valid = true;
+ valid = false;
+
+ if (locked) {
+ PendingPutMap toRelease = pendingPuts.get(key);
+ if (toRelease != null) {
+ toRelease.releaseLock();
}
}
+
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else if (t instanceof Error) {
+ throw (Error) t;
+ } else {
+ throw new RuntimeException(t);
+ }
}
- cleanOutdatedPendingPuts(now, true);
-
return valid;
}
- public void keyRemoved(Object key) {
+ /**
+ * Releases the lock previously obtained by a call to
+ * {@link #acquirePutFromLoadLock(Object)} that returned
<code>true</code>.
+ *
+ * @param key the key
+ */
+ public void releasePutFromLoadLock(Object key) {
+ PendingPutMap pending = pendingPuts.get(key);
+ if (pending != null) {
+ if (pending.size() == 0) {
+ pendingPuts.remove(key);
+ }
+ pending.releaseLock();
+ }
+ }
+
+ /**
+ * Invalidates any {@link #registerPendingPut(Object) previously registered pending
puts} ensuring a subsequent call to
+ * {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>.
<p> This method will block until any
+ * concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the
putFromLoad lock} for the given key
+ * has released the lock. This allows the caller to be certain the putFromLoad will
not execute after this method
+ * returns, possibly caching stale data. </p>
+ *
+ * @param key key identifying data whose pending puts should be invalidated
+ * @return <code>true</code> if the invalidation was successful;
<code>false</code> if a problem occured (which the
+ * caller should treat as an exception condition)
+ */
+ public boolean invalidateKey(Object key) {
+
+ boolean success = true;
+
// Invalidate any pending puts
- pendingPuts.remove(key);
+ PendingPutMap pending = pendingPuts.get(key);
+ if (pending != null) {
+ // This lock should be available very quickly, but we'll be
+ // very patient waiting for it as callers should treat not
+ // acquiring it as an exception condition
+ if (pending.acquireLock(60, TimeUnit.SECONDS)) {
+ try {
+ pending.invalidate();
+ }
+ finally {
+ pending.releaseLock();
+ }
+ } else {
+ success = false;
+ }
+ }
// Record when this occurred to invalidate later naked puts
RecentRemoval removal = new RecentRemoval(key, this.nakedPutInvalidationPeriod);
@@ -210,51 +337,83 @@
}
}
}
+
+ return success;
}
- public void cleared() {
+ /**
+ * Invalidates all {@link #registerPendingPut(Object) previously registered pending
puts} ensuring a subsequent call to
+ * {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>.
<p> This method will block until any
+ * concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the
putFromLoad lock} for the any key has
+ * released the lock. This allows the caller to be certain the putFromLoad will not
execute after this method returns,
+ * possibly caching stale data. </p>
+ *
+ * @return <code>true</code> if the invalidation was successful;
<code>false</code> if a problem occured (which the
+ * caller should treat as an exception condition)
+ */
+ public boolean invalidateRegion() {
+
+ boolean ok = false;
invalidationTimestamp = System.currentTimeMillis() +
this.nakedPutInvalidationPeriod;
- pendingLock.lock();
+
try {
+
+ // Acquire the lock for each entry to ensure any ongoing
+ // work associated with it is completed before we return
+ for (PendingPutMap entry : pendingPuts.values()) {
+ if (entry.acquireLock(60, TimeUnit.SECONDS)) {
+ try {
+ entry.invalidate();
+ }
+ finally {
+ entry.releaseLock();
+ }
+ } else {
+ ok = false;
+ }
+ }
+
removalsLock.lock();
try {
- pendingPuts.clear();
- pendingQueue.clear();
- overagePendingQueue.clear();
recentRemovals.clear();
removalsQueue.clear();
- earliestRemovalTimestamp = invalidationTimestamp;
+ ok = true;
+
} finally {
removalsLock.unlock();
}
- } finally {
- pendingLock.unlock();
}
- }
+ catch (Exception e) {
+ ok = false;
+ }
+ finally {
+ earliestRemovalTimestamp = invalidationTimestamp;
+ }
+ return ok;
+ }
+
/**
- * Notifies this validator that it is expected that a database read followed by a
subsequent
- * {@link #isPutValid(Object)} call will occur. The intent is this method would be
called
- * following a cache miss wherein it is expected that a database read plus cache put
will occur.
- * Calling this method allows the validator to treat the subsequent
<code>isPutValid</code> as if
- * the database read occurred when this method was invoked. This allows the validator
to compare
- * the timestamp of this call against the timestamp of subsequent removal
notifications. A put
- * that occurs without this call preceding it is "naked"; i.e the validator
must assume the put
- * is not valid if any relevant removal has occurred within
- * {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
- *
- * @param key
- * key that will be used for subsequent put
+ * Notifies this validator that it is expected that a database read followed by a
subsequent {@link
+ * #acquirePutFromLoadLock(Object)} call will occur. The intent is this method would
be called following a cache miss
+ * wherein it is expected that a database read plus cache put will occur. Calling this
method allows the validator to
+ * treat the subsequent <code>acquirePutFromLoadLock</code> as if the
database read occurred when this method was
+ * invoked. This allows the validator to compare the timestamp of this call against
the timestamp of subsequent removal
+ * notifications. A put that occurs without this call preceding it is
"naked"; i.e the validator must assume the put is
+ * not valid if any relevant removal has occurred within {@link
#NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
+ *
+ * @param key key that will be used for subsequent cache put
*/
public void registerPendingPut(Object key) {
PendingPut pendingPut = new PendingPut(key, getOwnerForPut());
- PendingPutMap pendingForKey = new PendingPutMap();
- synchronized (pendingForKey) {
- for (;;) {
- PendingPutMap existing = pendingPuts.putIfAbsent(key, pendingForKey);
- if (existing != null && existing != pendingForKey) {
- synchronized (existing) {
+ PendingPutMap pendingForKey = new PendingPutMap(pendingPut);
+
+ for (;;) {
+ PendingPutMap existing = pendingPuts.putIfAbsent(key, pendingForKey);
+ if (existing != null) {
+ if (existing.acquireLock(10, TimeUnit.SECONDS)) {
+ try {
existing.put(pendingPut);
PendingPutMap doublecheck = pendingPuts.putIfAbsent(key, existing);
if (doublecheck == null || doublecheck == existing) {
@@ -262,10 +421,16 @@
}
// else we hit a race and need to loop to try again
}
+ finally {
+ existing.releaseLock();
+ }
} else {
- pendingForKey.put(pendingPut);
+ // Can't get the lock; when we come back we'll be a "naked
put"
break;
}
+ } else {
+ // normal case
+ break;
}
}
@@ -324,7 +489,9 @@
pendingLock.lock();
try {
pendingQueue.add(new WeakReference<PendingPut>(pendingPut));
- cleanOutdatedPendingPuts(pendingPut.timestamp, false);
+ if (pendingQueue.size() > 1) {
+ cleanOutdatedPendingPuts(pendingPut.timestamp, false);
+ }
} finally {
pendingLock.unlock();
}
@@ -337,9 +504,7 @@
pendingLock.lock();
}
try {
-
// Clean items out of the basic queue
-
long overaged = now - this.pendingPutOveragePeriod;
long recent = now - this.pendingPutRecentPeriod;
@@ -392,31 +557,60 @@
if (toClean != null) {
PendingPutMap map = pendingPuts.get(toClean.key);
if (map != null) {
- synchronized (map) {
- PendingPut cleaned = map.remove(toClean.owner);
- if (toClean.equals(cleaned) == false) {
- // Oops. Restore it.
- map.put(cleaned);
- } else if (map.size() == 0) {
- pendingPuts.remove(toClean.key);
+ if (map.acquireLock(100, TimeUnit.MILLISECONDS)) {
+ try {
+ PendingPut cleaned = map.remove(toClean.owner);
+ if (toClean.equals(cleaned) == false) {
+ // Oops. Restore it.
+ map.put(cleaned);
+ } else if (map.size() == 0) {
+ pendingPuts.remove(toClean.key);
+ }
}
+ finally {
+ map.releaseLock();
+ }
+ } else {
+ // Something's gone wrong and the lock isn't being released.
+ // We removed toClean from the queue and need to restore it
+ // TODO this is pretty dodgy
+ restorePendingPut(toClean);
}
}
}
}
+ private void restorePendingPut(PendingPut toRestore) {
+ pendingLock.lock();
+ try {
+ // Give it a new lease on life so it's not out of order. We could
+ // scan the queue and put toRestore back at the front, but then
+ // we'll just immediately try removing it again; instead we
+ // let it cycle through the queue again
+ toRestore.refresh();
+ pendingQueue.add(new WeakReference<PendingPut>(toRestore));
+ }
+ finally {
+ pendingLock.unlock();
+ }
+ }
+
/**
* Lazy-initialization map for PendingPut. Optimized for the expected usual case where
only a
* single put is pending for a given key.
*
- * This class is NOT THREAD SAFE. All operations on it must be performed with the
object monitor
- * held.
+ * This class is NOT THREAD SAFE. All operations on it must be performed with the lock
held.
*/
private static class PendingPutMap {
private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap;
+ private final Lock lock = new ReentrantLock();
+ PendingPutMap(PendingPut singleItem) {
+ this.singlePendingPut = singleItem;
+ }
+
public void put(PendingPut pendingPut) {
if (singlePendingPut == null) {
if (fullMap == null) {
@@ -437,7 +631,8 @@
public PendingPut remove(Object ownerForPut) {
PendingPut removed = null;
if (fullMap == null) {
- if (singlePendingPut != null &&
singlePendingPut.owner.equals(ownerForPut)) {
+ if (singlePendingPut != null
+ && singlePendingPut.owner.equals(ownerForPut)) {
removed = singlePendingPut;
singlePendingPut = null;
}
@@ -448,14 +643,38 @@
}
public int size() {
- return fullMap == null ? (singlePendingPut == null ? 0 : 1) : fullMap.size();
+ return fullMap == null ? (singlePendingPut == null ? 0 : 1)
+ : fullMap.size();
}
+
+ public boolean acquireLock(long time, TimeUnit unit) {
+ try {
+ return lock.tryLock(time, unit);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ public void releaseLock() {
+ lock.unlock();
+ }
+
+ public void invalidate() {
+ if (singlePendingPut != null) {
+ singlePendingPut.completed = true;
+ } else if (fullMap != null) {
+ for (PendingPut pp : fullMap.values()) {
+ pp.completed = true;
+ }
+ }
+ }
}
private static class PendingPut {
private final Object key;
private final Object owner;
- private final long timestamp = System.currentTimeMillis();
+ private long timestamp = System.currentTimeMillis();
private volatile boolean completed;
private PendingPut(Object key, Object owner) {
@@ -463,6 +682,9 @@
this.owner = owner;
}
+ private void refresh() {
+ timestamp = System.currentTimeMillis();
+ }
}
private static class RecentRemoval {
Modified:
core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java
===================================================================
---
core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java 2010-02-23
12:09:45 UTC (rev 18856)
+++
core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java 2010-02-23
12:11:34 UTC (rev 18857)
@@ -68,29 +68,27 @@
}
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version)
throws CacheException {
- if (!region.checkValid()) {
+ if (!region.checkValid())
return false;
- }
- if (!putValidator.isPutValid(key)) {
+
+ if (!putValidator.acquirePutFromLoadLock(key))
return false;
+
+ try {
+ cacheAdapter.putForExternalRead(key, value);
+ } finally {
+ putValidator.releasePutFromLoadLock(key);
}
- cacheAdapter.putForExternalRead(key, value);
+
return true;
}
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version,
boolean minimalPutOverride)
throws CacheException {
- boolean trace = log.isTraceEnabled();
- if (!region.checkValid()) {
- if (trace) log.trace("Region not valid");
- return false;
- }
- if (!putValidator.isPutValid(key)) {
- if (trace) log.trace("Put {0} not valid", key);
- return false;
- }
- cacheAdapter.putForExternalRead(key, value);
- return true;
+ // We ignore minimalPutOverride. Infinispan putForExternalRead is
+ // already about as minimal as we can get; it will promptly return
+ // if it discovers that the node we want to write to already exists
+ return putFromLoad(key, value, txTimestamp, version);
}
public SoftLock lockItem(Object key, Object version) throws CacheException {
@@ -137,25 +135,33 @@
}
public void remove(Object key) throws CacheException {
+ if (!putValidator.invalidateKey(key)) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls
for key " + key + " from region " + region.getName());
+ }
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
- putValidator.keyRemoved(key);
cacheAdapter.remove(key);
}
public void removeAll() throws CacheException {
- putValidator.cleared();
+ if (!putValidator.invalidateRegion()) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls
for region " + region.getName());
+ }
cacheAdapter.clear();
}
public void evict(Object key) throws CacheException {
- putValidator.keyRemoved(key);
+ if (!putValidator.invalidateKey(key)) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls
for key " + key + " from region " + region.getName());
+ }
cacheAdapter.remove(key);
}
public void evictAll() throws CacheException {
- putValidator.cleared();
+ if (!putValidator.invalidateRegion()) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls
for region " + region.getName());
+ }
Transaction tx = region.suspend();
try {
CacheHelper.sendEvictAllNotification(cacheAdapter, region.getAddress());
Modified:
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java
===================================================================
---
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java 2010-02-23
12:09:45 UTC (rev 18856)
+++
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java 2010-02-23
12:11:34 UTC (rev 18857)
@@ -23,11 +23,15 @@
*/
package org.hibernate.test.cache.infinispan.access;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -39,10 +43,9 @@
/**
* Tests of {@link PutFromLoadValidator}.
- *
+ *
* @author Brian Stansberry
* @author Galder Zamarreño
- *
* @version $Revision: $
*/
public class PutFromLoadValidatorUnitTestCase extends TestCase {
@@ -87,7 +90,15 @@
if (transactional) {
tm.begin();
}
- assertTrue(testee.isPutValid(KEY1));
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertTrue(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testRegisteredPut() throws Exception {
@@ -99,12 +110,22 @@
}
private void registeredPutTest(boolean transactional) throws Exception {
- PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
+ PutFromLoadValidator testee = new PutFromLoadValidator(
+ transactional ? tm : null);
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
- assertTrue(testee.isPutValid(KEY1));
+
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertTrue(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testNakedPutAfterKeyRemoval() throws Exception {
@@ -124,18 +145,27 @@
}
private void nakedPutAfterRemovalTest(boolean transactional, boolean removeRegion)
- throws Exception {
- PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
+ throws Exception {
+ PutFromLoadValidator testee = new PutFromLoadValidator(
+ transactional ? tm : null);
if (removeRegion) {
- testee.cleared();
+ testee.invalidateRegion();
} else {
- testee.keyRemoved(KEY1);
+ testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
- assertFalse(testee.isPutValid(KEY1));
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertFalse(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testRegisteredPutAfterKeyRemoval() throws Exception {
@@ -155,18 +185,28 @@
}
private void registeredPutAfterRemovalTest(boolean transactional, boolean
removeRegion)
- throws Exception {
- PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
+ throws Exception {
+ PutFromLoadValidator testee = new PutFromLoadValidator(
+ transactional ? tm : null);
if (removeRegion) {
- testee.cleared();
+ testee.invalidateRegion();
} else {
- testee.keyRemoved(KEY1);
+ testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
- assertTrue(testee.isPutValid(KEY1));
+
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertTrue(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
@@ -186,18 +226,28 @@
}
private void registeredPutWithInterveningRemovalTest(boolean transactional, boolean
removeRegion)
- throws Exception {
- PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
+ throws Exception {
+ PutFromLoadValidator testee = new PutFromLoadValidator(
+ transactional ? tm : null);
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
if (removeRegion) {
- testee.cleared();
+ testee.invalidateRegion();
} else {
- testee.keyRemoved(KEY1);
+ testee.invalidateKey(KEY1);
}
- assertFalse(testee.isPutValid(KEY1));
+
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertFalse(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
@@ -217,20 +267,27 @@
}
private void delayedNakedPutAfterRemovalTest(boolean transactional, boolean
removeRegion)
- throws Exception {
- PutFromLoadValidator testee = new TestValidator(transactional ? tm : null, 100,
1000, 500,
- 10000);
+ throws Exception {
+ PutFromLoadValidator testee = new TestValidator(transactional ? tm : null, 100,
1000, 500, 10000);
if (removeRegion) {
- testee.cleared();
+ testee.invalidateRegion();
} else {
- testee.keyRemoved(KEY1);
+ testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
Thread.sleep(110);
- assertTrue(testee.isPutValid(KEY1));
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertTrue(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testMultipleRegistrations() throws Exception {
@@ -257,11 +314,17 @@
testee.registerPendingPut(KEY1);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
- if (testee.isPutValid(KEY1)) {
- success.incrementAndGet();
+ if (testee.acquirePutFromLoadLock(KEY1)) {
+ try {
+ success.incrementAndGet();
+ }
+ finally {
+ testee.releasePutFromLoadLock(KEY1);
+ }
}
finishedLatch.countDown();
- } catch (Exception e) {
+ }
+ catch (Exception e) {
e.printStackTrace();
}
}
@@ -272,7 +335,7 @@
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
- testee.cleared();
+ testee.invalidateRegion();
// Do the registration + isPutValid calls
executor.execute(r);
@@ -285,25 +348,26 @@
}
/**
- * White box test for ensuring key removals get cleaned up.
- *
+ * White box test for ensuring key removals get cleaned up. <b>Note</b>:
Since this test is test sensitive, if you
+ * add trace logging, it might fail
+ *
* @throws Exception
*/
public void testRemovalCleanup() throws Exception {
TestValidator testee = new TestValidator(null, 200, 1000, 500, 10000);
- testee.keyRemoved("KEY1");
- testee.keyRemoved("KEY2");
+ testee.invalidateKey("KEY1");
+ testee.invalidateKey("KEY2");
Thread.sleep(210);
assertEquals(2, testee.getRemovalQueueLength());
- testee.keyRemoved("KEY1");
+ testee.invalidateKey("KEY1");
assertEquals(2, testee.getRemovalQueueLength());
- testee.keyRemoved("KEY2");
+ testee.invalidateKey("KEY2");
assertEquals(2, testee.getRemovalQueueLength());
}
/**
* Very much a white box test of the logic for ensuring pending put registrations get
cleaned up.
- *
+ *
* @throws Exception
*/
public void testPendingPutCleanup() throws Exception {
@@ -311,7 +375,7 @@
// Start with a regionRemoval so we can confirm at the end that all
// registrations have been cleaned out
- testee.cleared();
+ testee.invalidateRegion();
testee.registerPendingPut("1");
testee.registerPendingPut("2");
@@ -319,8 +383,10 @@
testee.registerPendingPut("4");
testee.registerPendingPut("5");
testee.registerPendingPut("6");
- testee.isPutValid("6");
- testee.isPutValid("2");
+ testee.acquirePutFromLoadLock("6");
+ testee.releasePutFromLoadLock("6");
+ testee.acquirePutFromLoadLock("2");
+ testee.releasePutFromLoadLock("2");
// ppq = [1,2(c),3,4,5,6(c)]
assertEquals(6, testee.getPendingPutQueueLength());
assertEquals(0, testee.getOveragePendingPutQueueLength());
@@ -338,14 +404,15 @@
Thread.sleep(310);
testee.registerPendingPut("8");
// White box -- should have cleaned out 6 (completed) and
- // moved 1, 3, 4 and 5 to overage queue
+ // moved 1, 3, 4 and 5 to overage queue
// oppq = [1,3,4,5] ppq = [7,8]
assertEquals(4, testee.getOveragePendingPutQueueLength());
assertEquals(2, testee.getPendingPutQueueLength());
// Sleep past "maxPendingPutDelay"
Thread.sleep(310);
- testee.isPutValid("3");
+ testee.acquirePutFromLoadLock("3");
+ testee.releasePutFromLoadLock("3");
// White box -- should have cleaned out 1 (overage) and
// moved 7 to overage queue
// oppq = [3(c),4,5,7] ppq=[8]
@@ -367,29 +434,123 @@
// Validate that only expected items can do puts, thus indirectly
// proving the others have been cleaned out of pendingPuts map
- assertFalse(testee.isPutValid("1"));
+ boolean locked = testee.acquirePutFromLoadLock("1");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
// 5 was overage, so should have been cleaned
assertEquals(2, testee.getOveragePendingPutQueueLength());
- assertFalse(testee.isPutValid("2"));
+ locked = testee.acquirePutFromLoadLock("2");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
// 7 was overage, so should have been cleaned
assertEquals(1, testee.getOveragePendingPutQueueLength());
- assertFalse(testee.isPutValid("3"));
- assertFalse(testee.isPutValid("4"));
- assertFalse(testee.isPutValid("5"));
- assertFalse(testee.isPutValid("6"));
- assertFalse(testee.isPutValid("7"));
- assertTrue(testee.isPutValid("8"));
+ locked = testee.acquirePutFromLoadLock("3");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
+ locked = testee.acquirePutFromLoadLock("4");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
+ locked = testee.acquirePutFromLoadLock("5");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
+ locked = testee.acquirePutFromLoadLock("1");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(testee.acquirePutFromLoadLock("6"));
+ locked = testee.acquirePutFromLoadLock("7");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
+ assertTrue(testee.acquirePutFromLoadLock("8"));
+ testee.releasePutFromLoadLock("8");
tm.resume(tx);
- assertTrue(testee.isPutValid("7"));
+ assertTrue(testee.acquirePutFromLoadLock("7"));
+ testee.releasePutFromLoadLock("7");
}
+ public void testInvalidateKeyBlocksForInProgressPut() throws Exception {
+ invalidationBlocksForInProgressPutTest(true);
+ }
+
+ public void testInvalidateRegionBlocksForInProgressPut() throws Exception {
+ invalidationBlocksForInProgressPutTest(false);
+ }
+
+ private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws
Exception {
+ final PutFromLoadValidator testee = new PutFromLoadValidator(null);
+ final CountDownLatch removeLatch = new CountDownLatch(1);
+ final CountDownLatch pferLatch = new CountDownLatch(1);
+ final AtomicReference<Object> cache = new
AtomicReference<Object>("INITIAL");
+
+ Callable<Boolean> pferCallable = new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ testee.registerPendingPut(KEY1);
+ if (testee.acquirePutFromLoadLock(KEY1)) {
+ try {
+ removeLatch.countDown();
+ pferLatch.await();
+ cache.set("PFER");
+ return Boolean.TRUE;
+ }
+ finally {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
+ return Boolean.FALSE;
+ }
+ };
+
+ Callable<Void> invalidateCallable = new Callable<Void>() {
+ public Void call() throws Exception {
+ removeLatch.await();
+ if (keyOnly) {
+ testee.invalidateKey(KEY1);
+ } else {
+ testee.invalidateRegion();
+ }
+ cache.set(null);
+ return null;
+ }
+ };
+
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ Future<Boolean> pferFuture = executorService.submit(pferCallable);
+ Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
+
+ try {
+ invalidateFuture.get(1, TimeUnit.SECONDS);
+ fail("invalidateFuture did not block");
+ }
+ catch (TimeoutException good) {}
+
+ pferLatch.countDown();
+
+ assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
+ invalidateFuture.get(5, TimeUnit.SECONDS);
+
+ assertNull(cache.get());
+
+ }
+
private static class TestValidator extends PutFromLoadValidator {
protected TestValidator(TransactionManager transactionManager,
- long nakedPutInvalidationPeriod, long pendingPutOveragePeriod,
- long pendingPutRecentPeriod, long maxPendingPutDelay) {
+ long nakedPutInvalidationPeriod, long
pendingPutOveragePeriod,
+ long pendingPutRecentPeriod, long maxPendingPutDelay) {
super(transactionManager, nakedPutInvalidationPeriod, pendingPutOveragePeriod,
- pendingPutRecentPeriod, maxPendingPutDelay);
+ pendingPutRecentPeriod, maxPendingPutDelay);
}
@Override
Modified:
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java
===================================================================
---
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java 2010-02-23
12:09:45 UTC (rev 18856)
+++
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java 2010-02-23
12:11:34 UTC (rev 18857)
@@ -23,8 +23,15 @@
*/
package org.hibernate.test.cache.infinispan.collection;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
import junit.extensions.TestSetup;
import junit.framework.AssertionFailedError;
@@ -32,20 +39,29 @@
import junit.framework.TestSuite;
import org.hibernate.cache.CacheDataDescription;
+import org.hibernate.cache.CacheException;
import org.hibernate.cache.CollectionRegion;
import org.hibernate.cache.access.AccessType;
import org.hibernate.cache.access.CollectionRegionAccessStrategy;
import org.hibernate.cache.impl.CacheDataDescriptionImpl;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
+import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
+import org.hibernate.cache.infinispan.access.TransactionalAccessDelegate;
+import org.hibernate.cache.infinispan.collection.CollectionRegionImpl;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.CacheAdapter;
+import org.hibernate.cache.infinispan.util.CacheAdapterImpl;
import org.hibernate.cache.infinispan.util.FlagAdapter;
import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.infinispan.AbstractNonFunctionalTestCase;
+import
org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.util.ComparableComparator;
+import org.infinispan.Cache;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
+import javax.transaction.TransactionManager;
+
/**
* Base class for tests of CollectionRegionAccessStrategy impls.
*
@@ -183,15 +199,64 @@
public abstract void testCacheConfiguration();
/**
- * Test method for {@link TransactionalAccess#getRegion()}.
+ * Test method for {@link CollectionRegionAccessStrategy#getRegion()}.
*/
public void testGetRegion() {
assertEquals("Correct region", localCollectionRegion,
localAccessStrategy.getRegion());
}
+ public void testPutFromLoadRemoveDoesNotProduceStaleData() throws Exception {
+ final CountDownLatch pferLatch = new CountDownLatch(1);
+ final CountDownLatch removeLatch = new CountDownLatch(1);
+ TransactionManager tm =
DualNodeJtaTransactionManagerImpl.getInstance("test1234");
+ PutFromLoadValidator validator = new PutFromLoadValidator(tm) {
+ @Override
+ public boolean acquirePutFromLoadLock(Object key) {
+ boolean acquired = super.acquirePutFromLoadLock(key);
+ try {
+ removeLatch.countDown();
+ pferLatch.await(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.debug("Interrupted");
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ log.error("Error", e);
+ throw new RuntimeException("Error", e);
+ }
+ return acquired;
+ }
+ };
+ final TransactionalAccessDelegate delegate = new
TransactionalAccessDelegate((CollectionRegionImpl) localCollectionRegion, validator);
+
+ Callable<Void> pferCallable = new Callable<Void>() {
+ public Void call() throws Exception {
+ delegate.putFromLoad("k1", "v1", 0, null);
+ return null;
+ }
+ };
+
+ Callable<Void> removeCallable = new Callable<Void>() {
+ public Void call() throws Exception {
+ removeLatch.await();
+ delegate.remove("k1");
+ pferLatch.countDown();
+ return null;
+ }
+ };
+
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ Future<Void> pferFuture = executorService.submit(pferCallable);
+ Future<Void> removeFuture = executorService.submit(removeCallable);
+
+ pferFuture.get();
+ removeFuture.get();
+
+ assertFalse(localCache.containsKey("k1"));
+ }
+
/**
* Test method for
- * {@link TransactionalAccess#putFromLoad(java.lang.Object, java.lang.Object, long,
java.lang.Object)}
+ * {@link CollectionRegionAccessStrategy#putFromLoad(java.lang.Object,
java.lang.Object, long, java.lang.Object)}
* .
*/
public void testPutFromLoad() throws Exception {
@@ -200,7 +265,7 @@
/**
* Test method for
- * {@link TransactionalAccess#putFromLoad(java.lang.Object, java.lang.Object, long,
java.lang.Object, boolean)}
+ * {@link CollectionRegionAccessStrategy#putFromLoad(java.lang.Object,
java.lang.Object, long, java.lang.Object, boolean)}
* .
*/
public void testPutFromLoadMinimal() throws Exception {
@@ -339,21 +404,21 @@
}
/**
- * Test method for {@link TransactionalAccess#remove(java.lang.Object)}.
+ * Test method for {@link CollectionRegionAccessStrategy#remove(java.lang.Object)}.
*/
public void testRemove() {
evictOrRemoveTest(false);
}
/**
- * Test method for {@link TransactionalAccess#removeAll()}.
+ * Test method for {@link CollectionRegionAccessStrategy#removeAll()}.
*/
public void testRemoveAll() {
evictOrRemoveAllTest(false);
}
/**
- * Test method for {@link TransactionalAccess#evict(java.lang.Object)}.
+ * Test method for {@link CollectionRegionAccessStrategy#evict(java.lang.Object)}.
*
* FIXME add testing of the "immediately without regard for transaction
isolation" bit in the
* CollectionRegionAccessStrategy API.
@@ -363,7 +428,7 @@
}
/**
- * Test method for {@link TransactionalAccess#evictAll()}.
+ * Test method for {@link CollectionRegionAccessStrategy#evictAll()}.
*
* FIXME add testing of the "immediately without regard for transaction
isolation" bit in the
* CollectionRegionAccessStrategy API.
Modified:
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/ConcurrentWriteTest.java
===================================================================
---
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/ConcurrentWriteTest.java 2010-02-23
12:09:45 UTC (rev 18856)
+++
core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/ConcurrentWriteTest.java 2010-02-23
12:11:34 UTC (rev 18857)
@@ -191,7 +191,7 @@
/**
* TODO: This will fail until ISPN-??? has been fixed.
- *
+ *
* @throws Exception
*/
public void testManyUsers() throws Throwable {
@@ -340,8 +340,6 @@
/**
* remove existing 'contact' from customer's list of contacts
*
- * @param contact
- * contact to remove from customer's contacts
* @param customerId
* @throws IllegalStateException
* if customer does not own a contact
@@ -421,14 +419,12 @@
try {
// barrier.await();
for (int i = 0; i < ITERATION_COUNT && !TERMINATE_ALL_USERS; i++)
{
- if (contactExists())
- throw new IllegalStateException("contact already exists before
add, customerId=" + customerId);
+ contactExists();
if (trace) log.trace("Add contact for customer " + customerId);
addContact(customerId);
if (trace) log.trace("Added contact");
thinkRandomTime();
- if (!contactExists())
- throw new IllegalStateException("contact missing after successful
add, customerId=" + customerId);
+ contactExists();
thinkRandomTime();
if (trace) log.trace("Read all customers' first contact");
// read everyone's contacts
@@ -438,8 +434,7 @@
if (trace) log.trace("Remove contact of customer" +
customerId);
removeContact(customerId);
if (trace) log.trace("Removed contact");
- if (contactExists())
- throw new IllegalStateException("contact still exists after
successful remove call, customerId=" + customerId);
+ contactExists();
thinkRandomTime();
++completedIterations;
if (log.isTraceEnabled()) log.trace("Iteration completed {0}",
completedIterations);
Modified: core/trunk/cache-infinispan/src/test/resources/log4j.properties
===================================================================
--- core/trunk/cache-infinispan/src/test/resources/log4j.properties 2010-02-23 12:09:45
UTC (rev 18856)
+++ core/trunk/cache-infinispan/src/test/resources/log4j.properties 2010-02-23 12:11:34
UTC (rev 18857)
@@ -30,8 +30,8 @@
log4j.rootLogger=info, stdout
#log4j.logger.org.hibernate.test=info
-log4j.logger.org.hibernate.test=trace
-log4j.logger.org.hibernate.cache=trace
-log4j.logger.org.hibernate.SQL=debug
+log4j.logger.org.hibernate.test=info
+log4j.logger.org.hibernate.cache=info
+log4j.logger.org.hibernate.SQL=info
#log4j.logger.org.jgroups=info
#log4j.logger.org.infinispan=trace
\ No newline at end of file