[infinispan-commits] Infinispan SVN: r2245 - in branches/4.1.x/core/src: main/java/org/infinispan/eviction and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Aug 18 08:45:43 EDT 2010


Author: vblagojevic at jboss.com
Date: 2010-08-18 08:45:42 -0400 (Wed, 18 Aug 2010)
New Revision: 2245

Modified:
   branches/4.1.x/core/src/main/java/org/infinispan/container/DataContainer.java
   branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
   branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManager.java
   branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java
   branches/4.1.x/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java
   branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java
Log:
[ISPN-598] - Fix LIRS with passivation

Modified: branches/4.1.x/core/src/main/java/org/infinispan/container/DataContainer.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/container/DataContainer.java	2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/container/DataContainer.java	2010-08-18 12:45:42 UTC (rev 2245)
@@ -127,13 +127,4 @@
     * Purges entries that have passed their expiry time
     */
    void purgeExpired();
-   
-   
-   /**
-    * Returns a set of eviction candidates. Containers not supporting eviction should return an
-    * empty set.
-    * 
-    * @return a set of entries that should be evicted from this container.
-    */
-   Set<InternalCacheEntry> getEvictionCandidates();
 }

Modified: branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java	2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java	2010-08-18 12:45:42 UTC (rev 2245)
@@ -4,11 +4,7 @@
 import java.util.AbstractSet;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -16,24 +12,16 @@
 
 import net.jcip.annotations.ThreadSafe;
 
-import org.infinispan.Cache;
 import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.container.entries.InternalEntryFactory;
+import org.infinispan.eviction.EvictionManager;
 import org.infinispan.eviction.EvictionStrategy;
 import org.infinispan.eviction.EvictionThreadPolicy;
-import org.infinispan.eviction.PassivationManager;
 import org.infinispan.factories.annotations.Inject;
-import org.infinispan.factories.annotations.Start;
-import org.infinispan.interceptors.PassivationInterceptor;
-import org.infinispan.loaders.CacheLoaderException;
-import org.infinispan.loaders.CacheLoaderManager;
-import org.infinispan.notifications.cachelistener.CacheNotifier;
 import org.infinispan.util.Immutables;
 import org.infinispan.util.concurrent.BoundedConcurrentHashMap;
 import org.infinispan.util.concurrent.BoundedConcurrentHashMap.Eviction;
 import org.infinispan.util.concurrent.BoundedConcurrentHashMap.EvictionListener;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
 
 /**
  * Simple data container that does not order entries for eviction, implemented using two ConcurrentHashMaps, one for
@@ -54,10 +42,8 @@
    final ConcurrentMap<Object, InternalCacheEntry> mortalEntries;
    final AtomicInteger numEntries = new AtomicInteger(0);
    final InternalEntryFactory entryFactory;
-   final DefaultEvictionListener evictionListener;
-   protected Cache<Object, Object> cache;
-   private PassivationManager passivator;
-   private CacheNotifier notifier;
+   final DefaultEvictionListener evictionListener;   
+   private EvictionManager evictionManager;   
 
    protected DefaultDataContainer(int concurrencyLevel) {
       immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(128, 0.75f, concurrencyLevel);
@@ -97,10 +83,8 @@
    }
 
    @Inject
-   public void initialize(Cache<Object, Object> cache, PassivationManager passivator, CacheNotifier notifier) {
-      this.cache = cache;
-      this.passivator = passivator;
-      this.notifier = notifier;
+   public void initialize(EvictionManager evictionManager) {
+      this.evictionManager = evictionManager;     
    }
 
    public static DataContainer boundedDataContainer(int concurrencyLevel, int maxEntries, EvictionStrategy strategy, EvictionThreadPolicy policy) {
@@ -116,11 +100,6 @@
       return new DefaultDataContainer(concurrencyLevel);
    }
 
-   @Override
-   public Set<InternalCacheEntry> getEvictionCandidates() {
-      return Collections.emptySet();
-   }
-
    public InternalCacheEntry peek(Object key) {
       InternalCacheEntry e = immortalEntries.get(key);
       if (e == null) e = mortalEntries.get(key);
@@ -243,19 +222,16 @@
       return new EntryIterator(immortalEntries.values().iterator(), mortalEntries.values().iterator());
    }
 
-   private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry> {
-      private final Log log = LogFactory.getLog(DefaultEvictionListener.class);
-
+   private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry> {      
       @Override
-      public void evicted(Object key, InternalCacheEntry value) {
-         notifier.notifyCacheEntryEvicted(key, true, null);
-         try {
-            passivator.passivate(key, value, null);
-         } catch (CacheLoaderException e) {
-            log.warn("Unable to passivate entry under {0}", key, e);
-         }
-         notifier.notifyCacheEntryEvicted(key, false, null);
+      public void preEvict(Object key) {
+         evictionManager.preEvict(key);
       }
+      
+      @Override
+      public void postEvict(Object key, InternalCacheEntry value) {
+         evictionManager.postEvict(key, value);
+      }      
    }
 
    private class KeySet extends AbstractSet<Object> {

Modified: branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManager.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManager.java	2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManager.java	2010-08-18 12:45:42 UTC (rev 2245)
@@ -1,6 +1,8 @@
 package org.infinispan.eviction;
 
 import net.jcip.annotations.ThreadSafe;
+
+import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.factories.scopes.Scope;
 import org.infinispan.factories.scopes.Scopes;
 
@@ -31,4 +33,8 @@
     * @return true if eviction is enabled, false otherwise
     */
    boolean isEnabled();
