[infinispan-commits] Infinispan SVN: r2247 - branches/4.1.x/core/src/main/java/org/infinispan/distribution.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Aug 18 09:08:22 EDT 2010
Author: vblagojevic at jboss.com
Date: 2010-08-18 09:08:21 -0400 (Wed, 18 Aug 2010)
New Revision: 2247
Removed:
branches/4.1.x/core/src/main/java/org/infinispan/distribution/LeaveTask.java
Modified:
branches/4.1.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
Log:
remove deprecated LeaveTask
Modified: branches/4.1.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java 2010-08-18 12:46:24 UTC (rev 2246)
+++ branches/4.1.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java 2010-08-18 13:08:21 UTC (rev 2247)
@@ -5,6 +5,7 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -15,6 +16,7 @@
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.WriteCommand;
@@ -216,3 +218,98 @@
return log;
}
}
+
+abstract class StateMap<S> {
+ List<Address> leavers;
+ ConsistentHash oldCH, newCH;
+ int replCount;
+ Map<Address, S> state = new HashMap<Address, S>();
+ protected static final Log log = LogFactory.getLog(InvertedLeaveTask.class);
+
+ StateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ this.leavers = leavers;
+ this.oldCH = oldCH;
+ this.newCH = newCH;
+ this.replCount = replCount;
+ }
+
+ Map<Address, S> getState() {
+ return state;
+ }
+}
+
+/**
+ * A state map that aggregates {@link ReplicableCommand}s according to recipient affected.
+ *
+ * @param <T> type of replicable command to aggregate
+ */
+abstract class CommandAggregatingStateMap<T extends ReplicableCommand> extends StateMap<List<T>> {
+ Set<Object> keysHandled = new HashSet<Object>();
+
+ CommandAggregatingStateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ // if only Java had duck-typing!
+ abstract Set<Object> getAffectedKeys(T payload);
+
+ /**
+ * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
+ * owner list
+ *
+ * @param payload payload to consider when adding to the aggregate state
+ */
+ void addState(T payload) {
+ for (Object key : getAffectedKeys(payload)) {
+ for (Address leaver : leavers) {
+ List<Address> owners = oldCH.locate(key, replCount);
+ int leaverIndex = owners.indexOf(leaver);
+ if (leaverIndex > -1) {
+ // add to state map!
+ List<Address> newOwners = newCH.locate(key, replCount);
+ newOwners.removeAll(owners);
+ if (!newOwners.isEmpty()) {
+ for (Address no : newOwners) {
+ List<T> s = state.get(no);
+ if (s == null) {
+ s = new LinkedList<T>();
+ state.put(no, s);
+ }
+ s.add(payload);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+/**
+ * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush pending prepares
+ * to nodes during a leave.
+ */
+class PendingPreparesMap extends CommandAggregatingStateMap<PrepareCommand> {
+ PendingPreparesMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ @Override
+ Set<Object> getAffectedKeys(PrepareCommand payload) {
+ return payload.getAffectedKeys();
+ }
+}
+
+/**
+ * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush writes
+ * made while state was being transferred to nodes during a leave.
+ */
+class TransactionLogMap extends CommandAggregatingStateMap<WriteCommand> {
+ TransactionLogMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ super(leavers, oldCH, newCH, replCount);
+ }
+
+ @Override
+ Set<Object> getAffectedKeys(WriteCommand payload) {
+ return payload.getAffectedKeys();
+ }
+}
Deleted: branches/4.1.x/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2010-08-18 12:46:24 UTC (rev 2246)
+++ branches/4.1.x/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2010-08-18 13:08:21 UTC (rev 2247)
@@ -1,369 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.commands.CommandsFactory;
-import org.infinispan.commands.ReplicableCommand;
-import org.infinispan.commands.control.RehashControlCommand;
-import org.infinispan.commands.tx.PrepareCommand;
-import org.infinispan.commands.write.WriteCommand;
-import org.infinispan.config.Configuration;
-import org.infinispan.container.DataContainer;
-import org.infinispan.container.entries.InternalCacheEntry;
-import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.loaders.CacheLoader;
-import org.infinispan.loaders.CacheStore;
-import org.infinispan.remoting.rpc.RpcManager;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
-import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.NotifyingFutureImpl;
-import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-/**
- * A task to handle rehashing for when a node leaves the cluster
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class LeaveTask extends RehashTask {
- private static final Log log = LogFactory.getLog(LeaveTask.class);
- private static final boolean trace = log.isTraceEnabled();
- private final List<Address> leavers;
- private final Address self;
- private final List<Address> leaversHandled;
-
-
- protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager,
- Configuration configuration, List<Address> leavers, CommandsFactory cf,
- DataContainer dataContainer) {
- super(dmi, rpcManager, configuration, cf, dataContainer);
- this.leavers = leavers;
- this.leaversHandled = new LinkedList<Address>(leavers);
- this.self = rpcManager.getTransport().getAddress();
- }
-
- protected void performRehash() throws Exception {
- long start = System.currentTimeMillis();
- if (log.isDebugEnabled()) log.debug("Commencing. Leavers' list is {0}", leavers);
- boolean completedSuccessfully = false;
- List<Address> leaversHandled = new LinkedList<Address>(leavers);
- ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
- int replCount = configuration.getNumOwners();
-
- try {
- InMemoryStateMap statemap = new InMemoryStateMap(leaversHandled, self, oldCH, dmi.getConsistentHash(), replCount);
- if (log.isTraceEnabled()) log.trace("Examining state in data container");
- // need to constantly detect whether we are interrupted. If so, abort accordingly.
- for (InternalCacheEntry ice : dataContainer) {
- List<Address> oldOwners = oldCH.locate(ice.getKey(), replCount);
- for (Address a : oldOwners) if (leaversHandled.contains(a)) statemap.addState(ice);
- }
-
- CacheStore cs = dmi.getCacheStoreForRehashing();
- if (cs != null) {
- if (log.isTraceEnabled()) log.trace("Examining state in cache store");
- for (Object key: cs.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) statemap.addState(key, cs);
- }
-
- // push state.
- Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
- for (Map.Entry<Address, Map<Object, InternalCacheValue>> entry : statemap.getState().entrySet()) {
- if (log.isDebugEnabled()) log.debug("Pushing {0} entries to {1}", entry.getValue().size(), entry.getKey());
- RehashControlCommand push = cf.buildRehashControlCommand(self, entry.getValue());
- NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
- pushFutures.add(f);
- rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
- }
-
- for (Future f : pushFutures) f.get();
-
- processAndDrainTxLog(oldCH, dmi.getConsistentHash(), replCount);
-
- completedSuccessfully = true;
- invalidateInvalidHolders(oldCH, dmi.getConsistentHash());
- if (log.isInfoEnabled())
- log.info("Completed in {0}!", Util.prettyPrintTime(System.currentTimeMillis() - start));
- } catch (InterruptedException ie) {
- if (log.isInfoEnabled())
- log.info("Interrupted after {0}! Completed successfully? {1}", Util.prettyPrintTime(System.currentTimeMillis() - start), completedSuccessfully);
- } catch (Exception e) {
- log.error("Caught exception! Completed successfully? {0}", e, completedSuccessfully);
- }
- finally {
- if (completedSuccessfully) leavers.removeAll(leaversHandled);
- }
- }
-
- @Override
- protected Log getLog() {
- return log;
- }
-
- private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- if (trace) log.trace("Processing transaction log iteratively");
-
- List<WriteCommand> c;
- int i = 0;
- TransactionLogger transactionLogger = dmi.getTransactionLogger();
- while (transactionLogger.shouldDrainWithoutLock()) {
- if (trace) log.trace("Processing transaction log, iteration {0}", i++);
- c = transactionLogger.drain();
- if (trace) log.trace("Found {0} modifications", c.size());
- apply(oldCH, newCH, replCount, c);
- }
-
- if (trace) log.trace("Processing transaction log: final drain and lock");
- c = transactionLogger.drainAndLock();
- if (trace) log.trace("Found {0} modifications", c.size());
- apply(oldCH, newCH, replCount, c);
-
- if (trace) log.trace("Handling pending prepares");
- PendingPreparesMap state = new PendingPreparesMap(leavers, oldCH, newCH, replCount);
- Collection<PrepareCommand> pendingPrepares = transactionLogger.getPendingPrepares();
- if (trace) log.trace("Found {0} pending prepares", pendingPrepares.size());
- for (PrepareCommand pc : pendingPrepares) state.addState(pc);
-
- if (trace) log.trace("State map for pending prepares is {0}", state.getState());
-
- Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
- for (Map.Entry<Address, List<PrepareCommand>> e : state.getState().entrySet()) {
- if (log.isDebugEnabled())
- log.debug("Pushing {0} uncommitted prepares to {1}", e.getValue().size(), e.getKey());
- RehashControlCommand push = cf.buildRehashControlCommandTxLogPendingPrepares(self, e.getValue());
- NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
- pushFutures.add(f);
- rpcManager.invokeRemotelyInFuture(Collections.singleton(e.getKey()), push, true, f, configuration.getRehashRpcTimeout());
- }
-
- for (Future f : pushFutures) {
- try {
- f.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- log.error("Error pushing tx log", e);
- }
- }
- if (trace) log.trace("Finished pushing pending prepares; unlocking and disabling transaction logging");
-
- transactionLogger.unlockAndDisable();
- }
-
- private void apply(ConsistentHash oldCH, ConsistentHash newCH, int replCount, List<WriteCommand> wc) {
- // need to create another "state map"
- TransactionLogMap state = new TransactionLogMap(leavers, oldCH, newCH, replCount);
- for (WriteCommand c : wc) state.addState(c);
-
- if (trace) log.trace("State map for modifications is {0}", state.getState());
-
- Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
- for (Map.Entry<Address, List<WriteCommand>> entry : state.getState().entrySet()) {
- if (log.isDebugEnabled())
- log.debug("Pushing {0} modifications to {1}", entry.getValue().size(), entry.getKey());
- RehashControlCommand push = cf.buildRehashControlCommandTxLog(self, entry.getValue());
- NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
- pushFutures.add(f);
- rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
- }
-
- for (Future f : pushFutures) {
- try {
- f.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- log.error("Error pushing tx log", e);
- }
- }
- }
-
- @Override
- protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
- Collection<Address> l = super.getInvalidHolders(key, chOld, chNew);
- l.removeAll(leaversHandled);
- return l;
- }
-}
-
-abstract class StateMap<S> {
- List<Address> leavers;
- ConsistentHash oldCH, newCH;
- int replCount;
- Map<Address, S> state = new HashMap<Address, S>();
- protected static final Log log = LogFactory.getLog(LeaveTask.class);
-
- StateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- this.leavers = leavers;
- this.oldCH = oldCH;
- this.newCH = newCH;
- this.replCount = replCount;
- }
-
- Map<Address, S> getState() {
- return state;
- }
-}
-
-class InMemoryStateMap extends StateMap<Map<Object, InternalCacheValue>> {
- Address self;
- Set<Object> keysHandled = new HashSet<Object>();
-
- InMemoryStateMap(List<Address> leavers, Address self, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- super(leavers, oldCH, newCH, replCount);
- this.self = self;
- }
-
- /**
- * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
- * owner list
- *
- * @param payload an InternalCacheEntry to add to the state map
- */
- void addState(InternalCacheEntry payload) {
- addState(payload, null, null);
- }
-
- /**
- * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
- * owner list, retrieving the value from a cache loader
- */
- void addState(Object key, CacheLoader loader) {
- addState(null, key, loader);
- }
-
-
- private void addState(InternalCacheEntry ice, Object k, CacheLoader loader) {
- Object key = ice == null ? k : ice.getKey();
- if (keysHandled.contains(key)) return;
-
- InternalCacheValue icv = null;
-
- for (Address leaver : leavers) {
- List<Address> owners = oldCH.locate(key, replCount);
- int leaverIndex = owners.indexOf(leaver);
- if (leaverIndex > -1) {
- int numOwners = owners.size();
- int selfIndex = owners.indexOf(self);
- boolean isLeaverLast = leaverIndex == numOwners - 1;
- if ((isLeaverLast && selfIndex == numOwners - 2) ||
- (!isLeaverLast && selfIndex == leaverIndex + 1)) {
- // add to state map!
- List<Address> newOwners = newCH.locate(key, replCount);
- newOwners.removeAll(owners);
- if (!newOwners.isEmpty()) {
- for (Address no : newOwners) {
- Map<Object, InternalCacheValue> s = state.get(no);
- if (s == null) {
- s = new HashMap<Object, InternalCacheValue>();
- state.put(no, s);
- }
-
- if (icv == null) {
- if (ice == null) {
- try {
- InternalCacheEntry payload = loader.load(key);
- if (payload != null) icv = payload.toInternalCacheValue();
- } catch (Exception e) {
- log.warn("Unable to load " + key + " from cache loader", e);
- }
- } else {
- icv = ice.toInternalCacheValue();
- }
- }
- s.put(key, icv);
- }
- }
- }
- }
- }
- keysHandled.add(key);
- }
-}
-
-/**
- * A state map that aggregates {@link ReplicableCommand}s according to recipient affected.
- *
- * @param <T> type of replicable command to aggregate
- */
-abstract class CommandAggregatingStateMap<T extends ReplicableCommand> extends StateMap<List<T>> {
- Set<Object> keysHandled = new HashSet<Object>();
-
- CommandAggregatingStateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- super(leavers, oldCH, newCH, replCount);
- }
-
- // if only Java had duck-typing!
- abstract Set<Object> getAffectedKeys(T payload);
-
- /**
- * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
- * owner list
- *
- * @param payload payload to consider when adding to the aggregate state
- */
- void addState(T payload) {
- for (Object key : getAffectedKeys(payload)) {
- for (Address leaver : leavers) {
- List<Address> owners = oldCH.locate(key, replCount);
- int leaverIndex = owners.indexOf(leaver);
- if (leaverIndex > -1) {
- // add to state map!
- List<Address> newOwners = newCH.locate(key, replCount);
- newOwners.removeAll(owners);
- if (!newOwners.isEmpty()) {
- for (Address no : newOwners) {
- List<T> s = state.get(no);
- if (s == null) {
- s = new LinkedList<T>();
- state.put(no, s);
- }
- s.add(payload);
- }
- }
- }
- }
- }
- }
-}
-
-/**
- * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush pending prepares
- * to nodes during a leave.
- */
-class PendingPreparesMap extends CommandAggregatingStateMap<PrepareCommand> {
- PendingPreparesMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- super(leavers, oldCH, newCH, replCount);
- }
-
- @Override
- Set<Object> getAffectedKeys(PrepareCommand payload) {
- return payload.getAffectedKeys();
- }
-}
-
-/**
- * Specific version of the CommandAggregatingStateMap that aggregates PrepareCommands, used to flush writes
- * made while state was being transferred to nodes during a leave.
- */
-class TransactionLogMap extends CommandAggregatingStateMap<WriteCommand> {
- TransactionLogMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
- super(leavers, oldCH, newCH, replCount);
- }
-
- @Override
- Set<Object> getAffectedKeys(WriteCommand payload) {
- return payload.getAffectedKeys();
- }
-}
\ No newline at end of file
More information about the infinispan-commits
mailing list