[infinispan-commits] Infinispan SVN: r2245 - in branches/4.1.x/core/src: main/java/org/infinispan/eviction and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Aug 18 08:45:43 EDT 2010
Author: vblagojevic at jboss.com
Date: 2010-08-18 08:45:42 -0400 (Wed, 18 Aug 2010)
New Revision: 2245
Modified:
branches/4.1.x/core/src/main/java/org/infinispan/container/DataContainer.java
branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManager.java
branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java
branches/4.1.x/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java
branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java
branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java
Log:
[ISPN-598] - Fix LIRS with passivation
Modified: branches/4.1.x/core/src/main/java/org/infinispan/container/DataContainer.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/container/DataContainer.java 2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/container/DataContainer.java 2010-08-18 12:45:42 UTC (rev 2245)
@@ -127,13 +127,4 @@
* Purges entries that have passed their expiry time
*/
void purgeExpired();
-
-
- /**
- * Returns a set of eviction candidates. Containers not supporting eviction should return an
- * empty set.
- *
- * @return a set of entries that should be evicted from this container.
- */
- Set<InternalCacheEntry> getEvictionCandidates();
}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java 2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java 2010-08-18 12:45:42 UTC (rev 2245)
@@ -4,11 +4,7 @@
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -16,24 +12,16 @@
import net.jcip.annotations.ThreadSafe;
-import org.infinispan.Cache;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalEntryFactory;
+import org.infinispan.eviction.EvictionManager;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.eviction.EvictionThreadPolicy;
-import org.infinispan.eviction.PassivationManager;
import org.infinispan.factories.annotations.Inject;
-import org.infinispan.factories.annotations.Start;
-import org.infinispan.interceptors.PassivationInterceptor;
-import org.infinispan.loaders.CacheLoaderException;
-import org.infinispan.loaders.CacheLoaderManager;
-import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.util.Immutables;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap.Eviction;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap.EvictionListener;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
/**
* Simple data container that does not order entries for eviction, implemented using two ConcurrentHashMaps, one for
@@ -54,10 +42,8 @@
final ConcurrentMap<Object, InternalCacheEntry> mortalEntries;
final AtomicInteger numEntries = new AtomicInteger(0);
final InternalEntryFactory entryFactory;
- final DefaultEvictionListener evictionListener;
- protected Cache<Object, Object> cache;
- private PassivationManager passivator;
- private CacheNotifier notifier;
+ final DefaultEvictionListener evictionListener;
+ private EvictionManager evictionManager;
protected DefaultDataContainer(int concurrencyLevel) {
immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(128, 0.75f, concurrencyLevel);
@@ -97,10 +83,8 @@
}
@Inject
- public void initialize(Cache<Object, Object> cache, PassivationManager passivator, CacheNotifier notifier) {
- this.cache = cache;
- this.passivator = passivator;
- this.notifier = notifier;
+ public void initialize(EvictionManager evictionManager) {
+ this.evictionManager = evictionManager;
}
public static DataContainer boundedDataContainer(int concurrencyLevel, int maxEntries, EvictionStrategy strategy, EvictionThreadPolicy policy) {
@@ -116,11 +100,6 @@
return new DefaultDataContainer(concurrencyLevel);
}
- @Override
- public Set<InternalCacheEntry> getEvictionCandidates() {
- return Collections.emptySet();
- }
-
public InternalCacheEntry peek(Object key) {
InternalCacheEntry e = immortalEntries.get(key);
if (e == null) e = mortalEntries.get(key);
@@ -243,19 +222,16 @@
return new EntryIterator(immortalEntries.values().iterator(), mortalEntries.values().iterator());
}
- private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry> {
- private final Log log = LogFactory.getLog(DefaultEvictionListener.class);
-
+ private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry> {
@Override
- public void evicted(Object key, InternalCacheEntry value) {
- notifier.notifyCacheEntryEvicted(key, true, null);
- try {
- passivator.passivate(key, value, null);
- } catch (CacheLoaderException e) {
- log.warn("Unable to passivate entry under {0}", key, e);
- }
- notifier.notifyCacheEntryEvicted(key, false, null);
+ public void preEvict(Object key) {
+ evictionManager.preEvict(key);
}
+
+ @Override
+ public void postEvict(Object key, InternalCacheEntry value) {
+ evictionManager.postEvict(key, value);
+ }
}
private class KeySet extends AbstractSet<Object> {
Modified: branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManager.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManager.java 2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManager.java 2010-08-18 12:45:42 UTC (rev 2245)
@@ -1,6 +1,8 @@
package org.infinispan.eviction;
import net.jcip.annotations.ThreadSafe;
+
+import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
@@ -31,4 +33,8 @@
* @return true if eviction is enabled, false otherwise
*/
boolean isEnabled();
+
+ void preEvict(Object key);
+
+ void postEvict(Object key, InternalCacheEntry value);
}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java 2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java 2010-08-18 12:45:42 UTC (rev 2245)
@@ -1,30 +1,34 @@
package org.infinispan.eviction;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
import net.jcip.annotations.ThreadSafe;
-import org.infinispan.AdvancedCache;
-import org.infinispan.Cache;
+
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.InvocationContextContainer;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
+import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
+import org.infinispan.marshall.MarshalledValue;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.infinispan.context.Flag.FAIL_SILENTLY;
-
@ThreadSafe
public class EvictionManagerImpl implements EvictionManager {
private static final Log log = LogFactory.getLog(EvictionManagerImpl.class);
@@ -32,24 +36,33 @@
ScheduledFuture <?> evictionTask;
// components to be injected
- ScheduledExecutorService executor;
- Configuration configuration;
- Cache<Object, Object> cache;
- CacheLoaderManager cacheLoaderManager;
- DataContainer dataContainer;
- CacheStore cacheStore;
- boolean enabled;
- int maxEntries;
+ private ScheduledExecutorService executor;
+ private Configuration configuration;
+ private CacheLoaderManager cacheLoaderManager;
+ private DataContainer dataContainer;
+ private CacheStore cacheStore;
+ private CacheNotifier cacheNotifier;
+ private LockManager lockManager;
+ private PassivationManager passivator;
+ private InvocationContextContainer ctxContainer;
+
+
+ private boolean enabled;
volatile CountDownLatch startLatch = new CountDownLatch(1);
@Inject
public void initialize(@ComponentName(KnownComponentNames.EVICTION_SCHEDULED_EXECUTOR) ScheduledExecutorService executor,
- Configuration configuration, Cache<Object, Object> cache, DataContainer dataContainer, CacheLoaderManager cacheLoaderManager) {
+ Configuration configuration, DataContainer dataContainer,
+ CacheLoaderManager cacheLoaderManager, CacheNotifier cacheNotifier,
+ LockManager lockManager, PassivationManager passivator, InvocationContextContainer ctxContainer) {
this.executor = executor;
this.configuration = configuration;
- this.cache = cache;
this.dataContainer = dataContainer;
this.cacheLoaderManager = cacheLoaderManager;
+ this.cacheNotifier = cacheNotifier;
+ this.lockManager = lockManager;
+ this.passivator = passivator;
+ this.ctxContainer = ctxContainer;
}
@Start(priority = 55)
@@ -57,8 +70,7 @@
public void start() {
// first check if eviction is enabled!
enabled = configuration.getEvictionStrategy() != EvictionStrategy.NONE;
- if (enabled) {
- maxEntries = configuration.getEvictionMaxEntries();
+ if (enabled) {
if (cacheLoaderManager != null && cacheLoaderManager.isEnabled())
cacheStore = cacheLoaderManager.getCacheStore();
// Set up the eviction timer task
@@ -106,31 +118,7 @@
} catch (Exception e) {
log.warn("Caught exception purging cache store!", e);
}
- }
-
- // finally iterate through data container if too big
- Set<InternalCacheEntry> evictionCandidates = dataContainer.getEvictionCandidates();
- if(!evictionCandidates.isEmpty()) {
- AdvancedCache<Object, Object> ac = cache.getAdvancedCache();
- if (trace) {
- log.trace("Evicting data container entries");
- start = System.currentTimeMillis();
- }
- for (InternalCacheEntry entry : evictionCandidates) {
- Object k = entry.getKey();
- try {
- if (trace) log.trace("Attempting to evict key [{0}]", k);
- ac.withFlags(FAIL_SILENTLY).evict(k);
- }
- catch (Exception e) {
- log.warn("Caught exception when iterating through data container. Current entry is under key [{0}]", e, k);
- }
- }
- if (trace)
- log.trace("Eviction process completed in {0}", Util.prettyPrintTime(System.currentTimeMillis() - start));
- } else {
- if (trace) log.trace("Data container is smaller than or equal to the maxEntries; not doing anything");
- }
+ }
}
public boolean isEnabled() {
@@ -148,4 +136,83 @@
processEviction();
}
}
+
+ @Override
+ public void preEvict(Object key) {
+ try {
+ acquireLock(getInvocationContext(), key);
+ } catch (Exception e) {
+ log.warn("Could not acquire lock for eviction of {0}", key, e);
+ }
+ cacheNotifier.notifyCacheEntryEvicted(key, true, null);
+ }
+
+ @Override
+ public void postEvict(Object key, InternalCacheEntry value) {
+ try {
+ passivator.passivate(key, value, null);
+ } catch (CacheLoaderException e) {
+ log.warn("Unable to passivate entry under {0}", key, e);
+ }
+ cacheNotifier.notifyCacheEntryEvicted(key, false, null);
+ releaseLock(key);
+ }
+
+ private InvocationContext getInvocationContext(){
+ return ctxContainer.getInvocationContext();
+ }
+
+ /**
+ * Attempts to lock an entry if the lock isn't already held in the current scope, and records the lock in the
+ * context.
+ *
+ * @param ctx context
+ * @param key Key to lock
+ * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
+ * already held)
+ * @throws InterruptedException if interrupted
+ * @throws org.infinispan.util.concurrent.TimeoutException
+ * if we are unable to acquire the lock after a specified timeout.
+ */
+ private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
+ // don't EVER use lockManager.isLocked() here: with lock striping we may already hold the relevant lock because it
+ // is shared with another key that we have locked. Relying on that does no harm to exclusion, but it means we
+ // fail to record the lock for this key in the context - and that is a problem.
+ // Better to check our own records and lock again if necessary.
+
+ boolean shouldSkipLocking = ctx.hasFlag(Flag.SKIP_LOCKING);
+
+ if (!ctx.hasLockedKey(key) && !shouldSkipLocking) {
+ if (lockManager.lockAndRecord(key, ctx)) {
+ return true;
+ } else {
+ Object owner = lockManager.getOwner(key);
+ // if lock cannot be acquired, expose the key itself, not the marshalled value
+ if (key instanceof MarshalledValue) {
+ key = ((MarshalledValue) key).get();
+ }
+ throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
+ ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
+ }
+ } else {
+ if (trace) {
+ if (shouldSkipLocking)
+ log.trace("SKIP_LOCKING flag used!");
+ else
+ log.trace("Already own lock for entry");
+ }
+ }
+
+ return false;
+ }
+
+ public final void releaseLock(Object key) {
+ lockManager.unlock(key);
+ }
+
+ private long getLockAcquisitionTimeout(InvocationContext ctx) {
+ return ctx.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) ?
+ 0 : configuration.getLockAcquisitionTimeout();
+ }
+
}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java 2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java 2010-08-18 12:45:42 UTC (rev 2245)
@@ -274,13 +274,18 @@
}
public interface EvictionListener<K, V> {
- void evicted(K key, V value);
+ void preEvict(K key);
+ void postEvict(K key, V value);
}
- static class NullEvictionListener<K,V> implements EvictionListener<K, V>{
- @Override
- public void evicted(K key, V value) {
- }
+ static class NullEvictionListener<K, V> implements EvictionListener<K, V> {
+ @Override
+ public void postEvict(K key, V value) {
+ }
+
+ @Override
+ public void preEvict(K key) {
+ }
}
public interface EvictionPolicy<K, V> {
@@ -421,12 +426,13 @@
}
while (isOverflow()) {
HashEntry<K, V> first = lruQueue.getLast();
+ segment.getEvictionListener().preEvict(first.key);
segment.remove(first.key, first.hash, null);
evicted.add(first);
- }
+ }
} finally {
- accessQueue.clear();
- }
+ accessQueue.clear();
+ }
return evicted;
}
@@ -595,8 +601,9 @@
return evicted;
}
- private void removeFromSegment(Set<HashEntry<K, V>> evicted) {
+ private void removeFromSegment(Set<HashEntry<K, V>> evicted) {
for (HashEntry<K, V> e : evicted) {
+ segment.getEvictionListener().preEvict(e.key);
segment.remove(e.key, e.hash, null);
}
}
@@ -756,6 +763,10 @@
static final <K,V> Segment<K,V>[] newArray(int i) {
return new Segment[i];
}
+
+ EvictionListener<K, V> getEvictionListener() {
+ return evictionListener;
+ }
/**
* Sets table to new HashEntry array.
@@ -816,8 +827,8 @@
Set<HashEntry<K, V>> evicted = attemptEviction(false);
// piggyback listener invocation on callers thread outside lock
if (evicted != null) {
- for (HashEntry<K, V> he : evicted) {
- evictionListener.evicted(he.key, he.value);
+ for (HashEntry<K, V> he : evicted) {
+ evictionListener.postEvict(he.key, he.value);
}
}
}
@@ -895,8 +906,8 @@
unlock();
// piggyback listener invocation on callers thread outside lock
if (evicted != null) {
- for (HashEntry<K, V> he : evicted) {
- evictionListener.evicted(he.key, he.value);
+ for (HashEntry<K, V> he : evicted) {
+ evictionListener.postEvict(he.key, he.value);
}
}
}
@@ -923,8 +934,8 @@
unlock();
// piggyback listener invocation on callers thread outside lock
if(evicted != null) {
- for (HashEntry<K, V> he : evicted) {
- evictionListener.evicted(he.key, he.value);
+ for (HashEntry<K, V> he : evicted) {
+ evictionListener.postEvict(he.key, he.value);
}
}
}
@@ -982,8 +993,8 @@
unlock();
// piggyback listener invocation on callers thread outside lock
if(evicted != null) {
- for (HashEntry<K, V> he : evicted) {
- evictionListener.evicted(he.key, he.value);
+ for (HashEntry<K, V> he : evicted) {
+ evictionListener.postEvict(he.key, he.value);
}
}
}
Modified: branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java 2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java 2010-08-18 12:45:42 UTC (rev 2245)
@@ -33,7 +33,7 @@
cfg.setEvictionWakeUpInterval(0);
ScheduledExecutorService mockService = createMock(ScheduledExecutorService.class);
- em.initialize(mockService, cfg, null, null, null);
+ em.initialize(mockService, cfg, null, null, null,null,null,null);
replay(mockService);
em.start();
@@ -47,7 +47,7 @@
cfg.setEvictionWakeUpInterval(789);
ScheduledExecutorService mockService = createMock(ScheduledExecutorService.class);
- em.initialize(mockService, cfg, null, null, null);
+ em.initialize(mockService, cfg, null, null, null,null,null,null);
ScheduledFuture mockFuture = createNiceMock(ScheduledFuture.class);
expect(mockService.scheduleWithFixedDelay(isA(EvictionManagerImpl.ScheduledTask.class), eq((long) 789),
Modified: branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java 2010-08-17 16:44:26 UTC (rev 2244)
+++ branches/4.1.x/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java 2010-08-18 12:45:42 UTC (rev 2245)
@@ -42,7 +42,7 @@
runTest(EvictionThreadPolicy.PIGGYBACK, EvictionStrategy.LRU);
}
- @Test (enabled = false, description = "See ISPN-598")
+
public void testPiggybackLIRS() {
runTest(EvictionThreadPolicy.PIGGYBACK, EvictionStrategy.LIRS);
}
@@ -59,7 +59,7 @@
runTest(EvictionThreadPolicy.DEFAULT, EvictionStrategy.LRU);
}
- @Test (enabled = false, description = "See ISPN-598")
+
public void testDefaultLIRS() {
runTest(EvictionThreadPolicy.DEFAULT, EvictionStrategy.LIRS);
}
More information about the infinispan-commits mailing list