+   
+   void preEvict(Object key);
+   
+   void postEvict(Object key, InternalCacheEntry value);
 }

Modified: branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java	2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java	2010-08-18 12:45:42 UTC (rev 2245)
@@ -1,30 +1,34 @@
 package org.infinispan.eviction;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
 import net.jcip.annotations.ThreadSafe;
-import org.infinispan.AdvancedCache;
-import org.infinispan.Cache;
+
 import org.infinispan.config.Configuration;
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.InvocationContextContainer;
 import org.infinispan.factories.KnownComponentNames;
 import org.infinispan.factories.annotations.ComponentName;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.factories.annotations.Stop;
+import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheLoaderManager;
 import org.infinispan.loaders.CacheStore;
+import org.infinispan.marshall.MarshalledValue;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
 import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.infinispan.context.Flag.FAIL_SILENTLY;
-
 @ThreadSafe
 public class EvictionManagerImpl implements EvictionManager {
    private static final Log log = LogFactory.getLog(EvictionManagerImpl.class);
@@ -32,24 +36,33 @@
    ScheduledFuture <?> evictionTask;
 
    // components to be injected
-   ScheduledExecutorService executor;
-   Configuration configuration;
-   Cache<Object, Object> cache;
-   CacheLoaderManager cacheLoaderManager;
-   DataContainer dataContainer;
-   CacheStore cacheStore;
-   boolean enabled;
-   int maxEntries;
+   private ScheduledExecutorService executor;
+   private Configuration configuration;
+   private CacheLoaderManager cacheLoaderManager;
+   private DataContainer dataContainer;
+   private CacheStore cacheStore;
+   private CacheNotifier cacheNotifier;
+   private LockManager lockManager;
+   private PassivationManager passivator;
+   private InvocationContextContainer ctxContainer;
+   
+   
+   private boolean enabled;   
    volatile CountDownLatch startLatch = new CountDownLatch(1);
 
    @Inject
    public void initialize(@ComponentName(KnownComponentNames.EVICTION_SCHEDULED_EXECUTOR) ScheduledExecutorService executor,
-                          Configuration configuration, Cache<Object, Object> cache, DataContainer dataContainer, CacheLoaderManager cacheLoaderManager) {
+            Configuration configuration, DataContainer dataContainer,
+            CacheLoaderManager cacheLoaderManager, CacheNotifier cacheNotifier,
+            LockManager lockManager, PassivationManager passivator, InvocationContextContainer ctxContainer) {
       this.executor = executor;
       this.configuration = configuration;
-      this.cache = cache;
       this.dataContainer = dataContainer;
       this.cacheLoaderManager = cacheLoaderManager;
+      this.cacheNotifier = cacheNotifier;
+      this.lockManager = lockManager;
+      this.passivator = passivator;
+      this.ctxContainer = ctxContainer;
    }
 
    @Start(priority = 55)
@@ -57,8 +70,7 @@
    public void start() {
       // first check if eviction is enabled!
       enabled = configuration.getEvictionStrategy() != EvictionStrategy.NONE;
-      if (enabled) {
-         maxEntries = configuration.getEvictionMaxEntries();
+      if (enabled) {        
          if (cacheLoaderManager != null && cacheLoaderManager.isEnabled())
             cacheStore = cacheLoaderManager.getCacheStore();
          // Set up the eviction timer task
@@ -106,31 +118,7 @@
          } catch (Exception e) {
             log.warn("Caught exception purging cache store!", e);
          }
-      }
-
-      // finally iterate through data container if too big
-      Set<InternalCacheEntry> evictionCandidates = dataContainer.getEvictionCandidates();
-      if(!evictionCandidates.isEmpty()) {      
-         AdvancedCache<Object, Object> ac = cache.getAdvancedCache();
-         if (trace) {
-            log.trace("Evicting data container entries");
-            start = System.currentTimeMillis();
-         } 
-         for (InternalCacheEntry entry : evictionCandidates) {
-            Object k = entry.getKey();
-            try {                
-                  if (trace) log.trace("Attempting to evict key [{0}]", k);
-                  ac.withFlags(FAIL_SILENTLY).evict(k);
-               }
-            catch (Exception e) {
-               log.warn("Caught exception when iterating through data container.  Current entry is under key [{0}]", e, k);
-            }
-         }               
-         if (trace)
-            log.trace("Eviction process completed in {0}", Util.prettyPrintTime(System.currentTimeMillis() - start));
-      } else {
-         if (trace) log.trace("Data container is smaller than or equal to the maxEntries; not doing anything");
-      }
+      }    
    }
 
    public boolean isEnabled() {
@@ -148,4 +136,83 @@
          processEviction();
       }
    }
