DIST, take 2!
Previous design failed due to two flaws:
1. Transaction logs maintained on sender, for each recipient. Plenty of scope for races, or heavily synchronized.
2. Consistent hash attempted to be overly fair in evenly dispersing nodes across a hash space. Meant that there was an often large and unnecessary amount of rehashing to do, which exacerbated the problem in 1.
So we have a new approach, based on the following premises.
1. Consistent hash (CH) based on fixed positions in the hash space rather than relative ones.
1.1. Pros: limited and finite rehashing, particularly when there is a leave.
1.1.1. If the leaver is L, only node (L - 1) and node (L + 1) will have to push state, and only (L + 1) and (L + replCount) will have to receive state.
1.2. Cons: uneven spread of load (mitigated with grid size)
1.3. Virtual nodes is a good way to reduce this, but leaving VN out for now since it adds complexity. VN won't change the rest of the algorithm anyway.
2. State is both pulled (on JOIN) and pushed (on LEAVE)
3. State transfers are implemented using RPC commands and byte array payloads
3.1. Will later evolve into streaming scheme
4. Implementation notes:
4.1. Each node has a DistributionManager (DM)
4.2. Each DM has a RehashExecutor (with 1 low prio thread)
4.3. LeaveTask and JoinTask are 2 types of tasks handled by this executor encapsulating the processing that takes place when there is a leave or join.
4.4. InstallConsistentHashCommand - an RPC command that "installs" a consistent hash instance on remote nodes.
4.5. GetConsistentHashCommand - an RPC command that "asks" a node to serialize and transmit across its current CH impl.
4.6. PullStateCommand - an RPC command that "asks" for state. Command should have an Address of the requestor. Return value for this command is the state.
4.7. PushStateCommand - an RPC command that "pushes" state by providing a payload of state.
4.8. Current RPC responses are either SuccessfulResponses or UnsuccessfulResponses. Introducing a new type, UnsureResponse to deal with eventual consistency semantics of concurrent operation during a rehash. ClusteredGetResponseFilter should look for a quorum for UnsureResponses.
5. JoinTask: This is a PULL based rehash. JoinTask is kicked off on the JOINER.
5.1. Obtain OLD_CH from coordinator (using GetConsistentHashCommand)
5.2. Generate TEMP_CH (which is a union of OLD_CH and NEW_CH)
5.3. Broadcast TEMP_CH across the cluster (using InstallConsistentHashCommand)
5.4. Log all incoming writes/txs and respond with a positive ack.
5.5. Ignore incoming reads, forcing callers to check next owner of data.
5.6. Ping each node in OLD_CH's view and ask for state (PullStateCommand)
5.7. Apply state received from 5.6.
5.8. Drain tx log and apply, stop logging writes once drained.
5.9. Reverse 5.5.
5.10. Broadcast NEW_CH so this is applied (using InstallConsistentHashCommand)
5.11. Loop through data container and unicast invalidations for keys that "could" exist on OLD_CH and not in NEW_CH
6. When a leave is detected (view change with 1 less member):
6.1. Make a note of the Leaver (L)
6.2. All nodes switch to a NEW_CH, which does not include L
6.3. Nodes (L - replCount + 1), (L + 1) and (L + replCount) kick off a LeaveTask.
6.3.1. Not everyone in the cluster need be involved in this rehash thanks to fixed CH positions.
7. LeaveTask: This is PUSH based
7.1. If an existing LeaveTask is running, the existing task should be cancelled first.
7.2. if is_receiver(): Log all writes/incoming txs
7.3. if is_receiver(): Incoming reads:
7.3.1. If the key was owned by L in OLD_CH and not by self, and in NEW_CH is owned by self, return UnsureResponse. Else, SuccessfulResponse.
7.4. if is_pusher():
7.4.1. Iterate over all keys in data container. If any key maps to L in OLD_CH, that key needs to be rehashed. Call addState(stateMap, k). See below for details of this algo.
7.4.2. Call PushStateCommand for each recipient in stateMap.
7.5. if is_receiver() and on receiving PushStateCommand
7.5.1. Drain tx log and apply, stop logging writes once drained.
7.5.2. Reverse 7.3.
8. A state map is a data structure representing state generated by each node to be pushed.
8.1. StateMap{
Map<Address, Map<Object, InternalCacheEntry>> state
}
8.2. addState(stateMap, k):
OL = OLD_CH.locate(k)
if (OL.contains(L) and ((position(L, OL) == last and current_address == L - 1) or (position(L, OL) != last and current_address == L + 1)) {
stateMap.add(NEW_CH.locate(k) - OL, k)
break
}
9. Functions to determine who pushes and who receives state during a leave:
9.1. is_pusher(address): return address == (L + 1) or (L - 1)
9.2. is_receiver(address): return address == (L + 1) or (L + replCount)
Concurrency
1. Deal with concurrent JOINs
1.1. Concurrent JOINs in completely disjoint parts of the hash space will have no overlap whatsoever.
1.2. Concurrent JOINs in the same hash space would work as well, since JOINs are controlled by the joiner.
2. Deal with concurrent LEAVEs
2.1. Concurrent LEAVEs in completely disjoint parts of the hash space will have no overlap whatsoever.
2.2. Concurrent LEAVEs that do not affect the same nodes involved in providing or receiving state will not be a problem.
2.3. Concurrent LEAVES affecting the same nodes already providing or receiving state will interrupt the LeaveTasks on those nodes. Leave tasks will need to clean up appropriately, which may involve issuing a "cleanup command" to ensure partially transmitted state is reconciled.
3. Deal with concurrent LEAVE and JOIN
3.1. Concurrent JOIN and LEAVEs in completely disjoint parts of the hash space will have no overlap whatsoever.
3.2. FOr overlapping areas of hash space, this needs further thought.