[infinispan-commits] Infinispan SVN: r2249 - in trunk/core/src: main/java/org/infinispan/container and 5 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Aug 18 10:03:56 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-08-18 10:03:55 -0400 (Wed, 18 Aug 2010)
New Revision: 2249
Removed:
trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
trunk/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationConfigurationTest.java
Modified:
trunk/core/src/main/java/org/infinispan/config/ConfigurationValidatingVisitor.java
trunk/core/src/main/java/org/infinispan/container/DataContainer.java
trunk/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
trunk/core/src/main/java/org/infinispan/context/InvocationContextContainer.java
trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
trunk/core/src/main/java/org/infinispan/eviction/EvictionManager.java
trunk/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java
trunk/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java
trunk/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java
trunk/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java
Log:
Merged in 2245 ~ 2248
Modified: trunk/core/src/main/java/org/infinispan/config/ConfigurationValidatingVisitor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/config/ConfigurationValidatingVisitor.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/config/ConfigurationValidatingVisitor.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -48,20 +48,6 @@
}
@Override
- public void visitEvictionType(Configuration.EvictionType bean) {
- this.eviction = bean;
- if (this.eviction != null && this.clmc != null) checkEvictionPassivationSettings();
- super.visitEvictionType(bean);
- }
-
- @Override
- public void visitCacheLoaderManagerConfig(CacheLoaderManagerConfig bean) {
- this.clmc = bean;
- if (this.eviction != null && this.clmc != null) checkEvictionPassivationSettings();
- super.visitCacheLoaderManagerConfig(bean);
- }
-
- @Override
public void visitConfiguration(Configuration bean) {
checkEagerLockingAndDld(bean);
}
@@ -77,9 +63,4 @@
throw new ConfigurationException("Deadlock detection cannot be used with eager locking until ISPN-596 is fixed. See https://jira.jboss.org/browse/ISPN-596");
}
}
-
- private void checkEvictionPassivationSettings() {
- if (eviction != null && clmc != null && clmc.isPassivation() && eviction.strategy == EvictionStrategy.LIRS)
- throw new ConfigurationException("Eviction strategy LIRS cannot be used with passivation until ISPN-598 is fixed. See https://jira.jboss.org/browse/ISPN-598");
- }
}
Modified: trunk/core/src/main/java/org/infinispan/container/DataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/DataContainer.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/container/DataContainer.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -127,13 +127,4 @@
* Purges entries that have passed their expiry time
*/
void purgeExpired();
-
-
- /**
- * Returns a set of eviction candidates. Containers not supporting eviction should return an
- * empty set.
- *
- * @return a set of entries that should be evicted from this container.
- */
- Set<InternalCacheEntry> getEvictionCandidates();
}
Modified: trunk/core/src/main/java/org/infinispan/container/DefaultDataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/DefaultDataContainer.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/container/DefaultDataContainer.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -4,11 +4,7 @@
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -16,24 +12,16 @@
import net.jcip.annotations.ThreadSafe;
-import org.infinispan.Cache;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalEntryFactory;
+import org.infinispan.eviction.EvictionManager;
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.eviction.EvictionThreadPolicy;
-import org.infinispan.eviction.PassivationManager;
import org.infinispan.factories.annotations.Inject;
-import org.infinispan.factories.annotations.Start;
-import org.infinispan.interceptors.PassivationInterceptor;
-import org.infinispan.loaders.CacheLoaderException;
-import org.infinispan.loaders.CacheLoaderManager;
-import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.util.Immutables;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap.Eviction;
import org.infinispan.util.concurrent.BoundedConcurrentHashMap.EvictionListener;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
/**
* Simple data container that does not order entries for eviction, implemented using two ConcurrentHashMaps, one for
@@ -54,10 +42,8 @@
final ConcurrentMap<Object, InternalCacheEntry> mortalEntries;
final AtomicInteger numEntries = new AtomicInteger(0);
final InternalEntryFactory entryFactory;
- final DefaultEvictionListener evictionListener;
- protected Cache<Object, Object> cache;
- private PassivationManager passivator;
- private CacheNotifier notifier;
+ final DefaultEvictionListener evictionListener;
+ private EvictionManager evictionManager;
protected DefaultDataContainer(int concurrencyLevel) {
immortalEntries = new ConcurrentHashMap<Object, InternalCacheEntry>(128, 0.75f, concurrencyLevel);
@@ -97,10 +83,8 @@
}
@Inject
- public void initialize(Cache<Object, Object> cache, PassivationManager passivator, CacheNotifier notifier) {
- this.cache = cache;
- this.passivator = passivator;
- this.notifier = notifier;
+ public void initialize(EvictionManager evictionManager) {
+ this.evictionManager = evictionManager;
}
public static DataContainer boundedDataContainer(int concurrencyLevel, int maxEntries, EvictionStrategy strategy, EvictionThreadPolicy policy) {
@@ -116,11 +100,6 @@
return new DefaultDataContainer(concurrencyLevel);
}
- @Override
- public Set<InternalCacheEntry> getEvictionCandidates() {
- return Collections.emptySet();
- }
-
public InternalCacheEntry peek(Object key) {
InternalCacheEntry e = immortalEntries.get(key);
if (e == null) e = mortalEntries.get(key);
@@ -243,19 +222,16 @@
return new EntryIterator(immortalEntries.values().iterator(), mortalEntries.values().iterator());
}
- private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry> {
- private final Log log = LogFactory.getLog(DefaultEvictionListener.class);
-
+ private class DefaultEvictionListener implements EvictionListener<Object, InternalCacheEntry> {
@Override
- public void evicted(Object key, InternalCacheEntry value) {
- notifier.notifyCacheEntryEvicted(key, true, null);
- try {
- passivator.passivate(key, value, null);
- } catch (CacheLoaderException e) {
- log.warn("Unable to passivate entry under {0}", key, e);
- }
- notifier.notifyCacheEntryEvicted(key, false, null);
+ public void preEvict(Object key) {
+ evictionManager.preEvict(key);
}
+
+ @Override
+ public void postEvict(Object key, InternalCacheEntry value) {
+ evictionManager.postEvict(key, value);
+ }
}
private class KeySet extends AbstractSet<Object> {
Modified: trunk/core/src/main/java/org/infinispan/context/InvocationContextContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/context/InvocationContextContainer.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/context/InvocationContextContainer.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -8,9 +8,10 @@
import org.infinispan.factories.scopes.Scopes;
/**
- * Manages the association between an {@link org.infinispan.context.InvocationContext} and the calling thread. Also acts
- * as a factory for creating various types of {@link org.infinispan.context.InvocationContext}s.
- *
+ * Manages the association between an {@link org.infinispan.context.InvocationContext} and the
+ * calling thread. Also acts as a factory for creating various types of
+ * {@link org.infinispan.context.InvocationContext}s.
+ *
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
* @author Mircea.Markus at jboss.com
* @since 4.0
@@ -20,49 +21,55 @@
public interface InvocationContextContainer {
/**
- * If we are in a tx scope this will return an {@link org.infinispan.context.impl.TxInvocationContext}. Otherwise it
- * will return an {@link org.infinispan.context.impl.NonTxInvocationContext}. Either way, both context will be marked
- * as local, i.e. {@link InvocationContext#isOriginLocal()} will be true. The context is also associated with the
- * current thread, so further calls to {@link #getInvocationContext()} will return same instace.
+ * If we are in a tx scope this will return an
+ * {@link org.infinispan.context.impl.TxInvocationContext}. Otherwise it will return an
+ * {@link org.infinispan.context.impl.NonTxInvocationContext}. Either way, both context will be
+ * marked as local, i.e. {@link InvocationContext#isOriginLocal()} will be true. The context is
+ * also associated with the current thread, so further calls to {@link #getInvocationContext()}
+ * will return same instance.
*/
InvocationContext createInvocationContext();
/**
- * Will create an {@link org.infinispan.context.impl.NonTxInvocationContext} with the {@link org.infinispan.context.impl.NonTxInvocationContext#isOriginLocal()}
- * returning true.
+ * Will create an {@link org.infinispan.context.impl.NonTxInvocationContext} with the
+ * {@link org.infinispan.context.impl.NonTxInvocationContext#isOriginLocal()} returning true.
*/
NonTxInvocationContext createNonTxInvocationContext();
/**
- * Returns a {@link org.infinispan.context.impl.LocalTxInvocationContext}. The context is also associated with the
- * current thread, so further calls to {@link #getInvocationContext()} will return same instace.
+ * Returns a {@link org.infinispan.context.impl.LocalTxInvocationContext}. The context is also
+ * associated with the current thread, so further calls to {@link #getInvocationContext()} will
+ * return same instance.
*/
LocalTxInvocationContext createTxInvocationContext();
/**
- * Returns an {@link org.infinispan.context.impl.RemoteTxInvocationContext}. The context is also associated with the
- * current thread, so further calls to {@link #getInvocationContext()} will return same instace.
+ * Returns an {@link org.infinispan.context.impl.RemoteTxInvocationContext}. The context is also
+ * associated with the current thread, so further calls to {@link #getInvocationContext()} will
+ * return same instance.
*/
RemoteTxInvocationContext createRemoteTxInvocationContext();
/**
- * Returns an {@link org.infinispan.context.impl.NonTxInvocationContext} whose {@link
- * org.infinispan.context.impl.NonTxInvocationContext#isOriginLocal()} flag will be true. The context is also
- * associated with the current thread, so further calls to {@link #getInvocationContext()} will return same instace.
+ * Returns an {@link org.infinispan.context.impl.NonTxInvocationContext} whose
+ * {@link org.infinispan.context.impl.NonTxInvocationContext#isOriginLocal()} flag will be true.
+ * The context is also associated with the current thread, so further calls to
+ * {@link #getInvocationContext()} will return same instance.
*/
NonTxInvocationContext createRemoteInvocationContext();
/**
- * Returns the {@link InvocationContext} that is currently associated with the calling thread. Important:
- * implementations of this metrhod is most likely expensive (ThreadLocal.get), it is recommanded to cache the result
- * of this method rather than repeting the call.
- *
- * @throws IllegalStateException if there is no context associated with the current thread.
+ * Returns the {@link InvocationContext} that is currently associated with the calling thread.
+ * Important: implementations of this method are most likely expensive (ThreadLocal.get), so it is
+ * recommended to cache the result of this method rather than repeating the call.
+ *
+ * @throws IllegalStateException
+ * if there is no context associated with the current thread.
*/
InvocationContext getInvocationContext();
/**
- * Dissasociates thread's invocation context and returns the existing value.
+ * Disassociates thread's invocation context and returns the existing value.
*/
InvocationContext suspend();
Modified: trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -5,6 +5,7 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -15,6 +16,7 @@
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.WriteCommand;
@@ -216,3 +218,98 @@
return log;
}
}
+
+abstract class StateMap<S> {
+ List<Address> leavers;
+ ConsistentHash oldCH, newCH;
+ int replCount;
+ Map<Address, S> state = new HashMap<Address, S>();
+ protected static final Log log = LogFactory.getLog(InvertedLeaveTask.class);
+
+ StateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ this.leavers = leavers;
+ this.oldCH = oldCH;
+ this.newCH = newCH;
+ this.replCount = replCount;
+ }
+
+ Map<Address, S> getState() {
+ return state;
+ }
+}
+
+/**
+ * A state map that aggregates {@link ReplicableCommand}s according to recipient affected.
+ *
+ * @param <T> type of replicable command to aggregate
+ */
+abstract class CommandAggregatingStateMap<T extends ReplicableCommand> extends StateMap<List<T>> {
+ Set<Object> keysHandled = new HashSet<Object>();
+
+ CommandAggregatingStateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ // if only Java had duck-typing!
+ abstract Set<Object> getAffectedKeys(T payload);
+
+ /**
+ * Only add state to the state map if the old owner list for a key contains a leaver; the position of the
+ * leaver in the old owner list is not considered.
+ *
+ * @param payload payload to consider when adding to the aggregate state
+ */
+ void addState(T payload) {
+ for (Object key : getAffectedKeys(payload)) {
+ for (Address leaver : leavers) {
+ List<Address> owners = oldCH.locate(key, replCount);
+ int leaverIndex = owners.indexOf(leaver);
+ if (leaverIndex > -1) {
+ // add to state map!
+ List<Address> newOwners = newCH.locate(key, replCount);
+ newOwners.removeAll(owners);
+ if (!newOwners.isEmpty()) {
+ for (Address no : newOwners) {
+ List<T> s = state.get(no);
+ if (s == null) {
+ s = new LinkedList<T>();
+ state.put(no, s);
+ }
+ s.add(payload);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush pending prepares
+ * to nodes during a leave.
+ */
+class PendingPreparesMap extends CommandAggregatingStateMap<PrepareCommand> {
+ PendingPreparesMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ @Override
+ Set<Object> getAffectedKeys(PrepareCommand payload) {
+ return payload.getAffectedKeys();
+ }
+}
+
+/**
+ * Specific version of the CommandAggregatingStateMap that aggregates WriteCommands, used to flush writes
+ * made while state was being transferred to nodes during a leave.
+ */
+class TransactionLogMap extends CommandAggregatingStateMap<WriteCommand> {
+ TransactionLogMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ @Override
+ Set<Object> getAffectedKeys(WriteCommand payload) {
+ return payload.getAffectedKeys();
+ }
+}
Deleted: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -1,369 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.commands.CommandsFactory;
-import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.commands.control.RehashControlCommand;
-import org.infinispan.commands.tx.PrepareCommand;
-import org.infinispan.commands.write.WriteCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.container.DataContainer;
-import org.infinispan.container.entries.InternalCacheEntry;
-import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.loaders.CacheLoader;
-import org.infinispan.loaders.CacheStore;
-import org.infinispan.remoting.rpc.RpcManager;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
-import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.NotifyingFutureImpl;
-import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-/**
- * A task to handle rehashing for when a node leaves the cluster
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class LeaveTask extends RehashTask {
- private static final Log log = LogFactory.getLog(LeaveTask.class);
- private static final boolean trace = log.isTraceEnabled();
- private final List<Address> leavers;
- private final Address self;
- private final List<Address> leaversHandled;
-
-
- protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager,
- Configuration configuration, List<Address> leavers, CommandsFactory cf,
- DataContainer dataContainer) {
- super(dmi, rpcManager, configuration, cf, dataContainer);
- this.leavers = leavers;
- this.leaversHandled = new LinkedList<Address>(leavers);
- this.self = rpcManager.getTransport().getAddress();
- }
-
- protected void performRehash() throws Exception {
- long start = System.currentTimeMillis();
- if (log.isDebugEnabled()) log.debug("Commencing. Leavers' list is {0}", leavers);
- boolean completedSuccessfully = false;
- List<Address> leaversHandled = new LinkedList<Address>(leavers);
- ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
- int replCount = configuration.getNumOwners();
-
- try {
- InMemoryStateMap statemap = new InMemoryStateMap(leaversHandled, self, oldCH, dmi.getConsistentHash(), replCount);
- if (log.isTraceEnabled()) log.trace("Examining state in data container");
- // need to constantly detect whether we are interrupted. If so, abort accordingly.
- for (InternalCacheEntry ice : dataContainer) {
- List<Address> oldOwners = oldCH.locate(ice.getKey(), replCount);
- for (Address a : oldOwners) if (leaversHandled.contains(a)) statemap.addState(ice);
- }
-
- CacheStore cs = dmi.getCacheStoreForRehashing();
- if (cs != null) {
- if (log.isTraceEnabled()) log.trace("Examining state in cache store");
- for (Object key: cs.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) statemap.addState(key, cs);
- }
-
- // push state.
- Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
- for (Map.Entry<Address, Map<Object, InternalCacheValue>> entry : statemap.getState().entrySet()) {
- if (log.isDebugEnabled()) log.debug("Pushing {0} entries to {1}", entry.getValue().size(), entry.getKey());
- RehashControlCommand push = cf.buildRehashControlCommand(self, entry.getValue());
- NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
- pushFutures.add(f);
- rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
- }
-
- for (Future f : pushFutures) f.get();
-
- processAndDrainTxLog(oldCH, dmi.getConsistentHash(), replCount);
-
- completedSuccessfully = true;
- invalidateInvalidHolders(oldCH, dmi.getConsistentHash());
- if (log.isInfoEnabled())
- log.info("Completed in {0}!", Util.prettyPrintTime(System.currentTimeMillis() - start));
- } catch (InterruptedException ie) {
- if (log.isInfoEnabled())
- log.info("Interrupted after {0}! Completed successfully? {1}", Util.prettyPrintTime(System.currentTimeMillis() - start), completedSuccessfully);
- } catch (Exception e) {
- log.error("Caught exception! Completed successfully? {0}", e, completedSuccessfully);
- }
- finally {
- if (completedSuccessfully) leavers.removeAll(leaversHandled);
- }
- }
-
- @Override
- protected Log getLog() {
- return log;
- }
-
- private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- if (trace) log.trace("Processing transaction log iteratively");
-
- List<WriteCommand> c;
- int i = 0;
- TransactionLogger transactionLogger = dmi.getTransactionLogger();
- while (transactionLogger.shouldDrainWithoutLock()) {
- if (trace) log.trace("Processing transaction log, iteration {0}", i++);
- c = transactionLogger.drain();
- if (trace) log.trace("Found {0} modifications", c.size());
- apply(oldCH, newCH, replCount, c);
- }
-
- if (trace) log.trace("Processing transaction log: final drain and lock");
- c = transactionLogger.drainAndLock();
- if (trace) log.trace("Found {0} modifications", c.size());
- apply(oldCH, newCH, replCount, c);
-
- if (trace) log.trace("Handling pending prepares");
- PendingPreparesMap state = new PendingPreparesMap(leavers, oldCH, newCH, replCount);
- Collection<PrepareCommand> pendingPrepares = transactionLogger.getPendingPrepares();
- if (trace) log.trace("Found {0} pending prepares", pendingPrepares.size());
- for (PrepareCommand pc : pendingPrepares) state.addState(pc);
-
- if (trace) log.trace("State map for pending prepares is {0}", state.getState());
-
- Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
- for (Map.Entry<Address, List<PrepareCommand>> e : state.getState().entrySet()) {
- if (log.isDebugEnabled())
- log.debug("Pushing {0} uncommitted prepares to {1}", e.getValue().size(), e.getKey());
- RehashControlCommand push = cf.buildRehashControlCommandTxLogPendingPrepares(self, e.getValue());
- NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
- pushFutures.add(f);
- rpcManager.invokeRemotelyInFuture(Collections.singleton(e.getKey()), push, true, f, configuration.getRehashRpcTimeout());
- }
-
- for (Future f : pushFutures) {
- try {
- f.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- log.error("Error pushing tx log", e);
- }
- }
- if (trace) log.trace("Finished pushing pending prepares; unlocking and disabling transaction logging");
-
- transactionLogger.unlockAndDisable();
- }
-
- private void apply(ConsistentHash oldCH, ConsistentHash newCH, int replCount, List<WriteCommand> wc) {
- // need to create another "state map"
- TransactionLogMap state = new TransactionLogMap(leavers, oldCH, newCH, replCount);
- for (WriteCommand c : wc) state.addState(c);
-
- if (trace) log.trace("State map for modifications is {0}", state.getState());
-
- Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
- for (Map.Entry<Address, List<WriteCommand>> entry : state.getState().entrySet()) {
- if (log.isDebugEnabled())
- log.debug("Pushing {0} modifications to {1}", entry.getValue().size(), entry.getKey());
- RehashControlCommand push = cf.buildRehashControlCommandTxLog(self, entry.getValue());
- NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
- pushFutures.add(f);
- rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
- }
-
- for (Future f : pushFutures) {
- try {
- f.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- log.error("Error pushing tx log", e);
- }
- }
- }
-
- @Override
- protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
- Collection<Address> l = super.getInvalidHolders(key, chOld, chNew);
- l.removeAll(leaversHandled);
- return l;
- }
-}
-
-abstract class StateMap<S> {
- List<Address> leavers;
- ConsistentHash oldCH, newCH;
- int replCount;
- Map<Address, S> state = new HashMap<Address, S>();
- protected static final Log log = LogFactory.getLog(LeaveTask.class);
-
- StateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- this.leavers = leavers;
- this.oldCH = oldCH;
- this.newCH = newCH;
- this.replCount = replCount;
- }
-
- Map<Address, S> getState() {
- return state;
- }
-}
-
-class InMemoryStateMap extends StateMap<Map<Object, InternalCacheValue>> {
- Address self;
- Set<Object> keysHandled = new HashSet<Object>();
-
- InMemoryStateMap(List<Address> leavers, Address self, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- super(leavers, oldCH, newCH, replCount);
- this.self = self;
- }
-
- /**
- * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
- * owner list
- *
- * @param payload an InternalCacheEntry to add to the state map
- */
- void addState(InternalCacheEntry payload) {
- addState(payload, null, null);
- }
-
- /**
- * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
- * owner list, retrieving the value from a cache loader
- */
- void addState(Object key, CacheLoader loader) {
- addState(null, key, loader);
- }
-
-
- private void addState(InternalCacheEntry ice, Object k, CacheLoader loader) {
- Object key = ice == null ? k : ice.getKey();
- if (keysHandled.contains(key)) return;
-
- InternalCacheValue icv = null;
-
- for (Address leaver : leavers) {
- List<Address> owners = oldCH.locate(key, replCount);
- int leaverIndex = owners.indexOf(leaver);
- if (leaverIndex > -1) {
- int numOwners = owners.size();
- int selfIndex = owners.indexOf(self);
- boolean isLeaverLast = leaverIndex == numOwners - 1;
- if ((isLeaverLast && selfIndex == numOwners - 2) ||
- (!isLeaverLast && selfIndex == leaverIndex + 1)) {
- // add to state map!
- List<Address> newOwners = newCH.locate(key, replCount);
- newOwners.removeAll(owners);
- if (!newOwners.isEmpty()) {
- for (Address no : newOwners) {
- Map<Object, InternalCacheValue> s = state.get(no);
- if (s == null) {
- s = new HashMap<Object, InternalCacheValue>();
- state.put(no, s);
- }
-
- if (icv == null) {
- if (ice == null) {
- try {
- InternalCacheEntry payload = loader.load(key);
- if (payload != null) icv = payload.toInternalCacheValue();
- } catch (Exception e) {
- log.warn("Unable to load " + key + " from cache loader", e);
- }
- } else {
- icv = ice.toInternalCacheValue();
- }
- }
- s.put(key, icv);
- }
- }
- }
- }
- }
- keysHandled.add(key);
- }
-}
-
-/**
- * A state map that aggregates {@link ReplicableCommand}s according to recipient affected.
- *
- * @param <T> type of replicable command to aggregate
- */
-abstract class CommandAggregatingStateMap<T extends ReplicableCommand> extends StateMap<List<T>> {
- Set<Object> keysHandled = new HashSet<Object>();
-
- CommandAggregatingStateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- super(leavers, oldCH, newCH, replCount);
- }
-
- // if only Java had duck-typing!
- abstract Set<Object> getAffectedKeys(T payload);
-
- /**
- * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
- * owner list
- *
- * @param payload payload to consider when adding to the aggregate state
- */
- void addState(T payload) {
- for (Object key : getAffectedKeys(payload)) {
- for (Address leaver : leavers) {
- List<Address> owners = oldCH.locate(key, replCount);
- int leaverIndex = owners.indexOf(leaver);
- if (leaverIndex > -1) {
- // add to state map!
- List<Address> newOwners = newCH.locate(key, replCount);
- newOwners.removeAll(owners);
- if (!newOwners.isEmpty()) {
- for (Address no : newOwners) {
- List<T> s = state.get(no);
- if (s == null) {
- s = new LinkedList<T>();
- state.put(no, s);
- }
- s.add(payload);
- }
- }
- }
- }
- }
- }
-}
-
-/**
- * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush pending prepares
- * to nodes during a leave.
- */
-class PendingPreparesMap extends CommandAggregatingStateMap<PrepareCommand> {
- PendingPreparesMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- super(leavers, oldCH, newCH, replCount);
- }
-
- @Override
- Set<Object> getAffectedKeys(PrepareCommand payload) {
- return payload.getAffectedKeys();
- }
-}
-
-/**
- * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush writes
- * made while state was being transferred to nodes during a leave.
- */
-class TransactionLogMap extends CommandAggregatingStateMap<WriteCommand> {
- TransactionLogMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- super(leavers, oldCH, newCH, replCount);
- }
-
- @Override
- Set<Object> getAffectedKeys(WriteCommand payload) {
- return payload.getAffectedKeys();
- }
-}
\ No newline at end of file
Modified: trunk/core/src/main/java/org/infinispan/eviction/EvictionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/eviction/EvictionManager.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/eviction/EvictionManager.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -1,6 +1,8 @@
package org.infinispan.eviction;
import net.jcip.annotations.ThreadSafe;
+
+import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
@@ -31,4 +33,8 @@
* @return true if eviction is enabled, false otherwise
*/
boolean isEnabled();
+
+ void preEvict(Object key);
+
+ void postEvict(Object key, InternalCacheEntry value);
}
Modified: trunk/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/eviction/EvictionManagerImpl.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -1,30 +1,34 @@
package org.infinispan.eviction;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
import net.jcip.annotations.ThreadSafe;
-import org.infinispan.AdvancedCache;
-import org.infinispan.Cache;
+
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.InvocationContextContainer;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
+import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.loaders.CacheStore;
+import org.infinispan.marshall.MarshalledValue;
+import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.infinispan.context.Flag.FAIL_SILENTLY;
-
@ThreadSafe
public class EvictionManagerImpl implements EvictionManager {
private static final Log log = LogFactory.getLog(EvictionManagerImpl.class);
@@ -32,24 +36,33 @@
ScheduledFuture <?> evictionTask;
// components to be injected
- ScheduledExecutorService executor;
- Configuration configuration;
- Cache<Object, Object> cache;
- CacheLoaderManager cacheLoaderManager;
- DataContainer dataContainer;
- CacheStore cacheStore;
- boolean enabled;
- int maxEntries;
+ private ScheduledExecutorService executor;
+ private Configuration configuration;
+ private CacheLoaderManager cacheLoaderManager;
+ private DataContainer dataContainer;
+ private CacheStore cacheStore;
+ private CacheNotifier cacheNotifier;
+ private LockManager lockManager;
+ private PassivationManager passivator;
+ private InvocationContextContainer ctxContainer;
+
+
+ private boolean enabled;
volatile CountDownLatch startLatch = new CountDownLatch(1);
@Inject
public void initialize(@ComponentName(KnownComponentNames.EVICTION_SCHEDULED_EXECUTOR) ScheduledExecutorService executor,
- Configuration configuration, Cache<Object, Object> cache, DataContainer dataContainer, CacheLoaderManager cacheLoaderManager) {
+ Configuration configuration, DataContainer dataContainer,
+ CacheLoaderManager cacheLoaderManager, CacheNotifier cacheNotifier,
+ LockManager lockManager, PassivationManager passivator, InvocationContextContainer ctxContainer) {
this.executor = executor;
this.configuration = configuration;
- this.cache = cache;
this.dataContainer = dataContainer;
this.cacheLoaderManager = cacheLoaderManager;
+ this.cacheNotifier = cacheNotifier;
+ this.lockManager = lockManager;
+ this.passivator = passivator;
+ this.ctxContainer = ctxContainer;
}
@Start(priority = 55)
@@ -57,8 +70,7 @@
public void start() {
// first check if eviction is enabled!
enabled = configuration.getEvictionStrategy() != EvictionStrategy.NONE;
- if (enabled) {
- maxEntries = configuration.getEvictionMaxEntries();
+ if (enabled) {
if (cacheLoaderManager != null && cacheLoaderManager.isEnabled())
cacheStore = cacheLoaderManager.getCacheStore();
// Set up the eviction timer task
@@ -106,31 +118,7 @@
} catch (Exception e) {
log.warn("Caught exception purging cache store!", e);
}
- }
-
- // finally iterate through data container if too big
- Set<InternalCacheEntry> evictionCandidates = dataContainer.getEvictionCandidates();
- if(!evictionCandidates.isEmpty()) {
- AdvancedCache<Object, Object> ac = cache.getAdvancedCache();
- if (trace) {
- log.trace("Evicting data container entries");
- start = System.currentTimeMillis();
- }
- for (InternalCacheEntry entry : evictionCandidates) {
- Object k = entry.getKey();
- try {
- if (trace) log.trace("Attempting to evict key [{0}]", k);
- ac.withFlags(FAIL_SILENTLY).evict(k);
- }
- catch (Exception e) {
- log.warn("Caught exception when iterating through data container. Current entry is under key [{0}]", e, k);
- }
- }
- if (trace)
- log.trace("Eviction process completed in {0}", Util.prettyPrintTime(System.currentTimeMillis() - start));
- } else {
- if (trace) log.trace("Data container is smaller than or equal to the maxEntries; not doing anything");
- }
+ }
}
public boolean isEnabled() {
@@ -148,4 +136,83 @@
processEviction();
}
}
+
+ @Override
+ public void preEvict(Object key) {
+ try {
+ acquireLock(getInvocationContext(), key);
+ } catch (Exception e) {
+ log.warn("Could not acquire lock for eviction of {0}", key, e);
+ }
+ cacheNotifier.notifyCacheEntryEvicted(key, true, null);
+ }
+
+ @Override
+ public void postEvict(Object key, InternalCacheEntry value) {
+ try {
+ passivator.passivate(key, value, null);
+ } catch (CacheLoaderException e) {
+ log.warn("Unable to passivate entry under {0}", key, e);
+ }
+ cacheNotifier.notifyCacheEntryEvicted(key, false, null);
+ releaseLock(key);
+ }
+
+ private InvocationContext getInvocationContext(){
+ return ctxContainer.getInvocationContext();
+ }
+
+ /**
+ * Attempts to lock an entry if the lock isn't already held in the current scope, and records the lock in the
+ * context.
+ *
+ * @param ctx context
+ * @param key Key to lock
+ * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was
+ * already held)
+ * @throws InterruptedException if interrupted
+ * @throws org.infinispan.util.concurrent.TimeoutException
+ * if we are unable to acquire the lock after a specified timeout.
+ */
+ private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException {
+ // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
+ // lock which may be shared with another key that we have a lock for already.
+ // nothing wrong, just means that we fail to record the lock. And that is a problem.
+ // Better to check our records and lock again if necessary.
+
+ boolean shouldSkipLocking = ctx.hasFlag(Flag.SKIP_LOCKING);
+
+ if (!ctx.hasLockedKey(key) && !shouldSkipLocking) {
+ if (lockManager.lockAndRecord(key, ctx)) {
+ return true;
+ } else {
+ Object owner = lockManager.getOwner(key);
+ // if lock cannot be acquired, expose the key itself, not the marshalled value
+ if (key instanceof MarshalledValue) {
+ key = ((MarshalledValue) key).get();
+ }
+ throw new TimeoutException("Unable to acquire lock after [" + Util.prettyPrintTime(getLockAcquisitionTimeout(ctx)) + "] on key [" + key + "] for requestor [" +
+ ctx.getLockOwner() + "]! Lock held by [" + owner + "]");
+ }
+ } else {
+ if (trace) {
+ if (shouldSkipLocking)
+ log.trace("SKIP_LOCKING flag used!");
+ else
+ log.trace("Already own lock for entry");
+ }
+ }
+
+ return false;
+ }
+
+ public final void releaseLock(Object key) {
+ lockManager.unlock(key);
+ }
+
+ private long getLockAcquisitionTimeout(InvocationContext ctx) {
+ return ctx.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT) ?
+ 0 : configuration.getLockAcquisitionTimeout();
+ }
+
}
Modified: trunk/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/BoundedConcurrentHashMap.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -274,13 +274,18 @@
}
public interface EvictionListener<K, V> {
- void evicted(K key, V value);
+ void preEvict(K key);
+ void postEvict(K key, V value);
}
- static class NullEvictionListener<K,V> implements EvictionListener<K, V>{
- @Override
- public void evicted(K key, V value) {
- }
+ static class NullEvictionListener<K, V> implements EvictionListener<K, V> {
+ @Override
+ public void postEvict(K key, V value) {
+ }
+
+ @Override
+ public void preEvict(K key) {
+ }
}
public interface EvictionPolicy<K, V> {
@@ -421,12 +426,13 @@
}
while (isOverflow()) {
HashEntry<K, V> first = lruQueue.getLast();
+ segment.getEvictionListener().preEvict(first.key);
segment.remove(first.key, first.hash, null);
evicted.add(first);
- }
+ }
} finally {
- accessQueue.clear();
- }
+ accessQueue.clear();
+ }
return evicted;
}
@@ -595,8 +601,9 @@
return evicted;
}
- private void removeFromSegment(Set<HashEntry<K, V>> evicted) {
+ private void removeFromSegment(Set<HashEntry<K, V>> evicted) {
for (HashEntry<K, V> e : evicted) {
+ segment.getEvictionListener().preEvict(e.key);
segment.remove(e.key, e.hash, null);
}
}
@@ -756,6 +763,10 @@
static final <K,V> Segment<K,V>[] newArray(int i) {
return new Segment[i];
}
+
+ EvictionListener<K, V> getEvictionListener() {
+ return evictionListener;
+ }
/**
* Sets table to new HashEntry array.
@@ -816,8 +827,8 @@
Set<HashEntry<K, V>> evicted = attemptEviction(false);
// piggyback listener invocation on callers thread outside lock
if (evicted != null) {
- for (HashEntry<K, V> he : evicted) {
- evictionListener.evicted(he.key, he.value);
+ for (HashEntry<K, V> he : evicted) {
+ evictionListener.postEvict(he.key, he.value);
}
}
}
@@ -895,8 +906,8 @@
unlock();
// piggyback listener invocation on callers thread outside lock
if (evicted != null) {
- for (HashEntry<K, V> he : evicted) {
- evictionListener.evicted(he.key, he.value);
+ for (HashEntry<K, V> he : evicted) {
+ evictionListener.postEvict(he.key, he.value);
}
}
}
@@ -923,8 +934,8 @@
unlock();
// piggyback listener invocation on callers thread outside lock
if(evicted != null) {
- for (HashEntry<K, V> he : evicted) {
- evictionListener.evicted(he.key, he.value);
+ for (HashEntry<K, V> he : evicted) {
+ evictionListener.postEvict(he.key, he.value);
}
}
}
@@ -982,8 +993,8 @@
unlock();
// piggyback listener invocation on callers thread outside lock
if(evicted != null) {
- for (HashEntry<K, V> he : evicted) {
- evictionListener.evicted(he.key, he.value);
+ for (HashEntry<K, V> he : evicted) {
+ evictionListener.postEvict(he.key, he.value);
}
}
}
Modified: trunk/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/test/java/org/infinispan/eviction/EvictionManagerTest.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -33,7 +33,7 @@
cfg.setEvictionWakeUpInterval(0);
ScheduledExecutorService mockService = createMock(ScheduledExecutorService.class);
- em.initialize(mockService, cfg, null, null, null);
+ em.initialize(mockService, cfg, null, null, null,null,null,null);
replay(mockService);
em.start();
@@ -47,7 +47,7 @@
cfg.setEvictionWakeUpInterval(789);
ScheduledExecutorService mockService = createMock(ScheduledExecutorService.class);
- em.initialize(mockService, cfg, null, null, null);
+ em.initialize(mockService, cfg, null, null, null,null,null,null);
ScheduledFuture mockFuture = createNiceMock(ScheduledFuture.class);
expect(mockService.scheduleWithFixedDelay(isA(EvictionManagerImpl.ScheduledTask.class), eq((long) 789),
Deleted: trunk/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationConfigurationTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationConfigurationTest.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationConfigurationTest.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -1,24 +0,0 @@
-package org.infinispan.eviction;
-
-import org.infinispan.config.CacheLoaderManagerConfig;
-import org.infinispan.config.Configuration;
-import org.infinispan.config.ConfigurationException;
-import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
-import org.infinispan.manager.DefaultCacheManager;
-import org.infinispan.test.AbstractInfinispanTest;
-import org.testng.annotations.Test;
-
-@Test(groups = "unit", testName = "eviction.EvictionWithPassivationConfigurationTest")
-public class EvictionWithPassivationConfigurationTest extends AbstractInfinispanTest {
-
- @Test (expectedExceptions = ConfigurationException.class)
- public void testConfig() {
- Configuration c = new Configuration();
- c.setEvictionStrategy(EvictionStrategy.LIRS);
- CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
- clmc.setPassivation(true);
- clmc.addCacheLoaderConfig(new DummyInMemoryCacheStore.Cfg());
- c.setCacheLoaderManagerConfig(clmc);
- new DefaultCacheManager(c);
- }
-}
Modified: trunk/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java 2010-08-18 13:57:06 UTC (rev 2248)
+++ trunk/core/src/test/java/org/infinispan/eviction/EvictionWithPassivationTest.java 2010-08-18 14:03:55 UTC (rev 2249)
@@ -42,7 +42,7 @@
runTest(EvictionThreadPolicy.PIGGYBACK, EvictionStrategy.LRU);
}
- @Test (enabled = false, description = "See ISPN-598")
+
public void testPiggybackLIRS() {
runTest(EvictionThreadPolicy.PIGGYBACK, EvictionStrategy.LIRS);
}
@@ -59,7 +59,7 @@
runTest(EvictionThreadPolicy.DEFAULT, EvictionStrategy.LRU);
}
- @Test (enabled = false, description = "See ISPN-598")
+
public void testDefaultLIRS() {
runTest(EvictionThreadPolicy.DEFAULT, EvictionStrategy.LIRS);
}
More information about the infinispan-commits
mailing list