[infinispan-commits] Infinispan SVN: r2204 - in branches/4.1.x/core/src/main/java/org/infinispan: eviction and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Aug 12 09:35:42 EDT 2010


Author: manik.surtani at jboss.com
Date: 2010-08-12 09:35:41 -0400 (Thu, 12 Aug 2010)
New Revision: 2204

Added:
   branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManager.java
   branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManagerImpl.java
Modified:
   branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
   branches/4.1.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
   branches/4.1.x/core/src/main/java/org/infinispan/interceptors/PassivationInterceptor.java
   branches/4.1.x/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java
Log:
[ISPN-582] (Eviction with passivation introduces a race where entries are invisible to callers)

Modified: branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java	2010-08-12 09:48:01 UTC (rev 2203)
+++ branches/4.1.x/core/src/main/java/org/infinispan/container/DefaultDataContainer.java	2010-08-12 13:35:41 UTC (rev 2204)
@@ -21,11 +21,19 @@
 import org.infinispan.container.entries.InternalEntryFactory;
 import org.infinispan.eviction.EvictionStrategy;
 import org.infinispan.eviction.EvictionThreadPolicy;
+import org.infinispan.eviction.PassivationManager;
 import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.interceptors.PassivationInterceptor;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
 import org.infinispan.util.Immutables;
 import org.infinispan.util.concurrent.BoundedConcurrentHashMap;
 import org.infinispan.util.concurrent.BoundedConcurrentHashMap.Eviction;
 import org.infinispan.util.concurrent.BoundedConcurrentHashMap.EvictionListener;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 
 /**
  * Simple data container that does not order entries for eviction, implemented using two ConcurrentHashMaps, one for
@@ -46,19 +54,20 @@
    final ConcurrentMap<Object, InternalCacheEntry> mortalEntries;
    final AtomicInteger numEntries = new AtomicInteger(0);
    final InternalEntryFactory entryFactory;
-   final DefaultEvictionListener evictionListener; 
+   final DefaultEvictionListener evictionListener;
    protected Cache<Object, Object> cache;
+   private PassivationManager passivator;
+   private CacheNotifier notifier;
 
-
    protected DefaultDataContainer(int concurrencyLevel) {
       immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(128, 0.75f, concurrencyLevel);
       mortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(64, 0.75f, concurrencyLevel);
       entryFactory = new InternalEntryFactory();
       evictionListener = null;
    }
-   
+
    protected DefaultDataContainer(int concurrencyLevel, int maxEntries, EvictionStrategy strategy, EvictionThreadPolicy policy) {
-      
+
       // translate eviction policy and strategy
       switch (policy) {
          case DEFAULT:
@@ -70,16 +79,16 @@
          default:
             throw new IllegalArgumentException("No such eviction thread policy " + strategy);
       }
-      
+
       Eviction eviction;
       switch (strategy) {
          case FIFO:
-         case UNORDERED:   
+         case UNORDERED:
          case LRU:
-            eviction = Eviction.LRU;            
+            eviction = Eviction.LRU;
             break;
          case LIRS:
-            eviction = Eviction.LIRS;            
+            eviction = Eviction.LIRS;
             break;
          default:
             throw new IllegalArgumentException("No such eviction strategy " + strategy);
@@ -88,14 +97,16 @@
       mortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(64, 0.75f, concurrencyLevel);
       entryFactory = new InternalEntryFactory();
    }
-   
+
    @Inject
-   public void initialize(Cache<Object, Object> cache) {      
-      this.cache = cache;    
+   public void initialize(Cache<Object, Object> cache, PassivationManager passivator, CacheNotifier notifier) {
+      this.cache = cache;
+      this.passivator = passivator;
+      this.notifier = notifier;
    }
-   
+
    public static DataContainer boundedDataContainer(int concurrencyLevel, int maxEntries, EvictionStrategy strategy, EvictionThreadPolicy policy) {
-      return new DefaultDataContainer(concurrencyLevel, maxEntries, strategy,policy) {
+      return new DefaultDataContainer(concurrencyLevel, maxEntries, strategy, policy) {
 
          @Override
          public int size() {
@@ -108,11 +119,11 @@
          }
       };
    }
-   
+
    public static DataContainer unBoundedDataContainer(int concurrencyLevel) {
-      return new DefaultDataContainer(concurrencyLevel) ;
+      return new DefaultDataContainer(concurrencyLevel);
    }
-   
+
    @Override
    public Set<InternalCacheEntry> getEvictionCandidates() {
       return Collections.emptySet();
@@ -129,7 +140,7 @@
       if (e != null) {
          if (e.isExpired()) {
             if (mortalEntries.remove(k) != null) {
-               numEntries.getAndDecrement();               
+               numEntries.getAndDecrement();
             }
             e = null;
          } else {
@@ -189,7 +200,7 @@
       InternalCacheEntry ice = peek(k);
       if (ice != null && ice.isExpired()) {
          if (mortalEntries.remove(k) != null) {
-            numEntries.getAndDecrement();            
+            numEntries.getAndDecrement();
          }
          ice = null;
       }
@@ -239,15 +250,22 @@
    public Iterator<InternalCacheEntry> iterator() {
       return new EntryIterator(immortalEntries.values().iterator(), mortalEntries.values().iterator());
    }
-   
-   private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry>{
-      final List <InternalCacheEntry> evicted = Collections.synchronizedList(new LinkedList<InternalCacheEntry>());
 
+   private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry> {
+      final List<InternalCacheEntry> evicted = Collections.synchronizedList(new LinkedList<InternalCacheEntry>());
+      private final Log log = LogFactory.getLog(DefaultEvictionListener.class);
+
       @Override
       public void evicted(Object key, InternalCacheEntry value) {
-         evicted.add(value);
-      }   
-      
+         notifier.notifyCacheEntryEvicted(key, true, null);
+         try {
+            passivator.passivate(key, value, null);
+         } catch (CacheLoaderException e) {
+            log.warn("Unable to passivate entry under {0}", key, e);
+         }
+         notifier.notifyCacheEntryEvicted(key, false, null);
+      }
+
       public Set<InternalCacheEntry> getEvicted() {
          Set<InternalCacheEntry> result = Collections.emptySet();
          synchronized (evicted) {
@@ -260,14 +278,10 @@
          return result;
       }
    }
-   
-   private class PiggybackEvictionListener extends  DefaultEvictionListener{
-      
+
+   private class PiggybackEvictionListener extends DefaultEvictionListener {
+
       @Override
-      public void evicted(Object key, InternalCacheEntry value) {
-         cache.getAdvancedCache().evict(key);
-      }  
-      
       public Set<InternalCacheEntry> getEvicted() {
          return Collections.emptySet();
       }

Added: branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManager.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManager.java	                        (rev 0)
+++ branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManager.java	2010-08-12 13:35:41 UTC (rev 2204)
@@ -0,0 +1,27 @@
+package org.infinispan.eviction;
+
+import net.jcip.annotations.ThreadSafe;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.scopes.Scope;
+import org.infinispan.factories.scopes.Scopes;
+import org.infinispan.loaders.CacheLoaderException;
+
+/**
+ * A passivation manager
+ *
+ * @author Manik Surtani
+ * @version 4.1
+ */
+@ThreadSafe
+@Scope(Scopes.NAMED_CACHE)
+public interface PassivationManager {
+   
+   boolean isEnabled();
+
+   void passivate(Object key, InternalCacheEntry entry, InvocationContext ctx) throws CacheLoaderException;
+
+   long getPassivationCount();
+
+   void resetPassivationCount();   
+}

