Author: bstansberry(a)jboss.com
Date: 2010-02-22 21:11:31 -0500 (Mon, 22 Feb 2010)
New Revision: 18855
Modified:
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/OptimisticTransactionalAccessDelegate.java
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/PutFromLoadValidator.java
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/TransactionalAccessDelegate.java
core/trunk/cache-jbosscache/src/test/java/org/hibernate/test/cache/jbc/access/PutFromLoadValidatorUnitTestCase.java
Log:
[HHH-3817] Better handle race between putFromLoad and removal
Modified:
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/OptimisticTransactionalAccessDelegate.java
===================================================================
---
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/OptimisticTransactionalAccessDelegate.java 2010-02-23
01:12:57 UTC (rev 18854)
+++
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/OptimisticTransactionalAccessDelegate.java 2010-02-23
02:11:31 UTC (rev 18855)
@@ -63,7 +63,9 @@
*/
@Override
public void evict(Object key) throws CacheException {
- putValidator.keyRemoved(key);
+ if (!putValidator.invalidateKey(key)) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls for
key " + key + " from region " + region.getName());
+ }
region.ensureRegionRootExists();
Option opt = NonLockingDataVersion.getInvocationOption();
@@ -75,7 +77,9 @@
@Override
public void evictAll() throws CacheException
{
- putValidator.regionRemoved();
+ if (!putValidator.invalidateRegion()) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls
for region " + region.getName());
+ }
Transaction tx = region.suspend();
try {
@@ -116,16 +120,21 @@
if (!region.checkValid())
return false;
- if (!putValidator.isPutValid(key))
+ if (!putValidator.acquirePutFromLoadLock(key))
return false;
- region.ensureRegionRootExists();
-
- // We ignore minimalPutOverride. JBossCache putForExternalRead is
- // already about as minimal as we can get; it will promptly return
- // if it discovers that the node we want to write to already exists
- Option opt = getDataVersionOption(version, version);
- return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
+ try {
+ region.ensureRegionRootExists();
+
+ // We ignore minimalPutOverride. JBossCache putForExternalRead is
+ // already about as minimal as we can get; it will promptly return
+ // if it discovers that the node we want to write to already exists
+ Option opt = getDataVersionOption(version, version);
+ return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
+ }
+ finally {
+ putValidator.releasePutFromLoadLock(key);
+ }
}
@Override
@@ -134,19 +143,26 @@
if (!region.checkValid())
return false;
- if (!putValidator.isPutValid(key))
+ if (!putValidator.acquirePutFromLoadLock(key))
return false;
- region.ensureRegionRootExists();
-
- Option opt = getDataVersionOption(version, version);
- return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
+ try {
+ region.ensureRegionRootExists();
+
+ Option opt = getDataVersionOption(version, version);
+ return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
+ }
+ finally {
+ putValidator.releasePutFromLoadLock(key);
+ }
}
@Override
public void remove(Object key) throws CacheException {
- putValidator.keyRemoved(key);
+ if (!putValidator.invalidateKey(key)) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls for
key " + key + " from region " + region.getName());
+ }
// We remove whether or not the region is valid. Other nodes
// may have already restored the region so they need to
@@ -160,7 +176,9 @@
@Override
public void removeAll() throws CacheException {
- putValidator.regionRemoved();
+ if (!putValidator.invalidateRegion()) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls for
region " + region.getName());
+ }
Option opt = NonLockingDataVersion.getInvocationOption();
CacheHelper.removeAll(cache, regionFqn, opt);
}
Modified:
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/PutFromLoadValidator.java
===================================================================
---
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/PutFromLoadValidator.java 2010-02-23
01:12:57 UTC (rev 18854)
+++
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/PutFromLoadValidator.java 2010-02-23
02:11:31 UTC (rev 18855)
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,6 +47,36 @@
* the potential to store stale data, since the data may have been removed from the
* database and the cache between the time when the data was read from the database
* and the actual call to <code>putFromLoad</code>.
+ * <p>
+ * The expected usage of this class by a thread that read the cache and did
+ * not find data is:
+ *
+ * <ol>
+ * <li> Call {@link #registerPendingPut(Object)}</li>
+ * <li> Read the database</li>
+ * <li> Call {@link #acquirePutFromLoadLock(Object)}
+ * <li> if above returns <code>false</code>, the thread should not
cache the data;
+ * only if above returns <code>true</code>, put data in the cache
and...</li>
+ * <li> then call {@link #releasePutFromLoadLock(Object)}</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * The expected usage by a thread that is taking an action such that any pending
+ * <code>putFromLoad</code> may have stale data and should not cache it is to
either
+ * call
+ *
+ * <ul>
+ * <li> {@link #invalidateKey(Object)} (for a single key invalidation)</li>
+ * <li>or {@link #invalidateRegion()} (for a general invalidation all pending
puts)</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * This class also supports the concept of "naked puts", which are calls to
+ * {@link #acquirePutFromLoadLock(Object)} without a preceding {@link
#registerPendingPut(Object)}
+ * call.
+ * </p>
*
* @author Brian Stansberry
*
@@ -54,21 +85,21 @@
public class PutFromLoadValidator {
/**
* Period in ms after a removal during which a call to
- * {@link #isPutValid(Object)} that hasn't been
+ * {@link #acquirePutFromLoadLock(Object)} that hasn't been
* {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
* will return false.
*/
- public static final long NAKED_PUT_INVALIDATION_PERIOD = 10 * 1000;
+ public static final long NAKED_PUT_INVALIDATION_PERIOD = 20 * 1000;
- /** Period after which a pending put is placed in the over-age queue */
+ /** Period (in ms) after which a pending put is placed in the over-age queue */
private static final long PENDING_PUT_OVERAGE_PERIOD = 5 * 1000;
- /** Period before which we stop trying to clean out pending puts */
+ /** Period (in ms) before which we stop trying to clean out pending puts */
private static final long PENDING_PUT_RECENT_PERIOD = 2 * 1000;
/**
- * Period after which a pending put is never expected to come in and should
- * be cleaned
+ * Period (in ms) after which a pending put is never expected to come in
+ * and should be cleaned
*/
private static final long MAX_PENDING_PUT_DELAY = 2 * 60 * 1000;
@@ -128,7 +159,7 @@
* Creates a new PutFromLoadValidator.
*
* @param transactionManager
- * transaction manager to use to associated changes with a
+ * transaction manager to use to associate changes with a
* transaction; may be <code>null</code>
*/
public PutFromLoadValidator(TransactionManager transactionManager) {
@@ -153,41 +184,147 @@
// ----------------------------------------------------------------- Public
- public boolean isPutValid(Object key) {
+ /**
+ * Acquire a lock giving the calling thread the right to put data in the
+ * cache for the given key.
+ * <p>
+ * <strong>NOTE:</strong> A call to this method that returns
<code>true</code>
+ * should always be matched with a call to {@link #releasePutFromLoadLock(Object)}.
+ * </p>
+ *
+ * @param key the key
+ *
+ * @return <code>true</code> if the lock is acquired and the cache put
+ * can proceed; <code>false</code> if the data should not be cached
+ */
+ public boolean acquirePutFromLoadLock(Object key) {
+
boolean valid = false;
+ boolean locked = false;
long now = System.currentTimeMillis();
- PendingPutMap pending = pendingPuts.get(key);
- if (pending != null) {
- synchronized (pending) {
- PendingPut toCancel = pending.remove(getOwnerForPut());
- valid = toCancel != null;
- if (valid) {
- toCancel.completed = true;
- if (pending.size() == 0) {
- pendingPuts.remove(key);
+ // Important: Do cleanup before we acquire any locks so we
+ // don't deadlock with invalidateRegion
+ cleanOutdatedPendingPuts(now, true);
+
+ try {
+ PendingPutMap pending = pendingPuts.get(key);
+ if (pending != null) {
+ locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
+ if (locked) {
+ try {
+ PendingPut toCancel = pending.remove(getOwnerForPut());
+ if (toCancel != null) {
+ valid = !toCancel.completed;
+ toCancel.completed = true;
+ }
}
+ finally {
+ if (!valid) {
+ pending.releaseLock();
+ locked = false;
+ }
+ }
}
}
+ else {
+ // Key wasn't in pendingPuts, so either this is a "naked put"
+ // or regionRemoved has been called. Check if we can proceed
+ if (now > invalidationTimestamp) {
+ Long removedTime = recentRemovals.get(key);
+ if (removedTime == null || now > removedTime.longValue()) {
+ // It's legal to proceed. But we have to record this key
+ // in pendingPuts so releasePutFromLoadLock can find it.
+ // To do this we basically simulate a normal "register
+ // then acquire lock" pattern
+ registerPendingPut(key);
+ locked = acquirePutFromLoadLock(key);
+ valid = locked;
+ }
+ }
+ }
}
-
- if (!valid) {
- if (now > invalidationTimestamp) {
- Long removedTime = recentRemovals.get(key);
- if (removedTime == null || now > removedTime.longValue()) {
- valid = true;
+ catch (Throwable t) {
+
+ valid = false;
+
+ if (locked) {
+ PendingPutMap toRelease = pendingPuts.get(key);
+ if (toRelease != null) {
+ toRelease.releaseLock();
}
}
+
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ else if (t instanceof Error) {
+ throw (Error) t;
+ }
+ else {
+ throw new RuntimeException(t);
+ }
}
-
- cleanOutdatedPendingPuts(now, true);
-
+
return valid;
}
+
+ /**
+ * Releases the lock previously obtained by a call to
+ * {@link #acquirePutFromLoadLock(Object)} that returned <code>true</code>.
+ *
+ * @param key the key
+ */
+ public void releasePutFromLoadLock(Object key) {
+ PendingPutMap pending = pendingPuts.get(key);
+ if (pending != null) {
+ if (pending.size() == 0) {
+ pendingPuts.remove(key);
+ }
+ pending.releaseLock();
+ }
+ }
- public void keyRemoved(Object key) {
+ /**
+ * Invalidates any {@link #registerPendingPut(Object) previously registered pending
puts}
+ * ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)} will
+ * return <code>false</code>.
+ * <p>
+ * This method will block until any concurrent thread that has
+ * {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for
+ * the given key has released the lock. This allows the caller to be certain
+ * the putFromLoad will not execute after this method returns, possibly
+ * caching stale data.
+ * </p>
+ *
+ * @param key key identifying data whose pending puts should be invalidated
+ *
+ * @return <code>true</code> if the invalidation was successful;
<code>false</code>
+ * if a problem occured (which the caller should treat as an
+ * exception condition)
+ */
+ public boolean invalidateKey(Object key) {
+
+ boolean success = true;
+
// Invalidate any pending puts
- pendingPuts.remove(key);
+ PendingPutMap pending = pendingPuts.get(key);
+ if (pending != null) {
+ // This lock should be available very quickly, but we'll be
+ // very patient waiting for it as callers should treat not
+ // acquiring it as an exception condition
+ if (pending.acquireLock(60, TimeUnit.SECONDS)) {
+ try {
+ pending.invalidate();
+ }
+ finally {
+ pending.releaseLock();
+ }
+ }
+ else {
+ success = false;
+ }
+ }
// Record when this occurred to invalidate later naked puts
RecentRemoval removal = new RecentRemoval(key,
@@ -224,55 +361,96 @@
}
}
}
+
+ return success;
}
- public void regionRemoved() {
+ /**
+ * Invalidates all {@link #registerPendingPut(Object) previously registered pending
puts}
+ * ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)} will
+ * return <code>false</code>.
+ * <p>
+ * This method will block until any concurrent thread that has
+ * {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for
+ * the any key has released the lock. This allows the caller to be certain
+ * the putFromLoad will not execute after this method returns, possibly
+ * caching stale data.
+ * </p>
+ *
+ * @return <code>true</code> if the invalidation was successful;
<code>false</code>
+ * if a problem occured (which the caller should treat as an
+ * exception condition)
+ */
+ public boolean invalidateRegion() {
+
+ boolean ok = false;
invalidationTimestamp = System.currentTimeMillis()
+ this.nakedPutInvalidationPeriod;
- pendingLock.lock();
+
try {
+
+ // Acquire the lock for each entry to ensure any ongoing
+ // work associated with it is completed before we return
+ for (PendingPutMap entry : pendingPuts.values()) {
+ if (entry.acquireLock(60, TimeUnit.SECONDS)) {
+ try {
+ entry.invalidate();
+ }
+ finally {
+ entry.releaseLock();
+ }
+ }
+ else {
+ ok = false;
+ }
+ }
+
removalsLock.lock();
try {
- pendingPuts.clear();
- pendingQueue.clear();
- overagePendingQueue.clear();
recentRemovals.clear();
- removalsQueue.clear();
- earliestRemovalTimestamp = invalidationTimestamp;
+ removalsQueue.clear();
+
+ ok = true;
} finally {
removalsLock.unlock();
}
- } finally {
- pendingLock.unlock();
}
+ catch (Exception e) {
+ ok = false;
+ }
+ finally {
+ earliestRemovalTimestamp = invalidationTimestamp;
+ }
+
+ return ok;
}
/**
* Notifies this validator that it is expected that a database read followed
- * by a subsequent {@link #isPutValid(Object)} call will occur. The intent
+ * by a subsequent {@link #acquirePutFromLoadLock(Object)} call will occur. The intent
* is this method would be called following a cache miss wherein it is
* expected that a database read plus cache put will occur. Calling this
* method allows the validator to treat the subsequent
- * <code>isPutValid</code> as if the database read occurred when this
method
+ * <code>acquirePutFromLoadLock</code> as if the database read occurred when
this method
* was invoked. This allows the validator to compare the timestamp of this
* call against the timestamp of subsequent removal notifications. A put
* that occurs without this call preceding it is "naked"; i.e the validator
* must assume the put is not valid if any relevant removal has occurred
* within {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
*
- * @param key
- * key that will be used for subsequent put
+ * @param key key that will be used for subsequent cache put
*/
public void registerPendingPut(Object key) {
PendingPut pendingPut = new PendingPut(key, getOwnerForPut());
- PendingPutMap pendingForKey = new PendingPutMap();
- synchronized (pendingForKey) {
- for (;;) {
- PendingPutMap existing = pendingPuts.putIfAbsent(key,
- pendingForKey);
- if (existing != null && existing != pendingForKey) {
- synchronized (existing) {
+ PendingPutMap pendingForKey = new PendingPutMap(pendingPut);
+
+ for (;;) {
+ PendingPutMap existing = pendingPuts.putIfAbsent(key,
+ pendingForKey);
+ if (existing != null) {
+ if (existing.acquireLock(10, TimeUnit.SECONDS)) {
+ try {
existing.put(pendingPut);
PendingPutMap doublecheck = pendingPuts.putIfAbsent(
key, existing);
@@ -281,10 +459,17 @@
}
// else we hit a race and need to loop to try again
}
- } else {
- pendingForKey.put(pendingPut);
+ finally {
+ existing.releaseLock();
+ }
+ }
+ else {
+ // Can't get the lock; when we come back we'll be a "naked put"
break;
}
+ } else {
+ // normal case
+ break;
}
}
@@ -343,7 +528,9 @@
pendingLock.lock();
try {
pendingQueue.add(new WeakReference<PendingPut>(pendingPut));
- cleanOutdatedPendingPuts(pendingPut.timestamp, false);
+ if (pendingQueue.size() > 1) {
+ cleanOutdatedPendingPuts(pendingPut.timestamp, false);
+ }
} finally {
pendingLock.unlock();
}
@@ -356,9 +543,7 @@
pendingLock.lock();
}
try {
-
// Clean items out of the basic queue
-
long overaged = now - this.pendingPutOveragePeriod;
long recent = now - this.pendingPutRecentPeriod;
@@ -411,31 +596,62 @@
if (toClean != null) {
PendingPutMap map = pendingPuts.get(toClean.key);
if (map != null) {
- synchronized (map) {
- PendingPut cleaned = map.remove(toClean.owner);
- if (toClean.equals(cleaned) == false) {
- // Oops. Restore it.
- map.put(cleaned);
- } else if (map.size() == 0) {
- pendingPuts.remove(toClean.key);
+ if (map.acquireLock(100, TimeUnit.MILLISECONDS)) {
+ try {
+ PendingPut cleaned = map.remove(toClean.owner);
+ if (toClean.equals(cleaned) == false) {
+ // Oops. Restore it.
+ map.put(cleaned);
+ } else if (map.size() == 0) {
+ pendingPuts.remove(toClean.key);
+ }
}
+ finally {
+ map.releaseLock();
+ }
}
+ else {
+ // Something's gone wrong and the lock isn't being released.
+ // We removed toClean from the queue and need to restore it
+ // TODO this is pretty dodgy
+ restorePendingPut(toClean);
+ }
}
}
}
+
+ private void restorePendingPut(PendingPut toRestore) {
+ pendingLock.lock();
+ try {
+ // Give it a new lease on life so it's not out of order. We could
+ // scan the queue and put toRestore back at the front, but then
+ // we'll just immediately try removing it again; instead we
+ // let it cycle through the queue again
+ toRestore.refresh();
+ pendingQueue.add(new WeakReference<PendingPut>(toRestore));
+ }
+ finally {
+ pendingLock.unlock();
+ }
+ }
/**
* Lazy-initialization map for PendingPut. Optimized for the expected usual
* case where only a single put is pending for a given key.
*
* This class is NOT THREAD SAFE. All operations on it must be performed
- * with the object monitor held.
+ * with the lock held.
*/
private static class PendingPutMap {
private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap;
-
+ private final Lock lock = new ReentrantLock();
+
+ PendingPutMap(PendingPut singleItem) {
+ this.singlePendingPut = singleItem;
+ }
+
public void put(PendingPut pendingPut) {
if (singlePendingPut == null) {
if (fullMap == null) {
@@ -471,18 +687,46 @@
return fullMap == null ? (singlePendingPut == null ? 0 : 1)
: fullMap.size();
}
+
+ public boolean acquireLock(long time, TimeUnit unit) {
+ try {
+ return lock.tryLock(time, unit);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ public void releaseLock() {
+ lock.unlock();
+ }
+
+ public void invalidate() {
+ if (singlePendingPut != null) {
+ singlePendingPut.completed = true;
+ }
+ else if (fullMap != null) {
+ for (PendingPut pp : fullMap.values()) {
+ pp.completed = true;
+ }
+ }
+ }
}
private static class PendingPut {
private final Object key;
private final Object owner;
- private final long timestamp = System.currentTimeMillis();
+ private long timestamp = System.currentTimeMillis();
private volatile boolean completed;
private PendingPut(Object key, Object owner) {
this.key = key;
this.owner = owner;
}
+
+ private void refresh() {
+ timestamp = System.currentTimeMillis();
+ }
}
Modified:
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/TransactionalAccessDelegate.java
===================================================================
---
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/TransactionalAccessDelegate.java 2010-02-23
01:12:57 UTC (rev 18854)
+++
core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/TransactionalAccessDelegate.java 2010-02-23
02:11:31 UTC (rev 18855)
@@ -83,12 +83,17 @@
if (!region.checkValid())
return false;
- if (!putValidator.isPutValid(key))
+ if (!putValidator.acquirePutFromLoadLock(key))
return false;
- region.ensureRegionRootExists();
-
- return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
+ try {
+ region.ensureRegionRootExists();
+
+ return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
+ }
+ finally {
+ putValidator.releasePutFromLoadLock(key);
+ }
}
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version,
boolean minimalPutOverride)
@@ -97,15 +102,20 @@
if (!region.checkValid())
return false;
- if (!putValidator.isPutValid(key))
+ if (!putValidator.acquirePutFromLoadLock(key))
return false;
-
- region.ensureRegionRootExists();
-
- // We ignore minimalPutOverride. JBossCache putForExternalRead is
- // already about as minimal as we can get; it will promptly return
- // if it discovers that the node we want to write to already exists
- return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
+
+ try {
+ region.ensureRegionRootExists();
+
+ // We ignore minimalPutOverride. JBossCache putForExternalRead is
+ // already about as minimal as we can get; it will promptly return
+ // if it discovers that the node we want to write to already exists
+ return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
+ }
+ finally {
+ putValidator.releasePutFromLoadLock(key);
+ }
}
public SoftLock lockItem(Object key, Object version) throws CacheException {
@@ -163,7 +173,9 @@
public void remove(Object key) throws CacheException {
- putValidator.keyRemoved(key);
+ if (!putValidator.invalidateKey(key)) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls for
key " + key + " from region " + region.getName());
+ }
// We remove whether or not the region is valid. Other nodes
// may have already restored the region so they need to
@@ -175,13 +187,17 @@
}
public void removeAll() throws CacheException {
- putValidator.regionRemoved();
+ if (!putValidator.invalidateRegion()) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls for
region " + region.getName());
+ }
CacheHelper.removeAll(cache, regionFqn);
}
public void evict(Object key) throws CacheException {
- putValidator.keyRemoved(key);
+ if (!putValidator.invalidateKey(key)) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls for
key " + key + " from region " + region.getName());
+ }
region.ensureRegionRootExists();
@@ -189,7 +205,10 @@
}
public void evictAll() throws CacheException {
- putValidator.regionRemoved();
+ if (!putValidator.invalidateRegion()) {
+ throw new CacheException("Failed to invalidate pending putFromLoad calls for
region " + region.getName());
+ }
+
Transaction tx = region.suspend();
try {
region.ensureRegionRootExists();
Modified:
core/trunk/cache-jbosscache/src/test/java/org/hibernate/test/cache/jbc/access/PutFromLoadValidatorUnitTestCase.java
===================================================================
---
core/trunk/cache-jbosscache/src/test/java/org/hibernate/test/cache/jbc/access/PutFromLoadValidatorUnitTestCase.java 2010-02-23
01:12:57 UTC (rev 18854)
+++
core/trunk/cache-jbosscache/src/test/java/org/hibernate/test/cache/jbc/access/PutFromLoadValidatorUnitTestCase.java 2010-02-23
02:11:31 UTC (rev 18855)
@@ -23,11 +23,15 @@
*/
package org.hibernate.test.cache.jbc.access;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -87,7 +91,7 @@
if (transactional) {
tm.begin();
}
- assertTrue(testee.isPutValid(KEY1));
+ assertTrue(testee.acquirePutFromLoadLock(KEY1));
}
public void testRegisteredPut() throws Exception {
@@ -105,7 +109,16 @@
tm.begin();
}
testee.registerPendingPut(KEY1);
- assertTrue(testee.isPutValid(KEY1));
+
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertTrue(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testNakedPutAfterKeyRemoval() throws Exception {
@@ -129,14 +142,23 @@
PutFromLoadValidator testee = new PutFromLoadValidator(
transactional ? tm : null);
if (removeRegion) {
- testee.regionRemoved();
+ testee.invalidateRegion();
} else {
- testee.keyRemoved(KEY1);
+ testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
- assertFalse(testee.isPutValid(KEY1));
+
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertFalse(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
@@ -163,15 +185,24 @@
PutFromLoadValidator testee = new PutFromLoadValidator(
transactional ? tm : null);
if (removeRegion) {
- testee.regionRemoved();
+ testee.invalidateRegion();
} else {
- testee.keyRemoved(KEY1);
+ testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
- assertTrue(testee.isPutValid(KEY1));
+
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertTrue(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
@@ -202,11 +233,20 @@
}
testee.registerPendingPut(KEY1);
if (removeRegion) {
- testee.regionRemoved();
+ testee.invalidateRegion();
} else {
- testee.keyRemoved(KEY1);
+ testee.invalidateKey(KEY1);
}
- assertFalse(testee.isPutValid(KEY1));
+
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertFalse(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
@@ -232,15 +272,24 @@
PutFromLoadValidator testee = new TestValidator(transactional ? tm
: null, 100, 1000, 500, 10000);
if (removeRegion) {
- testee.regionRemoved();
+ testee.invalidateRegion();
} else {
- testee.keyRemoved(KEY1);
+ testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
Thread.sleep(110);
- assertTrue(testee.isPutValid(KEY1));
+
+ boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ assertTrue(lockable);
+ }
+ finally {
+ if (lockable) {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
}
@@ -268,8 +317,13 @@
testee.registerPendingPut(KEY1);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
- if (testee.isPutValid(KEY1)) {
- success.incrementAndGet();
+ if (testee.acquirePutFromLoadLock(KEY1)) {
+ try {
+ success.incrementAndGet();
+ }
+ finally {
+ testee.releasePutFromLoadLock(KEY1);
+ }
}
finishedLatch.countDown();
}
@@ -284,7 +338,7 @@
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
- testee.regionRemoved();
+ testee.invalidateRegion();
// Do the registration + isPutValid calls
executor.execute(r);
@@ -303,13 +357,13 @@
*/
public void testRemovalCleanup() throws Exception {
TestValidator testee = new TestValidator(null, 200, 1000, 500, 10000);
- testee.keyRemoved("KEY1");
- testee.keyRemoved("KEY2");
+ testee.invalidateKey("KEY1");
+ testee.invalidateKey("KEY2");
Thread.sleep(210);
assertEquals(2, testee.getRemovalQueueLength());
- testee.keyRemoved("KEY1");
+ testee.invalidateKey("KEY1");
assertEquals(2, testee.getRemovalQueueLength());
- testee.keyRemoved("KEY2");
+ testee.invalidateKey("KEY2");
assertEquals(2, testee.getRemovalQueueLength());
}
@@ -324,7 +378,7 @@
// Start with a regionRemoval so we can confirm at the end that all
// registrations have been cleaned out
- testee.regionRemoved();
+ testee.invalidateRegion();
testee.registerPendingPut("1");
testee.registerPendingPut("2");
@@ -332,8 +386,10 @@
testee.registerPendingPut("4");
testee.registerPendingPut("5");
testee.registerPendingPut("6");
- testee.isPutValid("6");
- testee.isPutValid("2");
+ testee.acquirePutFromLoadLock("6");
+ testee.releasePutFromLoadLock("6");
+ testee.acquirePutFromLoadLock("2");
+ testee.releasePutFromLoadLock("2");
// ppq = [1,2(c),3,4,5,6(c)]
assertEquals(6, testee.getPendingPutQueueLength());
assertEquals(0, testee.getOveragePendingPutQueueLength());
@@ -358,7 +414,8 @@
// Sleep past "maxPendingPutDelay"
Thread.sleep(310);
- testee.isPutValid("3");
+ testee.acquirePutFromLoadLock("3");
+ testee.releasePutFromLoadLock("3");
// White box -- should have cleaned out 1 (overage) and
// moved 7 to overage queue
// oppq = [3(c),4,5,7] ppq=[8]
@@ -380,21 +437,116 @@
// Validate that only expected items can do puts, thus indirectly
// proving the others have been cleaned out of pendingPuts map
- assertFalse(testee.isPutValid("1"));
+ boolean locked = testee.acquirePutFromLoadLock("1");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
// 5 was overage, so should have been cleaned
assertEquals(2, testee.getOveragePendingPutQueueLength());
- assertFalse(testee.isPutValid("2"));
+ locked = testee.acquirePutFromLoadLock("2");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
// 7 was overage, so should have been cleaned
assertEquals(1, testee.getOveragePendingPutQueueLength());
- assertFalse(testee.isPutValid("3"));
- assertFalse(testee.isPutValid("4"));
- assertFalse(testee.isPutValid("5"));
- assertFalse(testee.isPutValid("6"));
- assertFalse(testee.isPutValid("7"));
- assertTrue(testee.isPutValid("8"));
+ locked = testee.acquirePutFromLoadLock("3");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
+ locked = testee.acquirePutFromLoadLock("4");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
+ locked = testee.acquirePutFromLoadLock("5");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
+ locked = testee.acquirePutFromLoadLock("1");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(testee.acquirePutFromLoadLock("6"));
+ locked = testee.acquirePutFromLoadLock("7");
+ if (locked) {
+ testee.releasePutFromLoadLock("1");
+ }
+ assertFalse(locked);
+ assertTrue(testee.acquirePutFromLoadLock("8"));
+ testee.releasePutFromLoadLock("8");
tm.resume(tx);
- assertTrue(testee.isPutValid("7"));
+ assertTrue(testee.acquirePutFromLoadLock("7"));
+ testee.releasePutFromLoadLock("7");
}
+
+ public void testInvalidateKeyBlocksForInProgressPut() throws Exception {
+ invalidationBlocksForInProgressPutTest(true);
+ }
+
+ public void testInvalidateRegionBlocksForInProgressPut() throws Exception {
+ invalidationBlocksForInProgressPutTest(false);
+ }
+
+ private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws
Exception {
+ final PutFromLoadValidator testee = new PutFromLoadValidator(null);
+ final CountDownLatch removeLatch = new CountDownLatch(1);
+ final CountDownLatch pferLatch = new CountDownLatch(1);
+ final AtomicReference<Object> cache = new
AtomicReference<Object>("INITIAL");
+
+ Callable<Boolean> pferCallable = new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ testee.registerPendingPut(KEY1);
+ if (testee.acquirePutFromLoadLock(KEY1)) {
+ try {
+ removeLatch.countDown();
+ pferLatch.await();
+ cache.set("PFER");
+ return Boolean.TRUE;
+ }
+ finally {
+ testee.releasePutFromLoadLock(KEY1);
+ }
+ }
+ return Boolean.FALSE;
+ }
+ };
+
+ Callable<Void> invalidateCallable = new Callable<Void>() {
+ public Void call() throws Exception {
+ removeLatch.await();
+ if (keyOnly) {
+ testee.invalidateKey(KEY1);
+ }
+ else {
+ testee.invalidateRegion();
+ }
+ cache.set(null);
+ return null;
+ }
+ };
+
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ Future<Boolean> pferFuture = executorService.submit(pferCallable);
+ Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
+
+ try {
+ invalidateFuture.get(1, TimeUnit.SECONDS);
+ fail("invalidateFuture did not block");
+ }
+ catch (TimeoutException good) {}
+
+ pferLatch.countDown();
+
+ assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
+ invalidateFuture.get(5, TimeUnit.SECONDS);
+
+ assertNull(cache.get());
+
+ }
private static class TestValidator extends PutFromLoadValidator {
@@ -408,19 +560,16 @@
@Override
public int getOveragePendingPutQueueLength() {
- // TODO Auto-generated method stub
return super.getOveragePendingPutQueueLength();
}
@Override
public int getPendingPutQueueLength() {
- // TODO Auto-generated method stub
return super.getPendingPutQueueLength();
}
@Override
public int getRemovalQueueLength() {
- // TODO Auto-generated method stub
return super.getRemovalQueueLength();
}