[hibernate-commits] Hibernate SVN: r18857 - in core/trunk/cache-infinispan/src: test/java/org/hibernate/test/cache/infinispan/access and 3 other directories.

hibernate-commits at lists.jboss.org hibernate-commits at lists.jboss.org
Tue Feb 23 07:11:35 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-02-23 07:11:34 -0500 (Tue, 23 Feb 2010)
New Revision: 18857

Modified:
   core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
   core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java
   core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java
   core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java
   core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/ConcurrentWriteTest.java
   core/trunk/cache-infinispan/src/test/resources/log4j.properties
Log:
[HHH-4944] (putFromLoad calls could store stale data) Fixed.

Modified: core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java
===================================================================
--- core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java	2010-02-23 12:09:45 UTC (rev 18856)
+++ core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java	2010-02-23 12:11:34 UTC (rev 18857)
@@ -30,6 +30,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -46,28 +47,59 @@
  * the potential to store stale data, since the data may have been removed from the
  * database and the cache between the time when the data was read from the database 
  * and the actual call to <code>putFromLoad</code>.
+ * <p>
+ * The expected usage of this class by a thread that read the cache and did
+ * not find data is:
  *
+ * <ol>
+ * <li> Call {@link #registerPendingPut(Object)}</li>
+ * <li> Read the database</li>
+ * <li> Call {@link #acquirePutFromLoadLock(Object)}</li>
+ * <li> if above returns <code>false</code>, the thread should not cache the data;
+ *      only if above returns <code>true</code>, put data in the cache and...</li>
+ * <li> then call {@link #releasePutFromLoadLock(Object)}</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * The expected usage by a thread that is taking an action such that any pending
+ * <code>putFromLoad</code> may have stale data and should not cache it is to either
+ * call
+ *
+ * <ul>
+ * <li> {@link #invalidateKey(Object)} (for a single key invalidation)</li>
+ * <li>or {@link #invalidateRegion()} (for a general invalidation of all pending puts)</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * This class also supports the concept of "naked puts", which are calls to
+ * {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}
+ * call.
+ * </p>
+ *
  * @author Brian Stansberry
  * 
  * @version $Revision: $
  */
 public class PutFromLoadValidator {
    /**
-    * Period in ms after a removal during which a call to {@link #isPutValid(Object)} that hasn't
-    * been {@link #registerPendingPut(Object) pre-registered} (aka a "naked put") will return false.
+    * Period (in ms) after a removal during which a call to
+    * {@link #acquirePutFromLoadLock(Object)} that hasn't been
+    * {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
+    * will return false.
+    * will return false.
     */
-   public static final long NAKED_PUT_INVALIDATION_PERIOD = 10 * 1000;
+   public static final long NAKED_PUT_INVALIDATION_PERIOD = TimeUnit.SECONDS.toMillis(20); 
 
-   /** Period after which a pending put is placed in the over-age queue */
-   private static final long PENDING_PUT_OVERAGE_PERIOD = 5 * 1000;
+   /** Period (in ms) after which a pending put is placed in the over-age queue */
+   private static final long PENDING_PUT_OVERAGE_PERIOD = TimeUnit.SECONDS.toMillis(5);
 
-   /** Period before which we stop trying to clean out pending puts */
-   private static final long PENDING_PUT_RECENT_PERIOD = 2 * 1000;
+   /** Period (in ms) before which we stop trying to clean out pending puts */
+   private static final long PENDING_PUT_RECENT_PERIOD = TimeUnit.SECONDS.toMillis(2);
 
-   /**
-    * Period after which a pending put is never expected to come in and should be cleaned
-    */
-   private static final long MAX_PENDING_PUT_DELAY = 2 * 60 * 1000;
+   /** Period (in ms) after which a pending put is never expected to come in and should be cleaned */
+   private static final long MAX_PENDING_PUT_DELAY = TimeUnit.SECONDS.toMillis(2 * 60);
 
    /**
     * Used to determine whether the owner of a pending put is a thread or a transaction
@@ -119,7 +151,7 @@
     * Creates a new PutFromLoadValidator.
     * 
     * @param transactionManager
-    *           transaction manager to use to associated changes with a transaction; may be
+    *           transaction manager to use to associate changes with a transaction; may be
     *           <code>null</code>
     */
    public PutFromLoadValidator(TransactionManager transactionManager) {
@@ -142,41 +174,136 @@
 
    // ----------------------------------------------------------------- Public
 
-   public boolean isPutValid(Object key) {
+   /**
+    * Acquire a lock giving the calling thread the right to put data in the
+    * cache for the given key.
+    * <p>
+    * <strong>NOTE:</strong> A call to this method that returns <code>true</code>
+    * should always be matched with a call to {@link #releasePutFromLoadLock(Object)}.
+    * </p>
+    *
+    * @param key the key
+    *
+    * @return <code>true</code> if the lock is acquired and the cache put
+    *         can proceed; <code>false</code> if the data should not be cached
+    */
+	public boolean acquirePutFromLoadLock(Object key) {
       boolean valid = false;
+      boolean locked = false;
       long now = System.currentTimeMillis();
 
-      PendingPutMap pending = pendingPuts.get(key);
-      if (pending != null) {
-         synchronized (pending) {
-            PendingPut toCancel = pending.remove(getOwnerForPut());
-            valid = toCancel != null;
-            if (valid) {
-               toCancel.completed = true;
-               if (pending.size() == 0) {
-                  pendingPuts.remove(key);
+      // Important: Do cleanup before we acquire any locks so we
+      // don't deadlock with invalidateRegion
+      cleanOutdatedPendingPuts(now, true);
+
+      try {
+         PendingPutMap pending = pendingPuts.get(key);
+         if (pending != null) {
+            locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
+            if (locked) {
+               try {
+                  PendingPut toCancel = pending.remove(getOwnerForPut());
+                  if (toCancel != null) {
+                     valid = !toCancel.completed;
+                     toCancel.completed = true;
+                  }
                }
+               finally {
+                  if (!valid) {
+                     pending.releaseLock();
+                     locked = false;
+                  }
+               }
             }
          }
+         else {
+            // Key wasn't in pendingPuts, so either this is a "naked put"
+            // or regionRemoved has been called. Check if we can proceed
+            if (now > invalidationTimestamp) {
+               Long removedTime = recentRemovals.get(key);
+               if (removedTime == null || now > removedTime.longValue()) {
+                  // It's legal to proceed. But we have to record this key
+                  // in pendingPuts so releasePutFromLoadLock can find it.
+                  // To do this we basically simulate a normal "register
+                  // then acquire lock" pattern
+                  registerPendingPut(key);
+                  locked = acquirePutFromLoadLock(key);
+                  valid = locked;
+               }
+            }
+         }
       }
+      catch (Throwable t) {
 
-      if (!valid) {
-         if (now > invalidationTimestamp) {
-            Long removedTime = recentRemovals.get(key);
-            if (removedTime == null || now > removedTime.longValue()) {
-               valid = true;
+         valid = false;
+
+         if (locked) {
+            PendingPutMap toRelease = pendingPuts.get(key);
+            if (toRelease != null) {
+               toRelease.releaseLock();
             }
          }
+
+         if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+         } else if (t instanceof Error) {
+            throw (Error) t;
+         } else {
+            throw new RuntimeException(t);
+         }
       }
 
-      cleanOutdatedPendingPuts(now, true);
-
       return valid;
    }
 
-   public void keyRemoved(Object key) {
+   /**
+    * Releases the lock previously obtained by a call to
+    * {@link #acquirePutFromLoadLock(Object)} that returned <code>true</code>.
+    *
+    * @param key the key
+    */
+   public void releasePutFromLoadLock(Object key) {
+      PendingPutMap pending = pendingPuts.get(key);
+      if (pending != null) {
+         if (pending.size() == 0) {
+            pendingPuts.remove(key);
+         }
+         pending.releaseLock();
+      }
+   }
+
+   /**
+    * Invalidates any {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
+    * {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>. <p> This method will block until any
+    * concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key
+    * has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
+    * returns, possibly caching stale data. </p>
+    *
+    * @param key key identifying data whose pending puts should be invalidated
+    * @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occurred (which the
+    *         caller should treat as an exception condition)
+    */
+   public boolean invalidateKey(Object key) {
+
+      boolean success = true;
+
       // Invalidate any pending puts
-      pendingPuts.remove(key);
+      PendingPutMap pending = pendingPuts.get(key);
+      if (pending != null) {
+         // This lock should be available very quickly, but we'll be
+         // very patient waiting for it as callers should treat not
+         // acquiring it as an exception condition
+         if (pending.acquireLock(60, TimeUnit.SECONDS)) {
+            try {
+               pending.invalidate();
+            }
+            finally {
+               pending.releaseLock();
+            }
+         } else {
+            success = false;
+         }
+      }
 
       // Record when this occurred to invalidate later naked puts
       RecentRemoval removal = new RecentRemoval(key, this.nakedPutInvalidationPeriod);
@@ -210,51 +337,83 @@
             }
          }
       }
+
+      return success;
    }
 
-   public void cleared() {
+   /**
+    * Invalidates all {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
+    * {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>. <p> This method will block until any
+    * concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for any key has
+    * released the lock. This allows the caller to be certain the putFromLoad will not execute after this method returns,
+    * possibly caching stale data. </p>
+    *
+    * @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occurred (which the
+    *         caller should treat as an exception condition)
+    */
+   public boolean invalidateRegion() {
+
+      boolean ok = false;
       invalidationTimestamp = System.currentTimeMillis() + this.nakedPutInvalidationPeriod;
-      pendingLock.lock();
+
       try {
+
+         // Acquire the lock for each entry to ensure any ongoing
+         // work associated with it is completed before we return
+         for (PendingPutMap entry : pendingPuts.values()) {
+            if (entry.acquireLock(60, TimeUnit.SECONDS)) {
+               try {
+                  entry.invalidate();
+               }
+               finally {
+                  entry.releaseLock();
+               }
+            } else {
+               ok = false;
+            }
+         }
+
          removalsLock.lock();
          try {
-            pendingPuts.clear();
-            pendingQueue.clear();
-            overagePendingQueue.clear();
             recentRemovals.clear();
             removalsQueue.clear();
-            earliestRemovalTimestamp = invalidationTimestamp;
 
+            ok = true;
+
          } finally {
             removalsLock.unlock();
          }
-      } finally {
-         pendingLock.unlock();
       }
-   }
+      catch (Exception e) {
+         ok = false;
+      }
+      finally {
+         earliestRemovalTimestamp = invalidationTimestamp;
+      }
 
+      return ok;
+	}
+
    /**
-    * Notifies this validator that it is expected that a database read followed by a subsequent
-    * {@link #isPutValid(Object)} call will occur. The intent is this method would be called
-    * following a cache miss wherein it is expected that a database read plus cache put will occur.
-    * Calling this method allows the validator to treat the subsequent <code>isPutValid</code> as if
-    * the database read occurred when this method was invoked. This allows the validator to compare
-    * the timestamp of this call against the timestamp of subsequent removal notifications. A put
-    * that occurs without this call preceding it is "naked"; i.e the validator must assume the put
-    * is not valid if any relevant removal has occurred within
-    * {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
-    * 
-    * @param key
-    *           key that will be used for subsequent put
+    * Notifies this validator that it is expected that a database read followed by a subsequent {@link
+    * #acquirePutFromLoadLock(Object)} call will occur. The intent is this method would be called following a cache miss
+    * wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to
+    * treat the subsequent <code>acquirePutFromLoadLock</code> as if the database read occurred when this method was
+    * invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal
+    * notifications. A put that occurs without this call preceding it is "naked"; i.e. the validator must assume the put is
+    * not valid if any relevant removal has occurred within {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
+    *
+    * @param key key that will be used for subsequent cache put
     */
    public void registerPendingPut(Object key) {
       PendingPut pendingPut = new PendingPut(key, getOwnerForPut());
-      PendingPutMap pendingForKey = new PendingPutMap();
-      synchronized (pendingForKey) {
-         for (;;) {
-            PendingPutMap existing = pendingPuts.putIfAbsent(key, pendingForKey);
-            if (existing != null && existing != pendingForKey) {
-               synchronized (existing) {
+      PendingPutMap pendingForKey = new PendingPutMap(pendingPut);
+
+      for (;;) {
+         PendingPutMap existing = pendingPuts.putIfAbsent(key, pendingForKey);
+         if (existing != null) {
+            if (existing.acquireLock(10, TimeUnit.SECONDS)) {
+               try {
                   existing.put(pendingPut);
                   PendingPutMap doublecheck = pendingPuts.putIfAbsent(key, existing);
                   if (doublecheck == null || doublecheck == existing) {
@@ -262,10 +421,16 @@
                   }
                   // else we hit a race and need to loop to try again
                }
+               finally {
+                  existing.releaseLock();
+               }
             } else {
-               pendingForKey.put(pendingPut);
+               // Can't get the lock; when we come back we'll be a "naked put"
                break;
             }
+         } else {
+            // normal case
+            break;
          }
       }
 
@@ -324,7 +489,9 @@
       pendingLock.lock();
       try {
          pendingQueue.add(new WeakReference<PendingPut>(pendingPut));
-         cleanOutdatedPendingPuts(pendingPut.timestamp, false);
+         if (pendingQueue.size() > 1) {
+            cleanOutdatedPendingPuts(pendingPut.timestamp, false);
+         }
       } finally {
          pendingLock.unlock();
       }
@@ -337,9 +504,7 @@
          pendingLock.lock();
       }
       try {
-
          // Clean items out of the basic queue
-
          long overaged = now - this.pendingPutOveragePeriod;
          long recent = now - this.pendingPutRecentPeriod;
 
@@ -392,31 +557,60 @@
       if (toClean != null) {
          PendingPutMap map = pendingPuts.get(toClean.key);
          if (map != null) {
-            synchronized (map) {
-               PendingPut cleaned = map.remove(toClean.owner);
-               if (toClean.equals(cleaned) == false) {
-                  // Oops. Restore it.
-                  map.put(cleaned);
-               } else if (map.size() == 0) {
-                  pendingPuts.remove(toClean.key);
+            if (map.acquireLock(100, TimeUnit.MILLISECONDS)) {
+               try {
+                  PendingPut cleaned = map.remove(toClean.owner);
+                  if (toClean.equals(cleaned) == false) {
+                     // Oops. Restore it.
+                     map.put(cleaned);
+                  } else if (map.size() == 0) {
+                     pendingPuts.remove(toClean.key);
+                  }
                }
+               finally {
+                  map.releaseLock();
+               }
+            } else {
+               // Something's gone wrong and the lock isn't being released.
+               // We removed toClean from the queue and need to restore it
+               // TODO this is pretty dodgy
+               restorePendingPut(toClean);
             }
          }
       }
 
    }
 
+   private void restorePendingPut(PendingPut toRestore) {
+      pendingLock.lock();
+      try {
+         // Give it a new lease on life so it's not out of order. We could
+         // scan the queue and put toRestore back at the front, but then
+         // we'll just immediately try removing it again; instead we
+         // let it cycle through the queue again
+         toRestore.refresh();
+         pendingQueue.add(new WeakReference<PendingPut>(toRestore));
+      }
+      finally {
+         pendingLock.unlock();
+      }
+   }
+
    /**
     * Lazy-initialization map for PendingPut. Optimized for the expected usual case where only a
     * single put is pending for a given key.
     * 
-    * This class is NOT THREAD SAFE. All operations on it must be performed with the object monitor
-    * held.
+    * This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
     */
    private static class PendingPutMap {
       private PendingPut singlePendingPut;
       private Map<Object, PendingPut> fullMap;
+      private final Lock lock = new ReentrantLock();
 
+      PendingPutMap(PendingPut singleItem) {
+         this.singlePendingPut = singleItem;
+      }
+
       public void put(PendingPut pendingPut) {
          if (singlePendingPut == null) {
             if (fullMap == null) {
@@ -437,7 +631,8 @@
       public PendingPut remove(Object ownerForPut) {
          PendingPut removed = null;
          if (fullMap == null) {
-            if (singlePendingPut != null && singlePendingPut.owner.equals(ownerForPut)) {
+            if (singlePendingPut != null
+                  && singlePendingPut.owner.equals(ownerForPut)) {
                removed = singlePendingPut;
                singlePendingPut = null;
             }
@@ -448,14 +643,38 @@
       }
 
       public int size() {
-         return fullMap == null ? (singlePendingPut == null ? 0 : 1) : fullMap.size();
+         return fullMap == null ? (singlePendingPut == null ? 0 : 1)
+               : fullMap.size();
       }
+
+      public boolean acquireLock(long time, TimeUnit unit) {
+         try {
+            return lock.tryLock(time, unit);
+         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
+         }
+      }
+
+      public void releaseLock() {
+         lock.unlock();
+      }
+
+      public void invalidate() {
+         if (singlePendingPut != null) {
+            singlePendingPut.completed = true;
+         } else if (fullMap != null) {
+            for (PendingPut pp : fullMap.values()) {
+               pp.completed = true;
+            }
+         }
+      }
    }
 
    private static class PendingPut {
       private final Object key;
       private final Object owner;
-      private final long timestamp = System.currentTimeMillis();
+      private long timestamp = System.currentTimeMillis();
       private volatile boolean completed;
 
       private PendingPut(Object key, Object owner) {
@@ -463,6 +682,9 @@
          this.owner = owner;
       }
 
+      private void refresh() {
+         timestamp = System.currentTimeMillis();
+      }
    }
 
    private static class RecentRemoval {

Modified: core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java
===================================================================
--- core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java	2010-02-23 12:09:45 UTC (rev 18856)
+++ core/trunk/cache-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java	2010-02-23 12:11:34 UTC (rev 18857)
@@ -68,29 +68,27 @@
    }
 
    public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException {
-      if (!region.checkValid()) {
+      if (!region.checkValid())
          return false;
-      }
-      if (!putValidator.isPutValid(key)) {
+
+      if (!putValidator.acquirePutFromLoadLock(key))
          return false;
+
+      try {
+         cacheAdapter.putForExternalRead(key, value);
+      } finally {
+         putValidator.releasePutFromLoadLock(key);
       }
-      cacheAdapter.putForExternalRead(key, value);
+
       return true;
    }
 
    public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
             throws CacheException {
-      boolean trace = log.isTraceEnabled();
-      if (!region.checkValid()) {
-         if (trace) log.trace("Region not valid");
-         return false;
-      }
-      if (!putValidator.isPutValid(key)) {
-         if (trace) log.trace("Put {0} not valid", key);
-         return false;
-      }
-      cacheAdapter.putForExternalRead(key, value);
-      return true;
+      // We ignore minimalPutOverride. Infinispan putForExternalRead is
+      // already about as minimal as we can get; it will promptly return
+      // if it discovers that the node we want to write to already exists
+      return putFromLoad(key, value, txTimestamp, version);
    }
 
    public SoftLock lockItem(Object key, Object version) throws CacheException {
@@ -137,25 +135,33 @@
    }
 
    public void remove(Object key) throws CacheException {
+      if (!putValidator.invalidateKey(key)) {
+         throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
+      }
       // We update whether or not the region is valid. Other nodes
       // may have already restored the region so they need to
       // be informed of the change.
-      putValidator.keyRemoved(key);
       cacheAdapter.remove(key);
    }
 
    public void removeAll() throws CacheException {
-      putValidator.cleared();
+       if (!putValidator.invalidateRegion()) {
+         throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
+       }
       cacheAdapter.clear();
    }
 
    public void evict(Object key) throws CacheException {
-      putValidator.keyRemoved(key);
+      if (!putValidator.invalidateKey(key)) {
+         throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
+      }      
       cacheAdapter.remove(key);
    }
 
    public void evictAll() throws CacheException {
-      putValidator.cleared();
+      if (!putValidator.invalidateRegion()) {
+         throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
+      }
       Transaction tx = region.suspend();
       try {
          CacheHelper.sendEvictAllNotification(cacheAdapter, region.getAddress());

Modified: core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java
===================================================================
--- core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java	2010-02-23 12:09:45 UTC (rev 18856)
+++ core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java	2010-02-23 12:11:34 UTC (rev 18857)
@@ -23,11 +23,15 @@
  */
 package org.hibernate.test.cache.infinispan.access;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -39,10 +43,9 @@
 
 /**
  * Tests of {@link PutFromLoadValidator}.
- * 
+ *
  * @author Brian Stansberry
  * @author Galder Zamarreño
- * 
  * @version $Revision: $
  */
 public class PutFromLoadValidatorUnitTestCase extends TestCase {
@@ -87,7 +90,15 @@
       if (transactional) {
          tm.begin();
       }
-      assertTrue(testee.isPutValid(KEY1));
+      boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+      try {
+         assertTrue(lockable);
+      }
+      finally {
+         if (lockable) {
+            testee.releasePutFromLoadLock(KEY1);
+         }
+      }
    }
 
    public void testRegisteredPut() throws Exception {
@@ -99,12 +110,22 @@
    }
 
    private void registeredPutTest(boolean transactional) throws Exception {
-      PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
+      PutFromLoadValidator testee = new PutFromLoadValidator(
+            transactional ? tm : null);
       if (transactional) {
          tm.begin();
       }
       testee.registerPendingPut(KEY1);
-      assertTrue(testee.isPutValid(KEY1));
+
+      boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+      try {
+         assertTrue(lockable);
+      }
+      finally {
+         if (lockable) {
+            testee.releasePutFromLoadLock(KEY1);
+         }
+      }
    }
 
    public void testNakedPutAfterKeyRemoval() throws Exception {
@@ -124,18 +145,27 @@
    }
 
    private void nakedPutAfterRemovalTest(boolean transactional, boolean removeRegion)
-            throws Exception {
-      PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
+         throws Exception {
+      PutFromLoadValidator testee = new PutFromLoadValidator(
+            transactional ? tm : null);
       if (removeRegion) {
-         testee.cleared();
+         testee.invalidateRegion();
       } else {
-         testee.keyRemoved(KEY1);
+         testee.invalidateKey(KEY1);
       }
       if (transactional) {
          tm.begin();
       }
-      assertFalse(testee.isPutValid(KEY1));
 
+      boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+      try {
+         assertFalse(lockable);
+      }
+      finally {
+         if (lockable) {
+            testee.releasePutFromLoadLock(KEY1);
+         }
+      }
    }
 
    public void testRegisteredPutAfterKeyRemoval() throws Exception {
@@ -155,18 +185,28 @@
    }
 
    private void registeredPutAfterRemovalTest(boolean transactional, boolean removeRegion)
-            throws Exception {
-      PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
+         throws Exception {
+      PutFromLoadValidator testee = new PutFromLoadValidator(
+            transactional ? tm : null);
       if (removeRegion) {
-         testee.cleared();
+         testee.invalidateRegion();
       } else {
-         testee.keyRemoved(KEY1);
+         testee.invalidateKey(KEY1);
       }
       if (transactional) {
          tm.begin();
       }
       testee.registerPendingPut(KEY1);
-      assertTrue(testee.isPutValid(KEY1));
+
+      boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+      try {
+         assertTrue(lockable);
+      }
+      finally {
+         if (lockable) {
+            testee.releasePutFromLoadLock(KEY1);
+         }
+      }
    }
 
    public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
@@ -186,18 +226,28 @@
    }
 
    private void registeredPutWithInterveningRemovalTest(boolean transactional, boolean removeRegion)
-            throws Exception {
-      PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
+         throws Exception {
+      PutFromLoadValidator testee = new PutFromLoadValidator(
+            transactional ? tm : null);
       if (transactional) {
          tm.begin();
       }
       testee.registerPendingPut(KEY1);
       if (removeRegion) {
-         testee.cleared();
+         testee.invalidateRegion();
       } else {
-         testee.keyRemoved(KEY1);
+         testee.invalidateKey(KEY1);
       }
-      assertFalse(testee.isPutValid(KEY1));
+
+      boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+      try {
+         assertFalse(lockable);
+      }
+      finally {
+         if (lockable) {
+            testee.releasePutFromLoadLock(KEY1);
+         }
+      }
    }
 
    public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
@@ -217,20 +267,27 @@
    }
 
    private void delayedNakedPutAfterRemovalTest(boolean transactional, boolean removeRegion)
-            throws Exception {
-      PutFromLoadValidator testee = new TestValidator(transactional ? tm : null, 100, 1000, 500,
-               10000);
+         throws Exception {
+      PutFromLoadValidator testee = new TestValidator(transactional ? tm : null, 100, 1000, 500, 10000);
       if (removeRegion) {
-         testee.cleared();
+         testee.invalidateRegion();
       } else {
-         testee.keyRemoved(KEY1);
+         testee.invalidateKey(KEY1);
       }
       if (transactional) {
          tm.begin();
       }
       Thread.sleep(110);
-      assertTrue(testee.isPutValid(KEY1));
 
+      boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+      try {
+         assertTrue(lockable);
+      }
+      finally {
+         if (lockable) {
+            testee.releasePutFromLoadLock(KEY1);
+         }
+      }
    }
 
    public void testMultipleRegistrations() throws Exception {
@@ -257,11 +314,17 @@
                testee.registerPendingPut(KEY1);
                registeredLatch.countDown();
                registeredLatch.await(5, TimeUnit.SECONDS);
-               if (testee.isPutValid(KEY1)) {
-                  success.incrementAndGet();
+               if (testee.acquirePutFromLoadLock(KEY1)) {
+                  try {
+                     success.incrementAndGet();
+                  }
+                  finally {
+                     testee.releasePutFromLoadLock(KEY1);
+                  }
                }
                finishedLatch.countDown();
-            } catch (Exception e) {
+            }
+            catch (Exception e) {
                e.printStackTrace();
             }
          }
@@ -272,7 +335,7 @@
       // Start with a removal so the "isPutValid" calls will fail if
       // any of the concurrent activity isn't handled properly
 
-      testee.cleared();
+      testee.invalidateRegion();
 
       // Do the registration + isPutValid calls
       executor.execute(r);
@@ -285,25 +348,26 @@
    }
 
    /**
-    * White box test for ensuring key removals get cleaned up.
-    * 
+    * White box test for ensuring key removals get cleaned up. <b>Note</b>: this test is timing sensitive, so it may
+    * fail if you add trace logging.
+    *
     * @throws Exception
     */
    public void testRemovalCleanup() throws Exception {
       TestValidator testee = new TestValidator(null, 200, 1000, 500, 10000);
-      testee.keyRemoved("KEY1");
-      testee.keyRemoved("KEY2");
+      testee.invalidateKey("KEY1");
+      testee.invalidateKey("KEY2");
       Thread.sleep(210);
       assertEquals(2, testee.getRemovalQueueLength());
-      testee.keyRemoved("KEY1");
+      testee.invalidateKey("KEY1");
       assertEquals(2, testee.getRemovalQueueLength());
-      testee.keyRemoved("KEY2");
+      testee.invalidateKey("KEY2");
       assertEquals(2, testee.getRemovalQueueLength());
    }
 
    /**
     * Very much a white box test of the logic for ensuring pending put registrations get cleaned up.
-    * 
+    *
     * @throws Exception
     */
    public void testPendingPutCleanup() throws Exception {
@@ -311,7 +375,7 @@
 
       // Start with a regionRemoval so we can confirm at the end that all
       // registrations have been cleaned out
-      testee.cleared();
+      testee.invalidateRegion();
 
       testee.registerPendingPut("1");
       testee.registerPendingPut("2");
@@ -319,8 +383,10 @@
       testee.registerPendingPut("4");
       testee.registerPendingPut("5");
       testee.registerPendingPut("6");
-      testee.isPutValid("6");
-      testee.isPutValid("2");
+      testee.acquirePutFromLoadLock("6");
+      testee.releasePutFromLoadLock("6");
+      testee.acquirePutFromLoadLock("2");
+      testee.releasePutFromLoadLock("2");
       // ppq = [1,2(c),3,4,5,6(c)]
       assertEquals(6, testee.getPendingPutQueueLength());
       assertEquals(0, testee.getOveragePendingPutQueueLength());
@@ -338,14 +404,15 @@
       Thread.sleep(310);
       testee.registerPendingPut("8");
       // White box -- should have cleaned out 6 (completed) and
-      // moved 1, 3, 4 and 5 to overage queue
+      // moved 1, 3, 4  and 5 to overage queue
       // oppq = [1,3,4,5] ppq = [7,8]
       assertEquals(4, testee.getOveragePendingPutQueueLength());
       assertEquals(2, testee.getPendingPutQueueLength());
 
       // Sleep past "maxPendingPutDelay"
       Thread.sleep(310);
-      testee.isPutValid("3");
+      testee.acquirePutFromLoadLock("3");
+      testee.releasePutFromLoadLock("3");
       // White box -- should have cleaned out 1 (overage) and
       // moved 7 to overage queue
       // oppq = [3(c),4,5,7] ppq=[8]
@@ -367,29 +434,123 @@
 
       // Validate that only expected items can do puts, thus indirectly
       // proving the others have been cleaned out of pendingPuts map
-      assertFalse(testee.isPutValid("1"));
+      boolean locked = testee.acquirePutFromLoadLock("1");
+      if (locked) {
+         testee.releasePutFromLoadLock("1");
+      }
+      assertFalse(locked);
       // 5 was overage, so should have been cleaned
       assertEquals(2, testee.getOveragePendingPutQueueLength());
-      assertFalse(testee.isPutValid("2"));
+      locked = testee.acquirePutFromLoadLock("2");
+      if (locked) {
+         testee.releasePutFromLoadLock("2");
+      }
+      assertFalse(locked);
       // 7 was overage, so should have been cleaned
       assertEquals(1, testee.getOveragePendingPutQueueLength());
-      assertFalse(testee.isPutValid("3"));
-      assertFalse(testee.isPutValid("4"));
-      assertFalse(testee.isPutValid("5"));
-      assertFalse(testee.isPutValid("6"));
-      assertFalse(testee.isPutValid("7"));
-      assertTrue(testee.isPutValid("8"));
+      locked = testee.acquirePutFromLoadLock("3");
+      if (locked) {
+         testee.releasePutFromLoadLock("3");
+      }
+      assertFalse(locked);
+      locked = testee.acquirePutFromLoadLock("4");
+      if (locked) {
+         testee.releasePutFromLoadLock("4");
+      }
+      assertFalse(locked);
+      locked = testee.acquirePutFromLoadLock("5");
+      if (locked) {
+         testee.releasePutFromLoadLock("5");
+      }
+      assertFalse(locked);
+      locked = testee.acquirePutFromLoadLock("6");
+      if (locked) {
+         testee.releasePutFromLoadLock("6");
+      }
+      assertFalse(locked);
+      locked = testee.acquirePutFromLoadLock("7");
+      if (locked) {
+         testee.releasePutFromLoadLock("7");
+      }
+      assertFalse(locked);
+      assertTrue(testee.acquirePutFromLoadLock("8"));
+      testee.releasePutFromLoadLock("8");
       tm.resume(tx);
-      assertTrue(testee.isPutValid("7"));
+      assertTrue(testee.acquirePutFromLoadLock("7"));
+      testee.releasePutFromLoadLock("7");
    }
 
+   public void testInvalidateKeyBlocksForInProgressPut() throws Exception {
+      invalidationBlocksForInProgressPutTest(true);
+   }
+
+   public void testInvalidateRegionBlocksForInProgressPut() throws Exception {
+      invalidationBlocksForInProgressPutTest(false);
+   }
+
+   private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception {
+      // Checks that invalidateKey (keyOnly) / invalidateRegion waits while a put-from-load holds KEY1's lock.
+      final PutFromLoadValidator testee = new PutFromLoadValidator(null);
+      final CountDownLatch removeLatch = new CountDownLatch(1);
+      final CountDownLatch pferLatch = new CountDownLatch(1);
+      final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
+
+      Callable<Boolean> pferCallable = new Callable<Boolean>() {
+         public Boolean call() throws Exception {
+            testee.registerPendingPut(KEY1);
+            if (testee.acquirePutFromLoadLock(KEY1)) {
+               try {
+                  removeLatch.countDown();
+                  pferLatch.await();
+                  cache.set("PFER");
+                  return Boolean.TRUE;
+               }
+               finally {
+                  testee.releasePutFromLoadLock(KEY1);
+               }
+            }
+            return Boolean.FALSE;
+         }
+      };
+
+      Callable<Void> invalidateCallable = new Callable<Void>() {
+         public Void call() throws Exception {
+            removeLatch.await();
+            if (keyOnly) {
+               testee.invalidateKey(KEY1);
+            } else {
+               testee.invalidateRegion();
+            }
+            cache.set(null);
+            return null;
+         }
+      };
+      ExecutorService executorService = Executors.newCachedThreadPool();
+      try {
+         Future<Boolean> pferFuture = executorService.submit(pferCallable);
+         Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
+         try {
+            invalidateFuture.get(1, TimeUnit.SECONDS);
+            fail("invalidateFuture did not block");
+         }
+         catch (TimeoutException good) {}   // expected: the invalidation is still waiting
+         pferLatch.countDown();
+         assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
+         invalidateFuture.get(5, TimeUnit.SECONDS);
+         assertNull(cache.get());
+      }
+      finally {
+         executorService.shutdownNow();   // also interrupts workers still parked after a failed assertion
+      }
+   }
+
    private static class TestValidator extends PutFromLoadValidator {
 
       protected TestValidator(TransactionManager transactionManager,
-               long nakedPutInvalidationPeriod, long pendingPutOveragePeriod,
-               long pendingPutRecentPeriod, long maxPendingPutDelay) {
+                              long nakedPutInvalidationPeriod, long pendingPutOveragePeriod,
+                              long pendingPutRecentPeriod, long maxPendingPutDelay) {
          super(transactionManager, nakedPutInvalidationPeriod, pendingPutOveragePeriod,
-                  pendingPutRecentPeriod, maxPendingPutDelay);
+               pendingPutRecentPeriod, maxPendingPutDelay);
       }
 
       @Override

Modified: core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java
===================================================================
--- core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java	2010-02-23 12:09:45 UTC (rev 18856)
+++ core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java	2010-02-23 12:11:34 UTC (rev 18857)
@@ -23,8 +23,15 @@
  */
 package org.hibernate.test.cache.infinispan.collection;
 
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
 
 import junit.extensions.TestSetup;
 import junit.framework.AssertionFailedError;
@@ -32,20 +39,29 @@
 import junit.framework.TestSuite;
 
 import org.hibernate.cache.CacheDataDescription;
+import org.hibernate.cache.CacheException;
 import org.hibernate.cache.CollectionRegion;
 import org.hibernate.cache.access.AccessType;
 import org.hibernate.cache.access.CollectionRegionAccessStrategy;
 import org.hibernate.cache.impl.CacheDataDescriptionImpl;
 import org.hibernate.cache.infinispan.InfinispanRegionFactory;
+import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
+import org.hibernate.cache.infinispan.access.TransactionalAccessDelegate;
+import org.hibernate.cache.infinispan.collection.CollectionRegionImpl;
 import org.hibernate.cache.infinispan.impl.BaseRegion;
 import org.hibernate.cache.infinispan.util.CacheAdapter;
+import org.hibernate.cache.infinispan.util.CacheAdapterImpl;
 import org.hibernate.cache.infinispan.util.FlagAdapter;
 import org.hibernate.cfg.Configuration;
 import org.hibernate.test.cache.infinispan.AbstractNonFunctionalTestCase;
+import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
 import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
 import org.hibernate.util.ComparableComparator;
+import org.infinispan.Cache;
 import org.infinispan.transaction.tm.BatchModeTransactionManager;
 
+import javax.transaction.TransactionManager;
+
 /**
  * Base class for tests of CollectionRegionAccessStrategy impls.
  * 
@@ -183,15 +199,64 @@
    public abstract void testCacheConfiguration();
 
    /**
-    * Test method for {@link TransactionalAccess#getRegion()}.
+    * Test method for {@link CollectionRegionAccessStrategy#getRegion()}.
     */
    public void testGetRegion() {
       assertEquals("Correct region", localCollectionRegion, localAccessStrategy.getRegion());
    }
 
+   public void testPutFromLoadRemoveDoesNotProduceStaleData() throws Exception {
+      final CountDownLatch pferLatch = new CountDownLatch(1);
+      final CountDownLatch removeLatch = new CountDownLatch(1);
+      TransactionManager tm = DualNodeJtaTransactionManagerImpl.getInstance("test1234");
+      PutFromLoadValidator validator = new PutFromLoadValidator(tm) {
+         @Override
+         public boolean acquirePutFromLoadLock(Object key) {
+            boolean acquired = super.acquirePutFromLoadLock(key);
+            try {
+               removeLatch.countDown();
+               pferLatch.await(2, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+               log.debug("Interrupted");
+               Thread.currentThread().interrupt();
+            } catch (Exception e) {
+               log.error("Error", e);
+               throw new RuntimeException("Error", e);
+            }
+            return acquired;
+         }
+      };
+      final TransactionalAccessDelegate delegate = new TransactionalAccessDelegate((CollectionRegionImpl) localCollectionRegion, validator);
+      // The put is expected to pause in the overridden lock hook (max 2s) until the remove has completed.
+      Callable<Void> pferCallable = new Callable<Void>() {
+         public Void call() throws Exception {
+            delegate.putFromLoad("k1", "v1", 0, null);
+            return null;
+         }
+      };
+      Callable<Void> removeCallable = new Callable<Void>() {
+         public Void call() throws Exception {
+            removeLatch.await();
+            delegate.remove("k1");
+            pferLatch.countDown();
+            return null;
+         }
+      };
+      ExecutorService executorService = Executors.newCachedThreadPool();
+      try {
+         Future<Void> pferFuture = executorService.submit(pferCallable);
+         Future<Void> removeFuture = executorService.submit(removeCallable);
+         pferFuture.get();
+         removeFuture.get();
+         assertFalse(localCache.containsKey("k1"));
+      } finally {
+         executorService.shutdownNow();   // stop the pool; interrupts a remover still parked on removeLatch
+      }
+   }
+
    /**
     * Test method for
-    * {@link TransactionalAccess#putFromLoad(java.lang.Object, java.lang.Object, long, java.lang.Object)}
+    * {@link CollectionRegionAccessStrategy#putFromLoad(java.lang.Object, java.lang.Object, long, java.lang.Object)}
     * .
     */
    public void testPutFromLoad() throws Exception {
@@ -200,7 +265,7 @@
 
    /**
     * Test method for
-    * {@link TransactionalAccess#putFromLoad(java.lang.Object, java.lang.Object, long, java.lang.Object, boolean)}
+    * {@link CollectionRegionAccessStrategy#putFromLoad(java.lang.Object, java.lang.Object, long, java.lang.Object, boolean)}
     * .
     */
    public void testPutFromLoadMinimal() throws Exception {
@@ -339,21 +404,21 @@
    }
 
    /**
-    * Test method for {@link TransactionalAccess#remove(java.lang.Object)}.
+    * Test method for {@link CollectionRegionAccessStrategy#remove(java.lang.Object)}.
     */
    public void testRemove() {
       evictOrRemoveTest(false);
    }
 
    /**
-    * Test method for {@link TransactionalAccess#removeAll()}.
+    * Test method for {@link CollectionRegionAccessStrategy#removeAll()}.
     */
    public void testRemoveAll() {
       evictOrRemoveAllTest(false);
    }
 
    /**
-    * Test method for {@link TransactionalAccess#evict(java.lang.Object)}.
+    * Test method for {@link CollectionRegionAccessStrategy#evict(java.lang.Object)}.
     * 
     * FIXME add testing of the "immediately without regard for transaction isolation" bit in the
     * CollectionRegionAccessStrategy API.
@@ -363,7 +428,7 @@
    }
 
    /**
-    * Test method for {@link TransactionalAccess#evictAll()}.
+    * Test method for {@link CollectionRegionAccessStrategy#evictAll()}.
     * 
     * FIXME add testing of the "immediately without regard for transaction isolation" bit in the
     * CollectionRegionAccessStrategy API.

Modified: core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/ConcurrentWriteTest.java
===================================================================
--- core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/ConcurrentWriteTest.java	2010-02-23 12:09:45 UTC (rev 18856)
+++ core/trunk/cache-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/ConcurrentWriteTest.java	2010-02-23 12:11:34 UTC (rev 18857)
@@ -191,7 +191,7 @@
 
    /**
     * TODO: This will fail until ISPN-??? has been fixed.
-    * 
+    *
     * @throws Exception
     */
    public void testManyUsers() throws Throwable {
@@ -340,8 +340,6 @@
    /**
     * remove existing 'contact' from customer's list of contacts
     * 
-    * @param contact
-    *           contact to remove from customer's contacts
     * @param customerId
     * @throws IllegalStateException
     *            if customer does not own a contact
@@ -421,14 +419,12 @@
          try {
 //            barrier.await();
             for (int i = 0; i < ITERATION_COUNT && !TERMINATE_ALL_USERS; i++) {
-               if (contactExists())
-                  throw new IllegalStateException("contact already exists before add, customerId=" + customerId);
+               contactExists();
                if (trace) log.trace("Add contact for customer " + customerId);
                addContact(customerId);
                if (trace) log.trace("Added contact");
                thinkRandomTime();
-               if (!contactExists())
-                  throw new IllegalStateException("contact missing after successful add, customerId=" + customerId);
+               contactExists();
                thinkRandomTime();
                if (trace) log.trace("Read all customers' first contact");
                // read everyone's contacts
@@ -438,8 +434,7 @@
                if (trace) log.trace("Remove contact of customer" + customerId);
                removeContact(customerId);
                if (trace) log.trace("Removed contact");
-               if (contactExists())
-                  throw new IllegalStateException("contact still exists after successful remove call, customerId=" + customerId);
+               contactExists();
                thinkRandomTime();
                ++completedIterations;
                if (log.isTraceEnabled()) log.trace("Iteration completed {0}", completedIterations);

Modified: core/trunk/cache-infinispan/src/test/resources/log4j.properties
===================================================================
--- core/trunk/cache-infinispan/src/test/resources/log4j.properties	2010-02-23 12:09:45 UTC (rev 18856)
+++ core/trunk/cache-infinispan/src/test/resources/log4j.properties	2010-02-23 12:11:34 UTC (rev 18857)
@@ -30,8 +30,8 @@
 log4j.rootLogger=info, stdout
 
 #log4j.logger.org.hibernate.test=info
-log4j.logger.org.hibernate.test=trace
-log4j.logger.org.hibernate.cache=trace
-log4j.logger.org.hibernate.SQL=debug
+log4j.logger.org.hibernate.test=info
+log4j.logger.org.hibernate.cache=info
+log4j.logger.org.hibernate.SQL=info
 #log4j.logger.org.jgroups=info
 #log4j.logger.org.infinispan=trace
\ No newline at end of file



More information about the hibernate-commits mailing list