Added: branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManagerImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManagerImpl.java	                        (rev 0)
+++ branches/4.1.x/core/src/main/java/org/infinispan/eviction/PassivationManagerImpl.java	2010-08-12 13:35:41 UTC (rev 2204)
@@ -0,0 +1,74 @@
+package org.infinispan.eviction;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.config.ConfigurationException;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.factories.annotations.Start;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PassivationManagerImpl implements PassivationManager {
+
+   CacheLoaderManager cacheLoaderManager;
+   CacheNotifier notifier;
+   CacheStore cacheStore;
+   Configuration cfg;
+
+   boolean statsEnabled = false;
+   boolean enabled = false;
+   private static final Log log = LogFactory.getLog(PassivationManagerImpl.class);
+   private final AtomicLong passivations = new AtomicLong(0);
+
+   @Inject
+   public void inject(CacheLoaderManager cacheLoaderManager, CacheNotifier notifier, Configuration cfg) {
+      this.cacheLoaderManager = cacheLoaderManager;
+      this.notifier = notifier;
+      this.cfg = cfg;
+   }
+
+   @Start(priority = 11)
+   public void start() {
+      enabled = cfg.getCacheLoaderManagerConfig().isPassivation();
+      if (enabled) {
+         cacheStore = cacheLoaderManager == null ? null : cacheLoaderManager.getCacheStore();
+         if (cacheStore == null)
+            throw new ConfigurationException("passivation can only be used with a CacheLoader that implements CacheStore!");
+
+         enabled = cacheLoaderManager.isEnabled() && cacheLoaderManager.isUsingPassivation();
+         statsEnabled = cfg.isExposeJmxStatistics();
+      }
+   }
+
+   @Override
+   public boolean isEnabled() {
+      return enabled;
+   }
+
+   @Override
+   public void passivate(Object key, InternalCacheEntry entry, InvocationContext ctx) throws CacheLoaderException {
+      if (enabled) {
+         // notify listeners that this entry is about to be passivated
+         notifier.notifyCacheEntryPassivated(key, true, ctx);
+         log.trace("Passivating entry {0}", key);
+         cacheStore.store(entry);
+         notifier.notifyCacheEntryPassivated(key, false, ctx);
+         if (statsEnabled && entry != null) passivations.getAndIncrement();
+      }
+   }
+
+   public long getPassivationCount() {
+      return passivations.get();
+   }
+
+   public void resetPassivationCount() {
+      passivations.set(0L);
+   }
+}

