[infinispan-commits] Infinispan SVN: r2204 - in branches/4.1.x/core/src/main/java/org/infinispan: eviction and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Aug 12 09:35:42 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-08-12 09:35:41 -0400 (Thu, 12 Aug 2010)
New Revision: 2204
Added:
branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManager.java
branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManagerImpl.java
Modified:
branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
branches/4.1.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
branches/4.1.x/core/src/main/java/org/infinispan/interceptors/PassivationInterceptor.java
branches/4.1.x/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java
Log:
[ISPN-582] (Eviction with passivation introduces a race where entries are invisible to callers)
Modified: branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java 2010-08-12 09:48:01 UTC (rev 2203)
+++ branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java 2010-08-12 13:35:41 UTC (rev 2204)
@@ -21,11 +21,19 @@
import org.infinispan.container.entries.InternalEntryFactory;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.eviction.EvictionThreadPolicy;
+import org.infinispan.eviction.PassivationManager;
import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.interceptors.PassivationInterceptor;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.util.Immutables;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap.Eviction;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap.EvictionListener;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
/**
* Simple data container that does not order entries for eviction, implemented using two ConcurrentHashMaps, one for
@@ -46,19 +54,20 @@
final ConcurrentMap<Object, InternalCacheEntry> mortalEntries;
final AtomicInteger numEntries = new AtomicInteger(0);
final InternalEntryFactory entryFactory;
- final DefaultEvictionListener evictionListener;
+ final DefaultEvictionListener evictionListener;
protected Cache<Object, Object> cache;
+ private PassivationManager passivator;
+ private CacheNotifier notifier;
-
protected DefaultDataContainer(int concurrencyLevel) {
immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(128, 0.75f, concurrencyLevel);
mortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(64, 0.75f, concurrencyLevel);
entryFactory = new InternalEntryFactory();
evictionListener = null;
}
-
+
protected DefaultDataContainer(int concurrencyLevel, int maxEntries, EvictionStrategy strategy, EvictionThreadPolicy policy) {
-
+
// translate eviction policy and strategy
switch (policy) {
case DEFAULT:
@@ -70,16 +79,16 @@
default:
throw new IllegalArgumentException("No such eviction thread policy " + strategy);
}
-
+
Eviction eviction;
switch (strategy) {
case FIFO:
- case UNORDERED:
+ case UNORDERED:
case LRU:
- eviction = Eviction.LRU;
+ eviction = Eviction.LRU;
break;
case LIRS:
- eviction = Eviction.LIRS;
+ eviction = Eviction.LIRS;
break;
default:
throw new IllegalArgumentException("No such eviction strategy " + strategy);
@@ -88,14 +97,16 @@
mortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(64, 0.75f, concurrencyLevel);
entryFactory = new InternalEntryFactory();
}
-
+
@Inject
- public void initialize(Cache<Object, Object> cache) {
- this.cache = cache;
+ public void initialize(Cache<Object, Object> cache, PassivationManager passivator, CacheNotifier notifier) {
+ this.cache = cache;
+ this.passivator = passivator;
+ this.notifier = notifier;
}
-
+
public static DataContainer boundedDataContainer(int concurrencyLevel, int maxEntries, EvictionStrategy strategy, EvictionThreadPolicy policy) {
- return new DefaultDataContainer(concurrencyLevel, maxEntries, strategy,policy) {
+ return new DefaultDataContainer(concurrencyLevel, maxEntries, strategy, policy) {
@Override
public int size() {
@@ -108,11 +119,11 @@
}
};
}
-
+
public static DataContainer unBoundedDataContainer(int concurrencyLevel) {
- return new DefaultDataContainer(concurrencyLevel) ;
+ return new DefaultDataContainer(concurrencyLevel);
}
-
+
@Override
public Set<InternalCacheEntry> getEvictionCandidates() {
return Collections.emptySet();
@@ -129,7 +140,7 @@
if (e != null) {
if (e.isExpired()) {
if (mortalEntries.remove(k) != null) {
- numEntries.getAndDecrement();
+ numEntries.getAndDecrement();
}
e = null;
} else {
@@ -189,7 +200,7 @@
InternalCacheEntry ice = peek(k);
if (ice != null && ice.isExpired()) {
if (mortalEntries.remove(k) != null) {
- numEntries.getAndDecrement();
+ numEntries.getAndDecrement();
}
ice = null;
}
@@ -239,15 +250,22 @@
public Iterator<InternalCacheEntry> iterator() {
return new EntryIterator(immortalEntries.values().iterator(), mortalEntries.values().iterator());
}
-
- private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry>{
- final List <InternalCacheEntry> evicted = Collections.synchronizedList(new LinkedList<InternalCacheEntry>());
+ private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry> {
+ final List<InternalCacheEntry> evicted = Collections.synchronizedList(new LinkedList<InternalCacheEntry>());
+ private final Log log = LogFactory.getLog(DefaultEvictionListener.class);
+
@Override
public void evicted(Object key, InternalCacheEntry value) {
- evicted.add(value);
- }
-
+ notifier.notifyCacheEntryEvicted(key, true, null);
+ try {
+ passivator.passivate(key, value, null);
+ } catch (CacheLoaderException e) {
+ log.warn("Unable to passivate entry under {0}", key, e);
+ }
+ notifier.notifyCacheEntryEvicted(key, false, null);
+ }
+
public Set<InternalCacheEntry> getEvicted() {
Set<InternalCacheEntry> result = Collections.emptySet();
synchronized (evicted) {
@@ -260,14 +278,10 @@
return result;
}
}
-
- private class PiggybackEvictionListener extends DefaultEvictionListener{
-
+
+ private class PiggybackEvictionListener extends DefaultEvictionListener {
+
@Override
- public void evicted(Object key, InternalCacheEntry value) {
- cache.getAdvancedCache().evict(key);
- }
-
public Set<InternalCacheEntry> getEvicted() {
return Collections.emptySet();
}
Added: branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManager.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManager.java (rev 0)
+++ branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManager.java 2010-08-12 13:35:41 UTC (rev 2204)
@@ -0,0 +1,27 @@
+package org.infinispan.eviction;
+
+import net.jcip.annotations.ThreadSafe;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.scopes.Scope;
+import org.infinispan.factories.scopes.Scopes;
+import org.infinispan.loaders.CacheLoaderException;
+
+/**
+ * A passivation manager
+ *
+ * @author Manik Surtani
+ * @version 4.1
+ */
+ at ThreadSafe
+ at Scope(Scopes.NAMED_CACHE)
+public interface PassivationManager {
+
+ boolean isEnabled();
+
+ void passivate(Object key, InternalCacheEntry entry, InvocationContext ctx) throws CacheLoaderException;
+
+ long getPassivationCount();
+
+ void resetPassivationCount();
+}
Added: branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManagerImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManagerImpl.java (rev 0)
+++ branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManagerImpl.java 2010-08-12 13:35:41 UTC (rev 2204)
@@ -0,0 +1,74 @@
+package org.infinispan.eviction;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.config.ConfigurationException;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PassivationManagerImpl implements PassivationManager {
+
+ CacheLoaderManager cacheLoaderManager;
+ CacheNotifier notifier;
+ CacheStore cacheStore;
+ Configuration cfg;
+
+ boolean statsEnabled = false;
+ boolean enabled = false;
+ private static final Log log = LogFactory.getLog(PassivationManagerImpl.class);
+ private final AtomicLong passivations = new AtomicLong(0);
+
+ @Inject
+ public void inject(CacheLoaderManager cacheLoaderManager, CacheNotifier notifier, Configuration cfg) {
+ this.cacheLoaderManager = cacheLoaderManager;
+ this.notifier = notifier;
+ this.cfg = cfg;
+ }
+
+ @Start(priority = 11)
+ public void start() {
+ enabled = cfg.getCacheLoaderManagerConfig().isPassivation();
+ if (enabled) {
+ cacheStore = cacheLoaderManager == null ? null : cacheLoaderManager.getCacheStore();
+ if (cacheStore == null)
+ throw new ConfigurationException("passivation can only be used with a CacheLoader that implements CacheStore!");
+
+ enabled = cacheLoaderManager.isEnabled() && cacheLoaderManager.isUsingPassivation();
+ statsEnabled = cfg.isExposeJmxStatistics();
+ }
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ @Override
+ public void passivate(Object key, InternalCacheEntry entry, InvocationContext ctx) throws CacheLoaderException {
+ if (enabled) {
+ // notify listeners that this entry is about to be passivated
+ notifier.notifyCacheEntryPassivated(key, true, ctx);
+ log.trace("Passivating entry {0}", key);
+ cacheStore.store(entry);
+ notifier.notifyCacheEntryPassivated(key, false, ctx);
+ if (statsEnabled && entry != null) passivations.getAndIncrement();
+ }
+ }
+
+ public long getPassivationCount() {
+ return passivations.get();
+ }
+
+ public void resetPassivationCount() {
+ passivations.set(0L);
+ }
+}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java 2010-08-12 09:48:01 UTC (rev 2203)
+++ branches/4.1.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java 2010-08-12 13:35:41 UTC (rev 2204)
@@ -27,6 +27,7 @@
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.context.InvocationContextContainerImpl;
import org.infinispan.eviction.EvictionManager;
+import org.infinispan.eviction.PassivationManager;
import org.infinispan.factories.annotations.DefaultFactoryFor;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.marshall.StreamingMarshaller;
@@ -43,7 +44,7 @@
* @since 4.0
*/
@DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class, CommandsFactory.class,
- CacheLoaderManager.class, InvocationContextContainer.class,
+ CacheLoaderManager.class, InvocationContextContainer.class, PassivationManager.class,
BatchContainer.class, TransactionLog.class, EvictionManager.class, InvocationContextContainer.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
@Override
Modified: branches/4.1.x/core/src/main/java/org/infinispan/interceptors/PassivationInterceptor.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/interceptors/PassivationInterceptor.java 2010-08-12 09:48:01 UTC (rev 2203)
+++ branches/4.1.x/core/src/main/java/org/infinispan/interceptors/PassivationInterceptor.java 2010-08-12 13:35:41 UTC (rev 2204)
@@ -5,12 +5,14 @@
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
+import org.infinispan.eviction.PassivationManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.JmxStatsCommandInterceptor;
import org.infinispan.jmx.annotations.MBean;
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
+import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
import org.infinispan.notifications.cachelistener.CacheNotifier;
@@ -27,50 +29,34 @@
*/
@MBean(objectName = "Passivation", description = "Component that handles passivating entries to a CacheStore on eviction.")
public class PassivationInterceptor extends JmxStatsCommandInterceptor {
- private final AtomicLong passivations = new AtomicLong(0);
+
- CacheStore cacheStore;
- CacheNotifier notifier;
- CacheLoaderManager cacheLoaderManager;
+ PassivationManager passivator;
DataContainer dataContainer;
@Inject
- public void setDependencies(CacheNotifier notifier, CacheLoaderManager cacheLoaderManager, DataContainer dataContainer) {
- this.notifier = notifier;
- this.cacheLoaderManager = cacheLoaderManager;
+ public void setDependencies(PassivationManager passivator, DataContainer dataContainer) {
+ this.passivator = passivator;
this.dataContainer = dataContainer;
}
- @Start(priority = 15)
- public void start() {
- cacheStore = cacheLoaderManager == null ? null : cacheLoaderManager.getCacheStore();
- if (cacheStore == null)
- throw new ConfigurationException("passivation can only be used with a CacheLoader that implements CacheStore!");
- }
-
@Override
public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable {
- // notify listeners that this entry is about to be passivated
Object key = command.getKey();
- notifier.notifyCacheEntryPassivated(key, true, ctx);
- log.trace("Passivating entry {0}", key);
- InternalCacheEntry entryForStorage = dataContainer.get(key);
- cacheStore.store(entryForStorage);
- notifier.notifyCacheEntryPassivated(key, false, ctx);
- if (getStatisticsEnabled() && entryForStorage != null) passivations.getAndIncrement();
+ passivator.passivate(key, dataContainer.get(key), ctx);
return invokeNextInterceptor(ctx, command);
}
@ManagedOperation(description = "Resets statistics gathered by this component")
@Operation(displayName = "Reset statistics")
public void resetStatistics() {
- passivations.set(0);
+ passivator.resetPassivationCount();
}
@ManagedAttribute(description = "Number of passivation events")
@Metric(displayName = "Number of cache passivations", measurementType = MeasurementType.TRENDSUP)
public String getPassivations() {
if (!getStatisticsEnabled()) return "N/A";
- return String.valueOf(passivations.get());
+ return String.valueOf(passivator.getPassivationCount());
}
}
Modified: branches/4.1.x/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java 2010-08-12 09:48:01 UTC (rev 2203)
+++ branches/4.1.x/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java 2010-08-12 13:35:41 UTC (rev 2204)
@@ -264,7 +264,7 @@
}
private void setTx(InvocationContext ctx, EventImpl e) {
- if (ctx.isInTxScope()) {
+ if (ctx != null && ctx.isInTxScope()) {
GlobalTransaction tx = ((TxInvocationContext) ctx).getGlobalTransaction();
e.setTransactionId(tx);
}
More information about the infinispan-commits
mailing list