[hibernate-commits] Hibernate SVN: r18855 - in core/trunk/cache-jbosscache/src: test/java/org/hibernate/test/cache/jbc/access and 1 other directory.

hibernate-commits at lists.jboss.org hibernate-commits at lists.jboss.org
Mon Feb 22 21:11:32 EST 2010


Author: bstansberry at jboss.com
Date: 2010-02-22 21:11:31 -0500 (Mon, 22 Feb 2010)
New Revision: 18855

Modified:
   core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/OptimisticTransactionalAccessDelegate.java
   core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/PutFromLoadValidator.java
   core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/TransactionalAccessDelegate.java
   core/trunk/cache-jbosscache/src/test/java/org/hibernate/test/cache/jbc/access/PutFromLoadValidatorUnitTestCase.java
Log:
[HHH-3817] Better handle race between putFromLoad and removal

Modified: core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/OptimisticTransactionalAccessDelegate.java
===================================================================
--- core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/OptimisticTransactionalAccessDelegate.java	2010-02-23 01:12:57 UTC (rev 18854)
+++ core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/OptimisticTransactionalAccessDelegate.java	2010-02-23 02:11:31 UTC (rev 18855)
@@ -63,7 +63,9 @@
      */
     @Override
     public void evict(Object key) throws CacheException {
-        putValidator.keyRemoved(key);
+    	if (!putValidator.invalidateKey(key)) {
+    		throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
+    	}
         region.ensureRegionRootExists();
 
         Option opt = NonLockingDataVersion.getInvocationOption();
@@ -75,7 +77,9 @@
     @Override
     public void evictAll() throws CacheException
     {
-       putValidator.regionRemoved();
+    	if (!putValidator.invalidateRegion()) {
+     	   throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
+        }        
        
        Transaction tx = region.suspend();
        try {        
@@ -116,16 +120,21 @@
         if (!region.checkValid())
             return false;
         
-        if (!putValidator.isPutValid(key))
+        if (!putValidator.acquirePutFromLoadLock(key))
            return false;
         
-        region.ensureRegionRootExists();
-
-        // We ignore minimalPutOverride. JBossCache putForExternalRead is
-        // already about as minimal as we can get; it will promptly return
-        // if it discovers that the node we want to write to already exists
-        Option opt = getDataVersionOption(version, version);
-        return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
+        try {
+	        region.ensureRegionRootExists();
+	
+	        // We ignore minimalPutOverride. JBossCache putForExternalRead is
+	        // already about as minimal as we can get; it will promptly return
+	        // if it discovers that the node we want to write to already exists
+	        Option opt = getDataVersionOption(version, version);
+	        return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
+        }
+        finally {
+        	putValidator.releasePutFromLoadLock(key);
+        }
     }
 
     @Override
@@ -134,19 +143,26 @@
         if (!region.checkValid())
             return false;
         
-        if (!putValidator.isPutValid(key))
+        if (!putValidator.acquirePutFromLoadLock(key))
            return false;
         
-        region.ensureRegionRootExists();
-
-        Option opt = getDataVersionOption(version, version);
-        return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
+        try {
+	        region.ensureRegionRootExists();
+	
+	        Option opt = getDataVersionOption(version, version);
+	        return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
+        }
+        finally {
+        	putValidator.releasePutFromLoadLock(key);
+        }
     }
 
     @Override
     public void remove(Object key) throws CacheException {
        
-        putValidator.keyRemoved(key);
+    	if (!putValidator.invalidateKey(key)) {
+    		throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
+    	}
         
         // We remove whether or not the region is valid. Other nodes
         // may have already restored the region so they need to
@@ -160,7 +176,9 @@
 
     @Override
     public void removeAll() throws CacheException {
-       putValidator.regionRemoved();
+       if (!putValidator.invalidateRegion()) {
+    	   throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
+       }
        Option opt = NonLockingDataVersion.getInvocationOption();
        CacheHelper.removeAll(cache, regionFqn, opt);
     }

Modified: core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/PutFromLoadValidator.java
===================================================================
--- core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/PutFromLoadValidator.java	2010-02-23 01:12:57 UTC (rev 18854)
+++ core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/PutFromLoadValidator.java	2010-02-23 02:11:31 UTC (rev 18855)
@@ -30,6 +30,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -46,6 +47,36 @@
  * the potential to store stale data, since the data may have been removed from the
  * database and the cache between the time when the data was read from the database 
  * and the actual call to <code>putFromLoad</code>.
+ * <p>
+ * The expected usage of this class by a thread that read the cache and did
+ * not find data is:
+ * 
+ * <ol>
+ * <li> Call {@link #registerPendingPut(Object)}</li>
+ * <li> Read the database</li>
+ * <li> Call {@link #acquirePutFromLoadLock(Object)}
+ * <li> if above returns <code>false</code>, the thread should not cache the data;
+ *      only if above returns <code>true</code>, put data in the cache and...</li>
+ * <li> then call {@link #releasePutFromLoadLock(Object)}</li>
+ * </ol>
+ * </p>
+ * 
+ * <p>
+ * The expected usage by a thread that is taking an action such that any pending
+ * <code>putFromLoad</code> may have stale data and should not cache it is to either
+ * call
+ * 
+ * <ul>
+ * <li> {@link #invalidateKey(Object)} (for a single key invalidation)</li>
+ * <li>or {@link #invalidateRegion()} (for a general invalidation all pending puts)</li>
+ * </ul>
+ * </p>
+ * 
+ * <p>
+ * This class also supports the concept of "naked puts", which are calls to
+ * {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}
+ * call. 
+ * </p>
  *
  * @author Brian Stansberry
  * 
@@ -54,21 +85,21 @@
 public class PutFromLoadValidator {
 	/**
 	 * Period in ms after a removal during which a call to
-	 * {@link #isPutValid(Object)} that hasn't been
+	 * {@link #acquirePutFromLoadLock(Object)} that hasn't been
 	 * {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
 	 * will return false.
 	 */
-	public static final long NAKED_PUT_INVALIDATION_PERIOD = 10 * 1000;
+	public static final long NAKED_PUT_INVALIDATION_PERIOD = 20 * 1000;
 
-	/** Period after which a pending put is placed in the over-age queue */
+	/** Period (in ms) after which a pending put is placed in the over-age queue */
 	private static final long PENDING_PUT_OVERAGE_PERIOD = 5 * 1000;
 
-	/** Period before which we stop trying to clean out pending puts */
+	/** Period (in ms) before which we stop trying to clean out pending puts */
 	private static final long PENDING_PUT_RECENT_PERIOD = 2 * 1000;
 
 	/**
-	 * Period after which a pending put is never expected to come in and should
-	 * be cleaned
+	 * Period (in ms) after which a pending put is never expected to come in 
+	 * and should be cleaned
 	 */
 	private static final long MAX_PENDING_PUT_DELAY = 2 * 60 * 1000;
 
@@ -128,7 +159,7 @@
 	 * Creates a new PutFromLoadValidator.
 	 * 
 	 * @param transactionManager
-	 *            transaction manager to use to associated changes with a
+	 *            transaction manager to use to associate changes with a
 	 *            transaction; may be <code>null</code>
 	 */
 	public PutFromLoadValidator(TransactionManager transactionManager) {
@@ -153,41 +184,147 @@
 
 	// ----------------------------------------------------------------- Public
 
-	public boolean isPutValid(Object key) {
+	/**
+	 * Acquire a lock giving the calling thread the right to put data in the
+	 * cache for the given key.
+	 * <p>
+	 * <strong>NOTE:</strong> A call to this method that returns <code>true</code>
+	 * should always be matched with a call to {@link #releasePutFromLoadLock(Object)}.
+	 * </p>
+	 * 
+	 * @param key the key
+	 * 
+	 * @return <code>true</code> if the lock is acquired and the cache put
+	 *         can proceed; <code>false</code> if the data should not be cached
+	 */
+	public boolean acquirePutFromLoadLock(Object key) {	
+		
 		boolean valid = false;
+		boolean locked = false;
 		long now = System.currentTimeMillis();
 
-		PendingPutMap pending = pendingPuts.get(key);
-		if (pending != null) {
-			synchronized (pending) {
-				PendingPut toCancel = pending.remove(getOwnerForPut());
-				valid = toCancel != null;
-				if (valid) {
-					toCancel.completed = true;
-					if (pending.size() == 0) {
-						pendingPuts.remove(key);
+		// Important: Do cleanup before we acquire any locks so we
+		// don't deadlock with invalidateRegion
+		cleanOutdatedPendingPuts(now, true);
+		
+		try {
+			PendingPutMap pending = pendingPuts.get(key);
+			if (pending != null) {
+				locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
+				if (locked) {
+					try {
+						PendingPut toCancel = pending.remove(getOwnerForPut());
+						if (toCancel != null) {  
+							valid = !toCancel.completed;
+							toCancel.completed = true;
+						}
 					}
+					finally {
+						if (!valid) {
+							pending.releaseLock();
+							locked = false;
+						}
+					}
 				}
 			}
+			else {
+				// Key wasn't in pendingPuts, so either this is a "naked put"
+				// or regionRemoved has been called. Check if we can proceed
+				if (now > invalidationTimestamp) {
+					Long removedTime = recentRemovals.get(key);
+					if (removedTime == null || now > removedTime.longValue()) {
+						// It's legal to proceed. But we have to record this key
+						// in pendingPuts so releasePutFromLoadLock can find it.
+						// To do this we basically simulate a normal "register
+						// then acquire lock" pattern
+						registerPendingPut(key);
+						locked = acquirePutFromLoadLock(key);
+						valid = locked;
+					}
+				}
+			}			
 		}
-
-		if (!valid) {
-			if (now > invalidationTimestamp) {
-				Long removedTime = recentRemovals.get(key);
-				if (removedTime == null || now > removedTime.longValue()) {
-					valid = true;
+		catch (Throwable t) {
+			
+			valid = false;
+			
+			if (locked) {
+				PendingPutMap toRelease = pendingPuts.get(key);
+				if (toRelease != null) {
+					toRelease.releaseLock();
 				}
 			}
+			
+			if (t instanceof RuntimeException) {
+				throw (RuntimeException) t;
+			}			
+			else if (t instanceof Error) {
+				throw (Error) t;
+			}			
+			else {
+				throw new RuntimeException(t);
+			}
 		}
-
-		cleanOutdatedPendingPuts(now, true);
-
+		
 		return valid;
 	}
+	
+	/**
+	 * Releases the lock previously obtained by a call to
+	 * {@link #acquirePutFromLoadLock(Object)} that returned <code>true</code>.
+	 * 
+	 * @param key the key
+	 */
+	public void releasePutFromLoadLock(Object key) {
+		PendingPutMap pending = pendingPuts.get(key);
+		if (pending != null) {
+			if (pending.size() == 0) {
+				pendingPuts.remove(key);
+			}
+			pending.releaseLock();
+		}
+	}
 
-	public void keyRemoved(Object key) {
+	/**
+	 * Invalidates any {@link #registerPendingPut(Object) previously registered pending puts} 
+	 * ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)} will
+	 * return <code>false</code>.
+	 * <p>
+	 * This method will block until any concurrent thread that has 
+	 * {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for
+	 * the given key has released the lock. This allows the caller to be certain
+	 * the putFromLoad will not execute after this method returns, possibly
+	 * caching stale data.
+	 * </p>
+	 * 
+	 * @param key key identifying data whose pending puts should be invalidated
+	 * 
+	 * @return <code>true</code> if the invalidation was successful; <code>false</code>
+	 *         if a problem occured (which the caller should treat as an
+	 *         exception condition)
+	 */
+	public boolean invalidateKey(Object key) {
+		
+		boolean success = true;
+		
 		// Invalidate any pending puts
-		pendingPuts.remove(key);
+		PendingPutMap pending = pendingPuts.get(key);		
+		if (pending != null) {
+			// This lock should be available very quickly, but we'll be
+			// very patient waiting for it as callers should treat not
+			// acquiring it as an exception condition
+			if (pending.acquireLock(60, TimeUnit.SECONDS)) {
+				try {
+					pending.invalidate();
+				}
+				finally {
+					pending.releaseLock();
+				}
+			}
+			else {
+				success = false;
+			}
+		}
 
 		// Record when this occurred to invalidate later naked puts
 		RecentRemoval removal = new RecentRemoval(key,
@@ -224,55 +361,96 @@
 				}
 			}
 		}
+		
+		return success;
 	}
 
-	public void regionRemoved() {
+	/**
+	 * Invalidates all {@link #registerPendingPut(Object) previously registered pending puts} 
+	 * ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)} will
+	 * return <code>false</code>.
+	 * <p>
+	 * This method will block until any concurrent thread that has 
+	 * {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for
+	 * the any key has released the lock. This allows the caller to be certain
+	 * the putFromLoad will not execute after this method returns, possibly
+	 * caching stale data.
+	 * </p>
+	 * 
+	 * @return <code>true</code> if the invalidation was successful; <code>false</code>
+	 *         if a problem occured (which the caller should treat as an
+	 *         exception condition)
+	 */
+	public boolean invalidateRegion() {
+		
+		boolean ok = false;
 		invalidationTimestamp = System.currentTimeMillis()
 				+ this.nakedPutInvalidationPeriod;
-		pendingLock.lock();
+		
 		try {
+			
+			// Acquire the lock for each entry to ensure any ongoing
+			// work associated with it is completed before we return
+			for (PendingPutMap entry : pendingPuts.values()) {
+				if (entry.acquireLock(60, TimeUnit.SECONDS)) {
+					try {
+						entry.invalidate();
+					}
+					finally {
+						entry.releaseLock();
+					}
+				}
+				else {
+					ok = false;
+				}
+			}
+			
 			removalsLock.lock();
 			try {
-				pendingPuts.clear();
-				pendingQueue.clear();
-				overagePendingQueue.clear();
 				recentRemovals.clear();
-				removalsQueue.clear();
-				earliestRemovalTimestamp = invalidationTimestamp;
+				removalsQueue.clear();				
+				
+				ok = true;
 
 			} finally {
 				removalsLock.unlock();
 			}
-		} finally {
-			pendingLock.unlock();
 		}
+		catch (Exception e) {
+			ok = false;
+		}
+		finally {
+			earliestRemovalTimestamp = invalidationTimestamp;
+		}
+		
+		return ok;
 	}
 
 	/**
 	 * Notifies this validator that it is expected that a database read followed
-	 * by a subsequent {@link #isPutValid(Object)} call will occur. The intent
+	 * by a subsequent {@link #acquirePutFromLoadLock(Object)} call will occur. The intent
 	 * is this method would be called following a cache miss wherein it is
 	 * expected that a database read plus cache put will occur. Calling this
 	 * method allows the validator to treat the subsequent
-	 * <code>isPutValid</code> as if the database read occurred when this method
+	 * <code>acquirePutFromLoadLock</code> as if the database read occurred when this method
 	 * was invoked. This allows the validator to compare the timestamp of this
 	 * call against the timestamp of subsequent removal notifications. A put
 	 * that occurs without this call preceding it is "naked"; i.e the validator
 	 * must assume the put is not valid if any relevant removal has occurred
 	 * within {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
 	 * 
-	 * @param key
-	 *            key that will be used for subsequent put
+	 * @param key key that will be used for subsequent cache put
 	 */
 	public void registerPendingPut(Object key) {
 		PendingPut pendingPut = new PendingPut(key, getOwnerForPut());
-		PendingPutMap pendingForKey = new PendingPutMap();
-		synchronized (pendingForKey) {
-			for (;;) {
-				PendingPutMap existing = pendingPuts.putIfAbsent(key,
-						pendingForKey);
-				if (existing != null && existing != pendingForKey) {
-					synchronized (existing) {
+		PendingPutMap pendingForKey = new PendingPutMap(pendingPut);
+		
+		for (;;) {
+			PendingPutMap existing = pendingPuts.putIfAbsent(key,
+					pendingForKey);
+			if (existing != null) {
+				if (existing.acquireLock(10, TimeUnit.SECONDS)) {
+					try {
 						existing.put(pendingPut);
 						PendingPutMap doublecheck = pendingPuts.putIfAbsent(
 								key, existing);
@@ -281,10 +459,17 @@
 						}
 						// else we hit a race and need to loop to try again
 					}
-				} else {
-					pendingForKey.put(pendingPut);
+					finally {
+						existing.releaseLock();
+					}
+				}
+				else {
+					// Can't get the lock; when we come back we'll be a "naked put"
 					break;
 				}
+			} else {
+				// normal case
+				break;
 			}
 		}
 
@@ -343,7 +528,9 @@
 		pendingLock.lock();
 		try {
 			pendingQueue.add(new WeakReference<PendingPut>(pendingPut));
-			cleanOutdatedPendingPuts(pendingPut.timestamp, false);
+			if (pendingQueue.size() > 1) {
+				cleanOutdatedPendingPuts(pendingPut.timestamp, false);
+			}
 		} finally {
 			pendingLock.unlock();
 		}
@@ -356,9 +543,7 @@
 			pendingLock.lock();
 		}
 		try {
-
 			// Clean items out of the basic queue
-
 			long overaged = now - this.pendingPutOveragePeriod;
 			long recent = now - this.pendingPutRecentPeriod;
 
@@ -411,31 +596,62 @@
 		if (toClean != null) {
 			PendingPutMap map = pendingPuts.get(toClean.key);
 			if (map != null) {
-				synchronized (map) {
-					PendingPut cleaned = map.remove(toClean.owner);
-					if (toClean.equals(cleaned) == false) {
-						// Oops. Restore it.
-						map.put(cleaned);
-					} else if (map.size() == 0) {
-						pendingPuts.remove(toClean.key);
+				if (map.acquireLock(100, TimeUnit.MILLISECONDS)) {
+					try {
+						PendingPut cleaned = map.remove(toClean.owner);
+						if (toClean.equals(cleaned) == false) {
+							// Oops. Restore it.
+							map.put(cleaned);
+						} else if (map.size() == 0) {
+							pendingPuts.remove(toClean.key);
+						}
 					}
+					finally {
+						map.releaseLock();
+					}
 				}
+				else {
+					// Something's gone wrong and the lock isn't being released. 
+					// We removed toClean from the queue and need to restore it
+					// TODO this is pretty dodgy
+					restorePendingPut(toClean);
+				}
 			}
 		}
 
 	}
+		
+	private void restorePendingPut(PendingPut toRestore) {
+		pendingLock.lock();
+		try {
+			// Give it a new lease on life so it's not out of order. We could
+			// scan the queue and put toRestore back at the front, but then
+			// we'll just immediately try removing it again; instead we
+			// let it cycle through the queue again
+			toRestore.refresh();
+			pendingQueue.add(new WeakReference<PendingPut>(toRestore));
+		}
+		finally {
+			pendingLock.unlock();
+		}
+	}
 
 	/**
 	 * Lazy-initialization map for PendingPut. Optimized for the expected usual
 	 * case where only a single put is pending for a given key.
 	 * 
 	 * This class is NOT THREAD SAFE. All operations on it must be performed
-	 * with the object monitor held.
+	 * with the lock held.
 	 */
 	private static class PendingPutMap {
 		private PendingPut singlePendingPut;
 		private Map<Object, PendingPut> fullMap;
-
+		private final Lock lock = new ReentrantLock();
+		
+		PendingPutMap(PendingPut singleItem) {
+			this.singlePendingPut = singleItem;
+		}
+		
 		public void put(PendingPut pendingPut) {
 			if (singlePendingPut == null) {
 				if (fullMap == null) {
@@ -471,18 +687,46 @@
 			return fullMap == null ? (singlePendingPut == null ? 0 : 1)
 					: fullMap.size();
 		}
+		
+		public boolean acquireLock(long time, TimeUnit unit) {
+			try {
+				return lock.tryLock(time, unit);
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				return false;
+			}
+		}
+		
+		public void releaseLock() {
+			lock.unlock();
+		}
+		
+		public void invalidate() {
+			if (singlePendingPut != null) {
+				singlePendingPut.completed = true;
+			}
+			else if (fullMap != null) {
+				for (PendingPut pp : fullMap.values()) {
+					pp.completed = true;
+				}
+			}
+		}
 	}
 
 	private static class PendingPut {
 		private final Object key;
 		private final Object owner;
-		private final long timestamp = System.currentTimeMillis();
+		private long timestamp = System.currentTimeMillis();
 		private volatile boolean completed;
 
 		private PendingPut(Object key, Object owner) {
 			this.key = key;
 			this.owner = owner;
 		}
+		
+		private void refresh() {
+			timestamp = System.currentTimeMillis();
+		}
 
 	}
 

Modified: core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/TransactionalAccessDelegate.java
===================================================================
--- core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/TransactionalAccessDelegate.java	2010-02-23 01:12:57 UTC (rev 18854)
+++ core/trunk/cache-jbosscache/src/main/java/org/hibernate/cache/jbc/access/TransactionalAccessDelegate.java	2010-02-23 02:11:31 UTC (rev 18855)
@@ -83,12 +83,17 @@
         if (!region.checkValid())
             return false;
         
-        if (!putValidator.isPutValid(key))
+        if (!putValidator.acquirePutFromLoadLock(key))
             return false;
        
-        region.ensureRegionRootExists();
-
-        return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
+        try {
+	        region.ensureRegionRootExists();
+	
+	        return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
+        }
+        finally {
+        	putValidator.releasePutFromLoadLock(key);
+        }
     }
 
    public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
@@ -97,15 +102,20 @@
         if (!region.checkValid())
             return false;
         
-        if (!putValidator.isPutValid(key))
+        if (!putValidator.acquirePutFromLoadLock(key))
             return false;
-       
-        region.ensureRegionRootExists();
-
-        // We ignore minimalPutOverride. JBossCache putForExternalRead is
-        // already about as minimal as we can get; it will promptly return
-        // if it discovers that the node we want to write to already exists
-        return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
+        
+        try {
+	        region.ensureRegionRootExists();
+	
+	        // We ignore minimalPutOverride. JBossCache putForExternalRead is
+	        // already about as minimal as we can get; it will promptly return
+	        // if it discovers that the node we want to write to already exists
+	        return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
+        }
+        finally {
+        	putValidator.releasePutFromLoadLock(key);
+        }
     }
 
     public SoftLock lockItem(Object key, Object version) throws CacheException {
@@ -163,7 +173,9 @@
 
     public void remove(Object key) throws CacheException {
        
-       putValidator.keyRemoved(key);
+    	if (!putValidator.invalidateKey(key)) {
+    		throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
+    	}        
        
         // We remove whether or not the region is valid. Other nodes
         // may have already restored the region so they need to
@@ -175,13 +187,17 @@
     }
 
     public void removeAll() throws CacheException {
-       putValidator.regionRemoved();
+       if (!putValidator.invalidateRegion()) {
+     	  throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
+       }
        CacheHelper.removeAll(cache, regionFqn); 
     }
 
     public void evict(Object key) throws CacheException {
        
-        putValidator.keyRemoved(key);
+    	if (!putValidator.invalidateKey(key)) {
+    		throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
+    	}        
        
         region.ensureRegionRootExists();
         
@@ -189,7 +205,10 @@
     }
 
     public void evictAll() throws CacheException {
-       putValidator.regionRemoved();
+       if (!putValidator.invalidateRegion()) {
+     	  throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
+       }
+        
        Transaction tx = region.suspend();
        try {        
           region.ensureRegionRootExists();

Modified: core/trunk/cache-jbosscache/src/test/java/org/hibernate/test/cache/jbc/access/PutFromLoadValidatorUnitTestCase.java
===================================================================
--- core/trunk/cache-jbosscache/src/test/java/org/hibernate/test/cache/jbc/access/PutFromLoadValidatorUnitTestCase.java	2010-02-23 01:12:57 UTC (rev 18854)
+++ core/trunk/cache-jbosscache/src/test/java/org/hibernate/test/cache/jbc/access/PutFromLoadValidatorUnitTestCase.java	2010-02-23 02:11:31 UTC (rev 18855)
@@ -23,11 +23,15 @@
  */
 package org.hibernate.test.cache.jbc.access;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -87,7 +91,7 @@
 		if (transactional) {
 			tm.begin();
 		}
-		assertTrue(testee.isPutValid(KEY1));
+		assertTrue(testee.acquirePutFromLoadLock(KEY1));
 	}
 
 	public void testRegisteredPut() throws Exception {
@@ -105,7 +109,16 @@
 			tm.begin();
 		}
 		testee.registerPendingPut(KEY1);
-		assertTrue(testee.isPutValid(KEY1));
+		
+		boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+		try {
+			assertTrue(lockable);
+		}
+		finally {
+			if (lockable) {
+				testee.releasePutFromLoadLock(KEY1);
+			}
+		}
 	}
 
 	public void testNakedPutAfterKeyRemoval() throws Exception {
@@ -129,14 +142,23 @@
 		PutFromLoadValidator testee = new PutFromLoadValidator(
 				transactional ? tm : null);
 		if (removeRegion) {
-			testee.regionRemoved();
+			testee.invalidateRegion();
 		} else {
-			testee.keyRemoved(KEY1);
+			testee.invalidateKey(KEY1);
 		}
 		if (transactional) {
 			tm.begin();
 		}
-		assertFalse(testee.isPutValid(KEY1));
+		
+		boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+		try {
+			assertFalse(lockable);
+		}
+		finally {
+			if (lockable) {
+				testee.releasePutFromLoadLock(KEY1);
+			}
+		}
 
 	}
 
@@ -163,15 +185,24 @@
 		PutFromLoadValidator testee = new PutFromLoadValidator(
 				transactional ? tm : null);
 		if (removeRegion) {
-			testee.regionRemoved();
+			testee.invalidateRegion();
 		} else {
-			testee.keyRemoved(KEY1);
+			testee.invalidateKey(KEY1);
 		}
 		if (transactional) {
 			tm.begin();
 		}
 		testee.registerPendingPut(KEY1);
-		assertTrue(testee.isPutValid(KEY1));
+		
+		boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+		try {
+			assertTrue(lockable);
+		}
+		finally {
+			if (lockable) {
+				testee.releasePutFromLoadLock(KEY1);
+			}
+		}
 	}
 
 	public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
@@ -202,11 +233,20 @@
 		}
 		testee.registerPendingPut(KEY1);
 		if (removeRegion) {
-			testee.regionRemoved();
+			testee.invalidateRegion();
 		} else {
-			testee.keyRemoved(KEY1);
+			testee.invalidateKey(KEY1);
 		}
-		assertFalse(testee.isPutValid(KEY1));
+		
+		boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+		try {
+			assertFalse(lockable);
+		}
+		finally {
+			if (lockable) {
+				testee.releasePutFromLoadLock(KEY1);
+			}
+		}
 	}
 
 	public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
@@ -232,15 +272,24 @@
 		PutFromLoadValidator testee = new TestValidator(transactional ? tm
 				: null, 100, 1000, 500, 10000);
 		if (removeRegion) {
-			testee.regionRemoved();
+			testee.invalidateRegion();
 		} else {
-			testee.keyRemoved(KEY1);
+			testee.invalidateKey(KEY1);
 		}
 		if (transactional) {
 			tm.begin();
 		}
 		Thread.sleep(110);
-		assertTrue(testee.isPutValid(KEY1));
+		
+		boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+		try {
+			assertTrue(lockable);
+		}
+		finally {
+			if (lockable) {
+				testee.releasePutFromLoadLock(KEY1);
+			}
+		}
 
 	}
 	
@@ -268,8 +317,13 @@
 					testee.registerPendingPut(KEY1);
 					registeredLatch.countDown();
 					registeredLatch.await(5, TimeUnit.SECONDS);
-					if (testee.isPutValid(KEY1)) {
-						success.incrementAndGet();
+					if (testee.acquirePutFromLoadLock(KEY1)) {
+						try {
+							success.incrementAndGet();
+						}
+						finally {
+							testee.releasePutFromLoadLock(KEY1);
+						}
 					}
 					finishedLatch.countDown();
 				}
@@ -284,7 +338,7 @@
 		// Start with a removal so the "isPutValid" calls will fail if
 		// any of the concurrent activity isn't handled properly
 		
-		testee.regionRemoved();
+		testee.invalidateRegion();
 		
 		// Do the registration + isPutValid calls
 		executor.execute(r);
@@ -303,13 +357,13 @@
 	 */
 	public void testRemovalCleanup() throws Exception {
 		TestValidator testee = new TestValidator(null, 200, 1000, 500, 10000);
-		testee.keyRemoved("KEY1");
-		testee.keyRemoved("KEY2");
+		testee.invalidateKey("KEY1");
+		testee.invalidateKey("KEY2");
 		Thread.sleep(210);
 		assertEquals(2, testee.getRemovalQueueLength());
-		testee.keyRemoved("KEY1");
+		testee.invalidateKey("KEY1");
 		assertEquals(2, testee.getRemovalQueueLength());
-		testee.keyRemoved("KEY2");
+		testee.invalidateKey("KEY2");
 		assertEquals(2, testee.getRemovalQueueLength());
 	}
 	
@@ -324,7 +378,7 @@
 		
 		// Start with a regionRemoval so we can confirm at the end that all
 		// registrations have been cleaned out
-		testee.regionRemoved();
+		testee.invalidateRegion();
 		
 		testee.registerPendingPut("1");
 		testee.registerPendingPut("2");
@@ -332,8 +386,10 @@
 		testee.registerPendingPut("4");
 		testee.registerPendingPut("5");
 		testee.registerPendingPut("6");
-		testee.isPutValid("6");
-		testee.isPutValid("2");
+		testee.acquirePutFromLoadLock("6");
+		testee.releasePutFromLoadLock("6");
+		testee.acquirePutFromLoadLock("2");
+		testee.releasePutFromLoadLock("2");
         // ppq = [1,2(c),3,4,5,6(c)]
 		assertEquals(6, testee.getPendingPutQueueLength());
 		assertEquals(0, testee.getOveragePendingPutQueueLength());
@@ -358,7 +414,8 @@
 		
 		// Sleep past "maxPendingPutDelay"
 		Thread.sleep(310);
-		testee.isPutValid("3");
+		testee.acquirePutFromLoadLock("3");
+		testee.releasePutFromLoadLock("3");
 		// White box -- should have cleaned out 1 (overage) and 
 		// moved 7 to overage queue
 		// oppq = [3(c),4,5,7] ppq=[8]
@@ -380,21 +437,116 @@
 		
 		// Validate that only expected items can do puts, thus indirectly
 		// proving the others have been cleaned out of pendingPuts map
-		assertFalse(testee.isPutValid("1"));
+		boolean locked = testee.acquirePutFromLoadLock("1");
+		if (locked) {
+			testee.releasePutFromLoadLock("1");
+		}
+		assertFalse(locked);
 		// 5 was overage, so should have been cleaned
 		assertEquals(2, testee.getOveragePendingPutQueueLength());
-		assertFalse(testee.isPutValid("2"));
+		locked = testee.acquirePutFromLoadLock("2");
+		if (locked) {
+			testee.releasePutFromLoadLock("1");
+		}
+		assertFalse(locked);
 		// 7 was overage, so should have been cleaned
 		assertEquals(1, testee.getOveragePendingPutQueueLength());
-		assertFalse(testee.isPutValid("3"));
-		assertFalse(testee.isPutValid("4"));
-		assertFalse(testee.isPutValid("5"));
-		assertFalse(testee.isPutValid("6"));
-		assertFalse(testee.isPutValid("7"));
-		assertTrue(testee.isPutValid("8"));
+		locked = testee.acquirePutFromLoadLock("3");
+		if (locked) {
+			testee.releasePutFromLoadLock("1");
+		}
+		assertFalse(locked);
+		locked = testee.acquirePutFromLoadLock("4");
+		if (locked) {
+			testee.releasePutFromLoadLock("1");
+		}
+		assertFalse(locked);
+		locked = testee.acquirePutFromLoadLock("5");
+		if (locked) {
+			testee.releasePutFromLoadLock("1");
+		}
+		assertFalse(locked);
+		locked = testee.acquirePutFromLoadLock("1");
+		if (locked) {
+			testee.releasePutFromLoadLock("1");
+		}
+		assertFalse(testee.acquirePutFromLoadLock("6"));
+		locked = testee.acquirePutFromLoadLock("7");
+		if (locked) {
+			testee.releasePutFromLoadLock("1");
+		}
+		assertFalse(locked);
+		assertTrue(testee.acquirePutFromLoadLock("8"));
+		testee.releasePutFromLoadLock("8");
 		tm.resume(tx);
-		assertTrue(testee.isPutValid("7"));
+		assertTrue(testee.acquirePutFromLoadLock("7"));
+		testee.releasePutFromLoadLock("7");
 	}
+	
+	public void testInvalidateKeyBlocksForInProgressPut() throws Exception {
+		invalidationBlocksForInProgressPutTest(true);
+	}
+	
+	public void testInvalidateRegionBlocksForInProgressPut() throws Exception {
+		invalidationBlocksForInProgressPutTest(false);
+	}
+	
+	private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception {
+		final PutFromLoadValidator testee = new PutFromLoadValidator(null);
+		final CountDownLatch removeLatch = new CountDownLatch(1);
+		final CountDownLatch pferLatch = new CountDownLatch(1);
+		final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
+		
+		Callable<Boolean> pferCallable = new Callable<Boolean>() {
+		    public Boolean call() throws Exception {
+		        testee.registerPendingPut(KEY1);
+		        if (testee.acquirePutFromLoadLock(KEY1)) {
+		        	try {
+		        		removeLatch.countDown();
+		        		pferLatch.await();
+				        cache.set("PFER");
+				        return Boolean.TRUE;
+		        	}
+		        	finally {
+		        		testee.releasePutFromLoadLock(KEY1);
+		        	}
+		        }
+		        return Boolean.FALSE;
+		    }
+	    };
+		
+		Callable<Void> invalidateCallable = new Callable<Void>() {
+		    public Void call() throws Exception {
+		        removeLatch.await();
+		        if (keyOnly) {
+		        	testee.invalidateKey(KEY1);
+		        }
+		        else {
+		        	testee.invalidateRegion();
+		        }		        
+		        cache.set(null);
+		        return null;
+		    }
+	    };
+	    
+	    ExecutorService executorService = Executors.newCachedThreadPool();
+	    Future<Boolean> pferFuture = executorService.submit(pferCallable);
+	    Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
+	    
+	    try {
+	    	invalidateFuture.get(1, TimeUnit.SECONDS);
+	    	fail("invalidateFuture did not block");
+	    }
+	    catch (TimeoutException good) {}
+	    
+	    pferLatch.countDown();
+	    
+	    assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
+	    invalidateFuture.get(5, TimeUnit.SECONDS);
+	    
+	    assertNull(cache.get());
+	    
+	}
 
 	private static class TestValidator extends PutFromLoadValidator {
 
@@ -408,19 +560,16 @@
 
 		@Override
 		public int getOveragePendingPutQueueLength() {
-			// TODO Auto-generated method stub
 			return super.getOveragePendingPutQueueLength();
 		}
 
 		@Override
 		public int getPendingPutQueueLength() {
-			// TODO Auto-generated method stub
 			return super.getPendingPutQueueLength();
 		}
 
 		@Override
 		public int getRemovalQueueLength() {
-			// TODO Auto-generated method stub
 			return super.getRemovalQueueLength();
 		}
 



More information about the hibernate-commits mailing list