Modified: branches/4.1.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java	2010-08-12 09:48:01 UTC (rev 2203)
+++ branches/4.1.x/core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java	2010-08-12 13:35:41 UTC (rev 2204)
@@ -27,6 +27,7 @@
 import org.infinispan.context.InvocationContextContainer;
 import org.infinispan.context.InvocationContextContainerImpl;
 import org.infinispan.eviction.EvictionManager;
+import org.infinispan.eviction.PassivationManager;
 import org.infinispan.factories.annotations.DefaultFactoryFor;
 import org.infinispan.loaders.CacheLoaderManager;
 import org.infinispan.marshall.StreamingMarshaller;
@@ -43,7 +44,7 @@
  * @since 4.0
  */
 @DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class, CommandsFactory.class,
-        CacheLoaderManager.class, InvocationContextContainer.class,
+        CacheLoaderManager.class, InvocationContextContainer.class, PassivationManager.class,
         BatchContainer.class, TransactionLog.class, EvictionManager.class, InvocationContextContainer.class})
 public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
    @Override

Modified: branches/4.1.x/core/src/main/java/org/infinispan/interceptors/PassivationInterceptor.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/interceptors/PassivationInterceptor.java	2010-08-12 09:48:01 UTC (rev 2203)
+++ branches/4.1.x/core/src/main/java/org/infinispan/interceptors/PassivationInterceptor.java	2010-08-12 13:35:41 UTC (rev 2204)
@@ -5,12 +5,14 @@
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.context.InvocationContext;
+import org.infinispan.eviction.PassivationManager;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.interceptors.base.JmxStatsCommandInterceptor;
 import org.infinispan.jmx.annotations.MBean;
 import org.infinispan.jmx.annotations.ManagedAttribute;
 import org.infinispan.jmx.annotations.ManagedOperation;
+import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheLoaderManager;
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.notifications.cachelistener.CacheNotifier;
@@ -27,50 +29,34 @@
  */
 @MBean(objectName = "Passivation", description = "Component that handles passivating entries to a CacheStore on eviction.")
 public class PassivationInterceptor extends JmxStatsCommandInterceptor {
-   private final AtomicLong passivations = new AtomicLong(0);
+   
 
-   CacheStore cacheStore;
-   CacheNotifier notifier;
-   CacheLoaderManager cacheLoaderManager;
+   PassivationManager passivator;
    DataContainer dataContainer;
 
    @Inject
-   public void setDependencies(CacheNotifier notifier, CacheLoaderManager cacheLoaderManager, DataContainer dataContainer) {
-      this.notifier = notifier;
-      this.cacheLoaderManager = cacheLoaderManager;
+   public void setDependencies(PassivationManager passivator, DataContainer dataContainer) {
+      this.passivator = passivator;
       this.dataContainer = dataContainer;
    }
 
-   @Start(priority = 15)
-   public void start() {
-      cacheStore = cacheLoaderManager == null ? null : cacheLoaderManager.getCacheStore();
-      if (cacheStore == null)
-         throw new ConfigurationException("passivation can only be used with a CacheLoader that implements CacheStore!");
-   }
-
    @Override
    public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable {
-      // notify listeners that this entry is about to be passivated
       Object key = command.getKey();
-      notifier.notifyCacheEntryPassivated(key, true, ctx);
-      log.trace("Passivating entry {0}", key);
-      InternalCacheEntry entryForStorage = dataContainer.get(key);
-      cacheStore.store(entryForStorage);
-      notifier.notifyCacheEntryPassivated(key, false, ctx);
-      if (getStatisticsEnabled() && entryForStorage != null) passivations.getAndIncrement();
+      passivator.passivate(key, dataContainer.get(key), ctx);
       return invokeNextInterceptor(ctx, command);
    }
 
    @ManagedOperation(description = "Resets statistics gathered by this component")
    @Operation(displayName = "Reset statistics")
    public void resetStatistics() {
-      passivations.set(0);
+      passivator.resetPassivationCount();
    }
 
    @ManagedAttribute(description = "Number of passivation events")
    @Metric(displayName = "Number of cache passivations", measurementType = MeasurementType.TRENDSUP)   
    public String getPassivations() {
       if (!getStatisticsEnabled()) return "N/A";
-      return String.valueOf(passivations.get());
+      return String.valueOf(passivator.getPassivationCount());
    }
 }

Modified: branches/4.1.x/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java	2010-08-12 09:48:01 UTC (rev 2203)
+++ branches/4.1.x/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java	2010-08-12 13:35:41 UTC (rev 2204)
@@ -264,7 +264,7 @@
    }
 
    private void setTx(InvocationContext ctx, EventImpl e) {
-      if (ctx.isInTxScope()) {
+      if (ctx != null && ctx.isInTxScope()) {
          GlobalTransaction tx = ((TxInvocationContext) ctx).getGlobalTransaction();
          e.setTransactionId(tx);
       }



More information about the infinispan-commits mailing list