+   
+   @Override
+   public void preEvict(Object key) {
+      try {
+         acquireLock(getInvocationContext(), key);
+      } catch (Exception e) {
+         log.warn("Could not acquire lock for eviction of {0}", key, e);
+      }
+      cacheNotifier.notifyCacheEntryEvicted(key, true, null);
+   }
+
+   @Override
+   public void postEvict(Object key, InternalCacheEntry value) {
+      try {
+         passivator.passivate(key, value, null);
+      } catch (CacheLoaderException e) {
+         log.warn("Unable to passivate entry under {0}", key, e);
+      }
+      cacheNotifier.notifyCacheEntryEvicted(key, false, null);
+      releaseLock(key);
+   }
+   
+   private InvocationContext getInvocationContext(){
+      return ctxContainer.getInvocationContext();
+   }
+   
+   /**
+    * Attempts to lock an entry if the lock isn't already held in the current scope, and records the lock in the
+    * context.
+    *
+    * @param ctx context
+    * @param key Key to lock
+    * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
+    *         already held)
+    * @throws InterruptedException if interrupted
+    * @throws org.infinispan.util.concurrent.TimeoutException
+    *                              if we are unable to acquire the lock after a specified timeout.
+    */
+   private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
+      // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
+      // lock which may be shared with another key that we have a lock for already.
+      // nothing wrong, just means that we fail to record the lock.  And that is a problem.
+      // Better to check our records and lock again if necessary.
+
+      boolean shouldSkipLocking = ctx.hasFlag(Flag.SKIP_LOCKING);
+
+      if (!ctx.hasLockedKey(key) && !shouldSkipLocking) {
+         if (lockManager.lockAndRecord(key, ctx)) {
+            return true;
+         } else {
+            Object owner = lockManager.getOwner(key);
+            // if lock cannot be acquired, expose the key itself, not the marshalled value
+            if (key instanceof MarshalledValue) {
+               key = ((MarshalledValue) key).get();
+            }
+            throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
+                  ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
+         }
+      } else {
+         if (trace) {
+            if (shouldSkipLocking)
+               log.trace("SKIP_LOCKING flag used!");
+            else
+               log.trace("Already own lock for entry");
+         }
+      }
+
+      return false;
+   }
+   
+   public final void releaseLock(Object key) {
+      lockManager.unlock(key);
+   }
+   
+   private long getLockAcquisitionTimeout(InvocationContext ctx) {
+      return ctx.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) ?
+            0 : configuration.getLockAcquisitionTimeout();
+   }
+
 }

Modified: branches/4.1.x/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java	2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java	2010-08-18 12:45:42 UTC (rev 2245)
@@ -274,13 +274,18 @@
    }
    
    public interface EvictionListener<K, V> {
-       void evicted(K key, V value);
+       void preEvict(K key);
+       void postEvict(K key, V value);
    }
    
-   static class NullEvictionListener<K,V> implements EvictionListener<K, V>{
-       @Override
-       public void evicted(K key, V value) {            
-       }        
+   static class NullEvictionListener<K, V> implements EvictionListener<K, V> {
+      @Override
+      public void postEvict(K key, V value) {
+      }
+
+      @Override
+      public void preEvict(K key) {
+      }
    }
 
    public interface EvictionPolicy<K, V> {
@@ -421,12 +426,13 @@
                }
                while (isOverflow()) {
                    HashEntry<K, V> first = lruQueue.getLast();
+                   segment.getEvictionListener().preEvict(first.key);
                    segment.remove(first.key, first.hash, null);
                    evicted.add(first);
-               }
+               }              
            } finally {
-               accessQueue.clear();
-           }
+               accessQueue.clear();               
+           }          
            return evicted;
        }
 
