[infinispan-commits] Infinispan SVN: r1849 - in trunk/core/src/main/java/org/infinispan: commands/control and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue May 25 17:03:58 EDT 2010
Author: vblagojevic at jboss.com
Date: 2010-05-25 17:03:57 -0400 (Tue, 25 May 2010)
New Revision: 1849
Added:
trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
Modified:
trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
Log:
[ISPN-420] - Refactoring: merge rehash process when joining and leaving into a single codebase
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2010-05-25 21:03:57 UTC (rev 1849)
@@ -279,14 +279,20 @@
*/
RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> state);
+
/**
- * A more generic version of this factory method that allows the setting of various fields.
- *
- * @param subtype type of RehashControlCommand
- * @param sender sender's Address
- * @param state state to push
- * @param consistentHash consistent hash to deliver
- * @return a RehashControlCommand
+ * Builds a RehashControlCommand for coordinating a rehash event. This particular variation of RehashControlCommand
+ * coordinates rehashing of nodes when a node join or leaves
+ *
+ * @param subtype
+ * @param sender
+ * @param state
+ * @param oldCH
+ * @param leaversHandled
+ * @param newCH
+ * @return
*/
- RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash);
+ RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype,
+ Address sender, Map<Object, InternalCacheValue> state, ConsistentHash oldCH,
+ ConsistentHash newCH, List<Address> leaversHandled);
}
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2010-05-25 21:03:57 UTC (rev 1849)
@@ -290,11 +290,11 @@
}
public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender) {
- return buildRehashControlCommand(type, sender, null, null);
+ return buildRehashControlCommand(type, sender, null, null, null, null);
}
public RehashControlCommand buildRehashControlCommand(Address sender, Map<Object, InternalCacheValue> state) {
- return buildRehashControlCommand(PUSH_STATE, sender, state, null);
+ return buildRehashControlCommand(PUSH_STATE, sender, state, null, null, null);
}
public RehashControlCommand buildRehashControlCommandTxLog(Address sender, List<WriteCommand> commands) {
@@ -303,10 +303,11 @@
public RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> commands) {
return new RehashControlCommand(cacheName, DRAIN_TX_PREPARES, sender, null, commands, this);
+ }
+
+ public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type,
+ Address sender, Map<Object, InternalCacheValue> state, ConsistentHash oldCH,
+ ConsistentHash newCH, List<Address> leavers) {
+ return new RehashControlCommand(cacheName, type, sender, state, oldCH, newCH, leavers, this);
}
-
- public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type, Address sender, Map<Object, InternalCacheValue> state, ConsistentHash consistentHash) {
- return new RehashControlCommand(cacheName, type, sender, state, consistentHash, this);
- }
-
}
Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-05-25 21:03:57 UTC (rev 1849)
@@ -45,13 +45,15 @@
public static final int COMMAND_ID = 17;
public enum Type {
- JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
+ JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_COMPLETE, JOIN_ABORT, PULL_STATE_JOIN, PULL_STATE_LEAVE, PUSH_STATE, DRAIN_TX, DRAIN_TX_PREPARES
}
Type type;
Address sender;
Map<Object, InternalCacheValue> state;
- ConsistentHash consistentHash;
+ ConsistentHash oldCH;
+ List<Address> nodesLeft;
+ ConsistentHash newCH;
// cache components
DistributionManager distributionManager;
@@ -67,13 +69,15 @@
}
- public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state,
- ConsistentHash consistentHash, CommandsFactory commandsFactory) {
+ public RehashControlCommand(String cacheName, Type type, Address sender, Map<Object, InternalCacheValue> state,ConsistentHash oldConsistentHash,
+ ConsistentHash consistentHash, List<Address> leavers, CommandsFactory commandsFactory) {
super(cacheName);
this.type = type;
this.sender = sender;
this.state = state;
- this.consistentHash = consistentHash;
+ this.oldCH = oldConsistentHash;
+ this.newCH = consistentHash;
+ this.nodesLeft = leavers;
this.commandsFactory = commandsFactory;
}
@@ -119,8 +123,10 @@
case JOIN_COMPLETE:
distributionManager.notifyJoinComplete(sender);
return null;
- case PULL_STATE:
- return pullState();
+ case PULL_STATE_JOIN:
+ return pullStateForJoin();
+ case PULL_STATE_LEAVE:
+ return pullStateForLeave();
case PUSH_STATE:
return pushState();
case DRAIN_TX:
@@ -133,29 +139,68 @@
throw new CacheException("Unknown rehash control command type " + type);
}
- public Map<Object, InternalCacheValue> pullState() throws CacheLoaderException {
- Address self = transport.getAddress();
- ConsistentHash oldCH = distributionManager.getConsistentHash();
- int numCopies = configuration.getNumOwners();
-
+ public Map<Object, InternalCacheValue> pullStateForJoin() throws CacheLoaderException {
+
Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
for (InternalCacheEntry ice : dataContainer) {
Object k = ice.getKey();
- if (shouldAddToMap(k, oldCH, numCopies, self)) state.put(k, ice.toInternalCacheValue());
+ if (shouldTransferOwnershipToJoinNode(k)) {
+ state.put(k, ice.toInternalCacheValue());
+ }
}
CacheStore cacheStore = distributionManager.getCacheStoreForRehashing();
if (cacheStore != null) {
for (Object k: cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) {
- if (!state.containsKey(k) && shouldAddToMap(k, oldCH, numCopies, self)) {
- InternalCacheValue v = loadValue(cacheStore, k);
+ if (!state.containsKey(k) && shouldTransferOwnershipToJoinNode(k)) {
+ InternalCacheValue v = loadValue(cacheStore, k);
if (v != null) state.put(k, v);
}
}
}
return state;
}
+
+ public Map<Object, InternalCacheValue> pullStateForLeave() throws CacheLoaderException {
+
+ Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
+ for (InternalCacheEntry ice : dataContainer) {
+ Object k = ice.getKey();
+ if (shouldTransferOwnershipFromLeftNodes(k)) {
+ state.put(k, ice.toInternalCacheValue());
+ }
+ }
+ CacheStore cacheStore = distributionManager.getCacheStoreForRehashing();
+ if (cacheStore != null) {
+ for (Object k : cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) {
+ if (!state.containsKey(k) && shouldTransferOwnershipFromLeftNodes(k)) {
+ InternalCacheValue v = loadValue(cacheStore, k);
+ if (v != null)
+ state.put(k, v);
+ }
+ }
+ }
+ return state;
+ }
+
+ private boolean shouldTransferOwnershipFromLeftNodes(Object k) {
+ Address self = transport.getAddress();
+ int numCopies = configuration.getNumOwners();
+
+ List<Address> oldList = oldCH.locate(k, numCopies);
+ boolean localToThisNode = oldList.indexOf(self) >= 0;
+ boolean senderIsNewOwner = newCH.isKeyLocalToAddress(sender, k, numCopies);
+ for (Address leftNodeAddress : nodesLeft) {
+ boolean localToLeftNode = oldList.indexOf(leftNodeAddress) >= 0;
+ if (localToLeftNode && senderIsNewOwner && localToThisNode) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
private InternalCacheValue loadValue(CacheStore cs, Object k) {
try {
InternalCacheEntry ice = cs.load(k);
@@ -166,16 +211,17 @@
return null;
}
- final boolean shouldAddToMap(Object k, ConsistentHash oldCH, int numCopies, Address self) {
- // if the current address is the current "owner" of this key (in old_ch), and the requestor is in the owner list
- // in new_ch, then add this to the map.
+ final boolean shouldTransferOwnershipToJoinNode(Object k) {
+ Address self = transport.getAddress();
+ int numCopies = configuration.getNumOwners();
List<Address> oldOwnerList = oldCH.locate(k, numCopies);
if (!oldOwnerList.isEmpty() && self.equals(oldOwnerList.get(0))) {
- List<Address> newOwnerList = consistentHash.locate(k, numCopies);
+ List<Address> newOwnerList = newCH.locate(k, numCopies);
if (newOwnerList.contains(sender)) return true;
}
return false;
}
+
public Object pushState() {
distributionManager.applyReceivedState(state);
@@ -187,7 +233,7 @@
}
public Object[] getParameters() {
- return new Object[]{cacheName, (byte) type.ordinal(), sender, state, consistentHash, txLogCommands, pendingPrepares};
+ return new Object[]{cacheName, (byte) type.ordinal(), sender, state, oldCH, nodesLeft, newCH, txLogCommands, pendingPrepares};
}
@SuppressWarnings("unchecked")
@@ -197,7 +243,9 @@
type = Type.values()[(Byte) parameters[i++]];
sender = (Address) parameters[i++];
state = (Map<Object, InternalCacheValue>) parameters[i++];
- consistentHash = (ConsistentHash) parameters[i++];
+ oldCH = (ConsistentHash) parameters[i++];
+ nodesLeft = (List<Address>) parameters[i++];
+ newCH = (ConsistentHash) parameters[i++];
txLogCommands = (List<WriteCommand>) parameters[i++];
pendingPrepares = (List<PrepareCommand>) parameters[i++];
}
@@ -208,7 +256,9 @@
"type=" + type +
", sender=" + sender +
", state=" + state +
- ", consistentHash=" + consistentHash +
+ ", oldConsistentHash=" + oldCH +
+ ", nodesLeft=" + nodesLeft +
+ ", consistentHash=" + newCH +
", txLogCommands=" + txLogCommands +
", pendingPrepares=" + pendingPrepares +
'}';
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-05-25 21:03:57 UTC (rev 1849)
@@ -57,7 +57,6 @@
import java.util.Set;
import java.util.HashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -160,7 +159,7 @@
consistentHash = createConsistentHash(configuration, members);
self = t.getAddress();
if (members.size() > 1 && !t.getCoordinator().equals(self)) {
- JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
+ JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this);
joinFuture = rehashExecutor.submit(joinTask);
} else {
joinComplete = true;
@@ -189,6 +188,8 @@
boolean willReceiveLeaverState = willReceiveLeaverState(leaver);
boolean willSendLeaverState = willSendLeaverState(leaver);
+ List<Address> stateProviders = holdersOfLeaversState(newMembers, leaver);
+
try {
if (!(consistentHash instanceof UnionConsistentHash)) oldConsistentHash = consistentHash;
else oldConsistentHash = ((UnionConsistentHash) consistentHash).newCH;
@@ -196,23 +197,21 @@
} catch (Exception e) {
log.fatal("Unable to process leaver!!", e);
throw new CacheException(e);
- }
+ }
if (willReceiveLeaverState || willSendLeaverState) {
- log.info("Starting transaction logging!");
+ log.info("I {0} am participating in rehash", rpcManager.getTransport().getAddress());
transactionLogger.enable();
- }
- if (willSendLeaverState) {
- if (leaveTaskFuture != null && (!leaveTaskFuture.isCancelled() || !leaveTaskFuture.isDone())) {
+ if (leaveTaskFuture != null
+ && (!leaveTaskFuture.isCancelled() || !leaveTaskFuture.isDone())) {
leaveTaskFuture.cancel(true);
}
leavers.add(leaver);
- LeaveTask task = new LeaveTask(this, rpcManager, configuration, leavers, transactionLogger, cf, dataContainer);
+ InvertedLeaveTask task = new InvertedLeaveTask(this, rpcManager, configuration, cf,
+ dataContainer, leavers, stateProviders, willReceiveLeaverState);
leaveTaskFuture = rehashExecutor.submit(task);
-
- log.info("Need to rehash");
} else {
log.info("Not in same subspace, so ignoring leave event");
}
@@ -223,6 +222,18 @@
ConsistentHash ch = consistentHash instanceof UnionConsistentHash ? oldConsistentHash : consistentHash;
return ch.isAdjacent(leaver, self);
}
+
+ List<Address> holdersOfLeaversState(List<Address> members, Address leaver) {
+ ConsistentHash ch = consistentHash instanceof UnionConsistentHash ? oldConsistentHash: consistentHash;
+ Set<Address> holders = new HashSet<Address>();
+ for (Address address : members) {
+
+ if (ch.isAdjacent(leaver, address)) {
+ holders.add(address);
+ }
+ }
+ return new ArrayList<Address>(holders);
+ }
boolean willReceiveLeaverState(Address leaver) {
ConsistentHash ch = consistentHash instanceof UnionConsistentHash ? oldConsistentHash : consistentHash;
Added: trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java 2010-05-25 21:03:57 UTC (rev 1849)
@@ -0,0 +1,218 @@
+package org.infinispan.distribution;
+
+import static org.infinispan.commands.control.RehashControlCommand.Type.PULL_STATE_LEAVE;
+import static org.infinispan.remoting.rpc.ResponseMode.SYNCHRONOUS;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.infinispan.CacheException;
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.control.RehashControlCommand;
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.remoting.responses.Response;
+import org.infinispan.remoting.responses.SuccessfulResponse;
+import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ *A task to handle rehashing for when a node leaves the cluster
+ *
+ * @author Vladimir Blagojevic
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class InvertedLeaveTask extends RehashTask {
+
+ private static final Log log = LogFactory.getLog(InvertedLeaveTask.class);
+ private static final boolean trace = true;
+ private final List<Address> leavers;
+ private final Address self;
+ private final List<Address> leaversHandled;
+ private final List<Address> stateProviders;
+ private final boolean isReceiver;
+
+ public InvertedLeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration conf,
+ CommandsFactory commandsFactory, DataContainer dataContainer, List<Address> leavers,
+ List<Address> stateProviders, boolean isReceiver) {
+ super(dmi, rpcManager, conf, commandsFactory, dataContainer);
+ this.leavers = leavers;
+ this.leaversHandled = new LinkedList<Address>(leavers);
+ this.stateProviders = stateProviders;
+ this.isReceiver = isReceiver;
+ this.self = rpcManager.getTransport().getAddress();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<Object, InternalCacheValue> getStateFromResponse(SuccessfulResponse r) {
+ return (Map<Object, InternalCacheValue>) r.getResponseValue();
+ }
+
+ protected void performRehash() throws Exception {
+ long start = System.currentTimeMillis();
+ boolean trace = log.isTraceEnabled();
+
+ int replCount = configuration.getNumOwners();
+ ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
+ ConsistentHash newCH = dmi.getConsistentHash();
+ try {
+ if (log.isDebugEnabled()) {
+ if (isReceiver) {
+ log.debug("Commencing rehash at {0}, I am a state receiver", self);
+ } else {
+ log.debug("Commencing rehash at {0}, I am a state producer", self);
+ }
+ }
+ if (configuration.isRehashEnabled()) {
+ if (isReceiver) {
+ Address myAddress = rpcManager.getTransport().getAddress();
+ RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE_LEAVE, myAddress,
+ null, oldCH, newCH,leaversHandled);
+
+ List<Address> addressesWhoMaySendStuff = getStateProviderTargets();
+ log.debug("I {0} am pulling state from {1}", self, addressesWhoMaySendStuff);
+ List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd,
+ SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
+
+ log.debug("I {0} received response {1} ", self, resps);
+ for (Response r : resps) {
+ if (r instanceof SuccessfulResponse) {
+ Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
+ log.debug("I {0} am applying state {1} ", self, state);
+ dmi.applyState(newCH, state);
+ }
+ }
+ }
+ processAndDrainTxLog(oldCH, newCH, replCount);
+ invalidateInvalidHolders(leaversHandled, oldCH, newCH);
+ } else {
+ if (trace)
+ log.trace("Rehash not enabled, so not pulling state.");
+ }
+ if (trace)
+ log.info("{0} completed leave rehash in {1}!", self, Util.prettyPrintTime(System.currentTimeMillis()
+ - start));
+ } catch (Exception e) {
+ throw new CacheException("Unexpected exception", e);
+ } finally {
+ leavers.removeAll(leaversHandled);
+ }
+ }
+
+ private List<Address> getStateProviderTargets() {
+ return stateProviders;
+ }
+
+ private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ if (trace)
+ log.trace("Processing transaction log iteratively");
+
+ List<WriteCommand> c;
+ int i = 0;
+ TransactionLogger transactionLogger = dmi.getTransactionLogger();
+ while (transactionLogger.shouldDrainWithoutLock()) {
+ if (trace)
+ log.trace("Processing transaction log, iteration {0}", i++);
+ c = transactionLogger.drain();
+ if (trace)
+ log.trace("Found {0} modifications", c.size());
+ apply(oldCH, newCH, replCount, c);
+ }
+
+ if (trace)
+ log.trace("Processing transaction log: final drain and lock");
+ c = transactionLogger.drainAndLock();
+ if (trace)
+ log.trace("Found {0} modifications", c.size());
+ apply(oldCH, newCH, replCount, c);
+
+ if (trace)
+ log.trace("Handling pending prepares");
+ PendingPreparesMap state = new PendingPreparesMap(leavers, oldCH, newCH, replCount);
+ Collection<PrepareCommand> pendingPrepares = transactionLogger.getPendingPrepares();
+ if (trace)
+ log.trace("Found {0} pending prepares", pendingPrepares.size());
+ for (PrepareCommand pc : pendingPrepares)
+ state.addState(pc);
+
+ if (trace)
+ log.trace("State map for pending prepares is {0}", state.getState());
+
+ Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
+ for (Map.Entry<Address, List<PrepareCommand>> e : state.getState().entrySet()) {
+ if (log.isDebugEnabled())
+ log.debug("Pushing {0} uncommitted prepares to {1}", e.getValue().size(), e.getKey());
+ RehashControlCommand push = cf.buildRehashControlCommandTxLogPendingPrepares(self, e.getValue());
+ NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
+ pushFutures.add(f);
+ rpcManager.invokeRemotelyInFuture(Collections.singleton(e.getKey()), push, true, f, configuration.getRehashRpcTimeout());
+ }
+
+ for (Future f : pushFutures) {
+ try {
+ f.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Error pushing tx log", e);
+ }
+ }
+ if (trace)
+ log.trace("Finished pushing pending prepares; unlocking and disabling transaction logging");
+
+ transactionLogger.unlockAndDisable();
+ }
+
+ private void apply(ConsistentHash oldCH, ConsistentHash newCH, int replCount,
+ List<WriteCommand> wc) {
+ // need to create another "state map"
+ TransactionLogMap state = new TransactionLogMap(leavers, oldCH, newCH, replCount);
+ for (WriteCommand c : wc)
+ state.addState(c);
+
+ if (trace)
+ log.trace("State map for modifications is {0}", state.getState());
+
+ Set<Future<Object>> pushFutures = new HashSet<Future<Object>>();
+ for (Map.Entry<Address, List<WriteCommand>> entry : state.getState().entrySet()) {
+ if (log.isDebugEnabled())
+ log.debug("Pushing {0} modifications to {1}", entry.getValue().size(), entry.getKey());
+ RehashControlCommand push = cf.buildRehashControlCommandTxLog(self, entry.getValue());
+ NotifyingNotifiableFuture<Object> f = new NotifyingFutureImpl(null);
+ pushFutures.add(f);
+ rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f,
+ configuration.getRehashRpcTimeout());
+ }
+
+ for (Future f : pushFutures) {
+ try {
+ f.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Error pushing tx log", e);
+ }
+ }
+ }
+
+ @Override
+ protected Log getLog() {
+ return log;
+ }
+}
Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2010-05-25 21:03:57 UTC (rev 1849)
@@ -45,15 +45,13 @@
public class JoinTask extends RehashTask {
private static final Log log = LogFactory.getLog(JoinTask.class);
- ConsistentHash chOld;
- ConsistentHash chNew;
- Address self;
+ private final Address self;
public JoinTask(RpcManager rpcManager, CommandsFactory commandsFactory, Configuration conf,
- TransactionLogger transactionLogger, DataContainer dataContainer, DistributionManagerImpl dmi) {
- super(dmi, rpcManager, conf, transactionLogger, commandsFactory, dataContainer);
+ DataContainer dataContainer, DistributionManagerImpl dmi) {
+ super(dmi, rpcManager, conf, commandsFactory, dataContainer);
this.dataContainer = dataContainer;
- self = rpcManager.getTransport().getAddress();
+ this.self = rpcManager.getTransport().getAddress();
}
@SuppressWarnings("unchecked")
@@ -75,45 +73,15 @@
long start = System.currentTimeMillis();
boolean trace = log.isTraceEnabled();
if (log.isDebugEnabled()) log.debug("Commencing");
+ TransactionLogger transactionLogger = dmi.getTransactionLogger();
boolean unlocked = false;
+ ConsistentHash chOld;
+ ConsistentHash chNew;
try {
dmi.joinComplete = false;
- // 1. Get chOld from coord.
- // this happens in a loop to ensure we receive the correct CH and not a "union".
- // TODO make at least *some* of these configurable!
- long minSleepTime = 500, maxSleepTime = 2000; // sleep time between retries
- int maxWaitTime = (int) configuration.getRehashRpcTimeout() * 10; // after which we give up!
- Random rand = new Random();
- long giveupTime = System.currentTimeMillis() + maxWaitTime;
- do {
- if (trace) log.trace("Requesting old consistent hash from coordinator");
- List<Response> resp;
- List<Address> addresses;
- try {
- resp = rpcManager.invokeRemotely(coordinator(), cf.buildRehashControlCommand(JOIN_REQ, self),
- SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
- addresses = parseResponses(resp);
- if (log.isDebugEnabled()) log.debug("Retrieved old consistent hash address list {0}", addresses);
- } catch (TimeoutException te) {
- // timed out waiting for responses; retry!
- resp = null;
- addresses = null;
- if (log.isDebugEnabled()) log.debug("Timed out waiting for responses.");
- }
+ // 1. Get chOld from coord.
+ chOld = retrieveOldCH(trace);
- if (addresses == null) {
- long time = rand.nextInt((int) (maxSleepTime - minSleepTime) / 10);
- time = (time * 10) + minSleepTime;
- if (trace) log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
- Thread.sleep(time); // sleep for a while and retry
- } else {
- chOld = createConsistentHash(configuration, addresses);
- }
- } while (chOld == null && System.currentTimeMillis() < giveupTime);
-
- if (chOld == null)
- throw new CacheException("Unable to retrieve old consistent hash from coordinator even after several attempts at sleeping and retrying!");
-
// 2. new CH instance
if (chOld.getCaches().contains(self))
chNew = chOld;
@@ -133,9 +101,10 @@
// 6. pull state from everyone.
Address myAddress = rpcManager.getTransport().getAddress();
- RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
+
+ RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE_JOIN, myAddress, null, chOld, chNew,null);
// TODO I should be able to process state chunks from different nodes simultaneously!!
- List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(configuration.getNumOwners());
+ List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(chNew, configuration.getNumOwners());
List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
// 7. Apply state
@@ -177,6 +146,53 @@
}
}
+ private ConsistentHash retrieveOldCH(boolean trace) throws InterruptedException, IllegalAccessException,
+ InstantiationException, ClassNotFoundException {
+
+ // this happens in a loop to ensure we receive the correct CH and not a "union".
+ // TODO make at least *some* of these configurable!
+ ConsistentHash result = null;
+ long minSleepTime = 500, maxSleepTime = 2000; // sleep time between retries
+ int maxWaitTime = (int) configuration.getRehashRpcTimeout() * 10; // after which we give up!
+ Random rand = new Random();
+ long giveupTime = System.currentTimeMillis() + maxWaitTime;
+ do {
+ if (trace)
+ log.trace("Requesting old consistent hash from coordinator");
+ List<Response> resp;
+ List<Address> addresses;
+ try {
+ resp = rpcManager.invokeRemotely(coordinator(), cf.buildRehashControlCommand(
+ JOIN_REQ, self), SYNCHRONOUS, configuration.getRehashRpcTimeout(),
+ true);
+ addresses = parseResponses(resp);
+ if (log.isDebugEnabled())
+ log.debug("Retrieved old consistent hash address list {0}", addresses);
+ } catch (TimeoutException te) {
+ // timed out waiting for responses; retry!
+ resp = null;
+ addresses = null;
+ if (log.isDebugEnabled())
+ log.debug("Timed out waiting for responses.");
+ }
+
+ if (addresses == null) {
+ long time = rand.nextInt((int) (maxSleepTime - minSleepTime) / 10);
+ time = (time * 10) + minSleepTime;
+ if (trace)
+ log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
+ Thread.sleep(time); // sleep for a while and retry
+ } else {
+ result = createConsistentHash(configuration, addresses);
+ }
+ } while (result == null && System.currentTimeMillis() < giveupTime);
+
+ if (result == null)
+ throw new CacheException(
+ "Unable to retrieve old consistent hash from coordinator even after several attempts at sleeping and retrying!");
+ return result;
+ }
+
@Override
protected Log getLog() {
return log;
@@ -193,9 +209,9 @@
* @param replCount
* @return
*/
- List<Address> getAddressesWhoMaySendStuff(int replCount) {
+ List<Address> getAddressesWhoMaySendStuff(ConsistentHash ch, int replCount) {
List<Address> l = new LinkedList<Address>();
- List<Address> caches = chNew.getCaches();
+ List<Address> caches = ch.getCaches();
int selfIdx = caches.indexOf(self);
if (selfIdx >= replCount - 1) {
l.addAll(caches.subList(selfIdx - replCount + 1, selfIdx));
Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2010-05-25 21:03:57 UTC (rev 1849)
@@ -45,9 +45,10 @@
private final List<Address> leaversHandled;
- protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration, List<Address> leavers,
- TransactionLogger transactionLogger, CommandsFactory cf, DataContainer dataContainer) {
- super(dmi, rpcManager, configuration, transactionLogger, cf, dataContainer);
+ protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager,
+ Configuration configuration, List<Address> leavers, CommandsFactory cf,
+ DataContainer dataContainer) {
+ super(dmi, rpcManager, configuration, cf, dataContainer);
this.leavers = leavers;
this.leaversHandled = new LinkedList<Address>(leavers);
this.self = rpcManager.getTransport().getAddress();
@@ -115,6 +116,7 @@
List<WriteCommand> c;
int i = 0;
+ TransactionLogger transactionLogger = dmi.getTransactionLogger();
while (transactionLogger.shouldDrainWithoutLock()) {
if (trace) log.trace("Processing transaction log, iteration {0}", i++);
c = transactionLogger.drain();
Modified: trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java 2010-05-25 15:46:19 UTC (rev 1848)
+++ trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java 2010-05-25 21:03:57 UTC (rev 1849)
@@ -33,17 +33,15 @@
DistributionManagerImpl dmi;
RpcManager rpcManager;
- Configuration configuration;
- TransactionLogger transactionLogger;
+ Configuration configuration;
CommandsFactory cf;
DataContainer dataContainer;
- protected RehashTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration,
- TransactionLogger transactionLogger, CommandsFactory cf, DataContainer dataContainer) {
+ protected RehashTask(DistributionManagerImpl dmi, RpcManager rpcManager,
+ Configuration configuration, CommandsFactory cf, DataContainer dataContainer) {
this.dmi = dmi;
this.rpcManager = rpcManager;
this.configuration = configuration;
- this.transactionLogger = transactionLogger;
this.cf = cf;
this.dataContainer = dataContainer;
}
@@ -66,7 +64,7 @@
return Collections.singleton(rpcManager.getTransport().getCoordinator());
}
- protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
+ protected void invalidateInvalidHolders(List<Address> doNotInvalidate, ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
if (getLog().isDebugEnabled()) getLog().debug("Invalidating entries that have migrated across");
Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
for (Object key : dataContainer.keySet()) {
@@ -81,6 +79,7 @@
}
}
+ invalidations.keySet().removeAll(doNotInvalidate);
Set<Future> futures = new HashSet<Future>();
for (Map.Entry<Address, Set<Object>> e : invalidations.entrySet()) {
@@ -92,6 +91,10 @@
for (Future f : futures) f.get();
}
+ protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
+ List<Address> none = Collections.emptyList();
+ invalidateInvalidHolders(none, chOld, chNew);
+ }
protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
List<Address> oldOwners = chOld.locate(key, configuration.getNumOwners());
More information about the infinispan-commits
mailing list