@@ -595,8 +601,9 @@
            return evicted;
        }
 
-       private void removeFromSegment(Set<HashEntry<K, V>> evicted) {
+       private void removeFromSegment(Set<HashEntry<K, V>> evicted) {          
            for (HashEntry<K, V> e : evicted) {
+               segment.getEvictionListener().preEvict(e.key);
                segment.remove(e.key, e.hash, null);
            }
        }
@@ -756,6 +763,10 @@
         static final <K,V> Segment<K,V>[] newArray(int i) {
             return new Segment[i];
         }
+        
+        EvictionListener<K, V> getEvictionListener() {
+           return evictionListener;
+        }
 
         /**
          * Sets table to new HashEntry array.
@@ -816,8 +827,8 @@
                        Set<HashEntry<K, V>> evicted = attemptEviction(false);
                        // piggyback listener invocation on callers thread outside lock
                        if (evicted != null) {
-                           for (HashEntry<K, V> he : evicted) {
-                               evictionListener.evicted(he.key, he.value);
+                           for (HashEntry<K, V> he : evicted) {                               
+                               evictionListener.postEvict(he.key, he.value);
                            }
                        }
                    }
@@ -895,8 +906,8 @@
                unlock();
                // piggyback listener invocation on callers thread outside lock   
                if (evicted != null) {
-                   for (HashEntry<K, V> he : evicted) {
-                       evictionListener.evicted(he.key, he.value);
+                   for (HashEntry<K, V> he : evicted) {                       
+                       evictionListener.postEvict(he.key, he.value);
                    }
                }
            }
@@ -923,8 +934,8 @@
                unlock();
                // piggyback listener invocation on callers thread outside lock
                if(evicted != null) {
-                   for (HashEntry<K, V> he : evicted) {
-                       evictionListener.evicted(he.key, he.value);
+                   for (HashEntry<K, V> he : evicted) {                       
+                       evictionListener.postEvict(he.key, he.value);
                    }                
                }
            }
@@ -982,8 +993,8 @@
               unlock();
               // piggyback listener invocation on callers thread outside lock
               if(evicted != null) {
-                  for (HashEntry<K, V> he : evicted) {
-                      evictionListener.evicted(he.key, he.value);
+                  for (HashEntry<K, V> he : evicted) {                      
+                      evictionListener.postEvict(he.key, he.value);
                   }                
               }
           }

Modified: branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java	2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java	2010-08-18 12:45:42 UTC (rev 2245)
@@ -33,7 +33,7 @@
       cfg.setEvictionWakeUpInterval(0);
 
       ScheduledExecutorService mockService = createMock(ScheduledExecutorService.class);
-      em.initialize(mockService, cfg, null, null, null);
+      em.initialize(mockService, cfg, null, null, null,null,null,null);
       replay(mockService);
       em.start();
 
@@ -47,7 +47,7 @@
       cfg.setEvictionWakeUpInterval(789);
 
       ScheduledExecutorService mockService = createMock(ScheduledExecutorService.class);
-      em.initialize(mockService, cfg, null, null, null);
+      em.initialize(mockService, cfg, null, null, null,null,null,null);
 
       ScheduledFuture mockFuture = createNiceMock(ScheduledFuture.class);
       expect(mockService.scheduleWithFixedDelay(isA(EvictionManagerImpl.ScheduledTask.class), eq((long) 789),

Modified: branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java	2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java	2010-08-18 12:45:42 UTC (rev 2245)
@@ -42,7 +42,7 @@
       runTest(EvictionThreadPolicy.PIGGYBACK, EvictionStrategy.LRU);
    }
 
-   @Test (enabled = false, description = "See ISPN-598")
+   
    public void testPiggybackLIRS() {
       runTest(EvictionThreadPolicy.PIGGYBACK, EvictionStrategy.LIRS);
    }
@@ -59,7 +59,7 @@
       runTest(EvictionThreadPolicy.DEFAULT, EvictionStrategy.LRU);
    }
 
-   @Test (enabled = false, description = "See ISPN-598")
+   
    public void testDefaultLIRS() {
       runTest(EvictionThreadPolicy.DEFAULT, EvictionStrategy.LIRS);
    }



More information about the infinispan-commits mailing list