Hey Mircea,
On 12-03-20 7:38 AM, Mircea Markus wrote:
Hi Vladimir,
I'm a bit late, sorry. My comments below:
Feels like our Reducer is more of a Combiner as described by [1], as it enforces the result of the reduce (KOut) to have the same type as the entries it consumes (KOut).
Don't you think that if we add the Combiner/MapCombineCommand we can loosen this constraint and allow the Reducer to output something other than KOut?
I tried playing with this but could not hash out a nice API
that did not complicate everything too much. Let me see your API
proposal, Mircea.
Once loaded onto the target execution node, MapCombineCommand takes each
local KIn key and executes the Mapper method void map(KIn key, VIn value,
Collector<KOut, VOut> collector). Results are collected in a common
Collector<KOut, VOut> collector and the combine phase is initiated. A
Combiner, if specified, takes the KOut keys and immediately invokes the
reduce phase on them. The result of the mapping phase executed on each node
is a <KOut, VOut> map M. There will be one resulting map M per execution node N.
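To make the map/combine step concrete, here is a rough word-count sketch of what one node would do. The Collector, Mapper and Reducer interfaces below are local stand-ins written for this example, not the Infinispan types, and mapAndCombine is a hypothetical helper that assumes a combiner is always supplied.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Stand-in types for illustration only; these are NOT the Infinispan interfaces.
final class MapCombineSketch {

    interface Collector<K, V> {
        void emit(K key, V value);
    }

    interface Mapper<KIn, VIn, KOut, VOut> {
        void map(KIn key, VIn value, Collector<KOut, VOut> collector);
    }

    interface Reducer<KOut, VOut> {
        VOut reduce(KOut key, Iterator<VOut> values);
    }

    // Word-count mapper: emits (word, 1) for every word of a document.
    static final Mapper<String, String, String, Integer> WORD_MAPPER = (docId, text, collector) -> {
        for (String word : text.split("\\s+")) {
            if (!word.isEmpty()) {
                collector.emit(word, 1);
            }
        }
    };

    // Summing counts; used here as the combiner.
    static final Reducer<String, Integer> SUM = (word, counts) -> {
        int total = 0;
        while (counts.hasNext()) {
            total += counts.next();
        }
        return total;
    };

    // Hypothetical helper: map every local entry, then combine per KOut into map M.
    static <KIn, VIn, KOut, VOut> Map<KOut, VOut> mapAndCombine(
            Map<KIn, VIn> localEntries,
            Mapper<KIn, VIn, KOut, VOut> mapper,
            Reducer<KOut, VOut> combiner) {
        Map<KOut, List<VOut>> collected = new HashMap<>();
        Collector<KOut, VOut> collector =
                (k, v) -> collected.computeIfAbsent(k, unused -> new ArrayList<>()).add(v);
        for (Map.Entry<KIn, VIn> entry : localEntries.entrySet()) {
            mapper.map(entry.getKey(), entry.getValue(), collector);
        }
        Map<KOut, VOut> m = new HashMap<>();
        for (Map.Entry<KOut, List<VOut>> entry : collected.entrySet()) {
            m.put(entry.getKey(), combiner.reduce(entry.getKey(), entry.getValue().iterator()));
        }
        return m;
    }

    public static void main(String[] args) {
        Map<String, String> local = new HashMap<>();
        local.put("doc1", "to be or not to be");
        System.out.println(mapAndCombine(local, WORD_MAPPER, SUM));
    }
}
```

Each node would produce one such map M, with at most one VOut per KOut.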
So the combiner has the same output as the Mapper - just that this time it is in the form of a Map<KOut, VOut> instead of a Collector<KOut, VOut>?
Which is okay IMO, as at the end of the day the Combiner is just an extra polishing added to the mapping...
Yes, exactly. The Combiner is just a polisher allowing less reduction
later on - however, one can use a combiner only if the reduce
function is both associative and commutative, see: http://philippeadjiman.com/blog/2010/01/14/hadoop-tutorial-series-issue-4-to-use-or-not-to-use-a-combiner/
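A small sketch of why that restriction matters, using made-up numbers: summing gives the same answer whether or not partial results are combined per node first, while averaging does not. The class and method names are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.List;

final class CombinerSafetySketch {

    static double sum(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total;
    }

    static double mean(List<Double> values) {
        return sum(values) / values.size();
    }

    public static void main(String[] args) {
        // Values for the same KOut produced on two different nodes.
        List<Double> nodeA = Arrays.asList(1.0, 2.0, 3.0);
        List<Double> nodeB = Arrays.asList(10.0);
        List<Double> all = Arrays.asList(1.0, 2.0, 3.0, 10.0);

        // Combine per node first, then reduce the combined values.
        double sumWithCombiner = sum(Arrays.asList(sum(nodeA), sum(nodeB)));
        double meanWithCombiner = mean(Arrays.asList(mean(nodeA), mean(nodeB)));

        System.out.println(sum(all) + " vs " + sumWithCombiner);   // 16.0 vs 16.0
        System.out.println(mean(all) + " vs " + meanWithCombiner); // 4.0 vs 6.0
    }
}
```

So a sum reducer can double as a combiner as-is, whereas a plain mean reducer cannot.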
At the end of combine phase instead of returning map M to the master
task node (as we currently do), we now hash each KOut in map M and group
KOut keys by the execution node N they are hashed to. Each group of KOut
keys and its VOut values, hashed to the same node, is wrapped with a new
command Migrate. Command Migrate, which is very similar to
PutKeyValueCommand, is executed on the Infinispan target node N and essentially
maintains a KOut K -> List<VOut> mapping, i.e. all KOut/VOut pairs from all
executed MapCombineCommands will be collocated on a node N where KOut is
hashed to and value for KOut will be a list of all VOut values. We
essentially collect all VOut values under each KOut for all executed
MapCombineCommands.
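Roughly, the migrate step could look like the sketch below. Everything in it is an assumption made for illustration: ownerOf is a naive modulo hash standing in for the grid's consistent hash (it ignores numOwners), nodes are plain integer indexes, and applyMigrate only mimics what a Migrate command would do on the target node.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MigrateSketch {

    // Hypothetical stand-in for the grid's consistent hash; ignores numOwners.
    static int ownerOf(Object key, int clusterSize) {
        return Math.floorMod(key.hashCode(), clusterSize);
    }

    // Sender side: split one node's map M into one group per target node.
    static <K, V> Map<Integer, Map<K, V>> groupByOwner(Map<K, V> m, int clusterSize) {
        Map<Integer, Map<K, V>> groups = new HashMap<>();
        for (Map.Entry<K, V> entry : m.entrySet()) {
            groups.computeIfAbsent(ownerOf(entry.getKey(), clusterSize), n -> new HashMap<>())
                  .put(entry.getKey(), entry.getValue());
        }
        return groups;
    }

    // Receiver side: append each migrated VOut to the KOut -> List<VOut> mapping.
    static <K, V> void applyMigrate(Map<K, List<V>> nodeStore, Map<K, V> group) {
        for (Map.Entry<K, V> entry : group.entrySet()) {
            nodeStore.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
        }
    }

    // Runs the migrate step for every node's map M; index i of the result is node i's store.
    static <K, V> List<Map<K, List<V>>> migrateAll(List<Map<K, V>> perNodeM, int clusterSize) {
        List<Map<K, List<V>>> stores = new ArrayList<>();
        for (int i = 0; i < clusterSize; i++) {
            stores.add(new HashMap<>());
        }
        for (Map<K, V> m : perNodeM) {
            groupByOwner(m, clusterSize)
                    .forEach((node, group) -> applyMigrate(stores.get(node), group));
        }
        return stores;
    }
}
```

In this sketch each non-empty group corresponds to one Migrate command, so a node only contacts the nodes that some of its KOut keys hash to.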
This potentially involves each node sending some state to all other nodes, i.e. (clusterSize-1)^2 RPCs. Not that this is necessarily a bad thing, as it is there to reduce contention on a single node.
Each node sending state to all other nodes, really? Isn't it that
each node sends state to numOwners nodes?
At this point MapCombineCommand has finished its execution; the list of KOut
keys is returned to the master node and its MapReduceTask. We do not
return VOut values as we do not need them at the master task node.
MapReduceTask is ready to start the reduce phase.
If you plan to add this to a design document, I think it would be a good idea to show an example of how the combiner would be used with the word numbering algorithm. You explained this quite well here, but that would make sure we're all on the same page on the role of combiner :-)
Deal!
Reduce phase
MapReduceTask initializes ReduceCommand with a user-specified Reducer.
The KOut keys collected from the map phase are grouped by the execution
node N they are hashed to. For each node N and its group of KOut
keys, MapReduceTask creates a ReduceCommand and sends it to node N,
where those KOut keys are hashed.
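A minimal sketch of that reduce phase, under the same assumptions as before: ownerOf is a naive modulo hash rather than the real consistent hash, the Reducer interface is a local stand-in, and groupKeysByOwner and reduceOnNode are hypothetical helpers for the master side and the node side respectively.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ReducePhaseSketch {

    // Stand-in for illustration only; NOT the Infinispan interface.
    interface Reducer<K, V> {
        V reduce(K key, Iterator<V> values);
    }

    // Hypothetical stand-in for the grid's consistent hash.
    static int ownerOf(Object key, int clusterSize) {
        return Math.floorMod(key.hashCode(), clusterSize);
    }

    // Master side: decide which nodes get a ReduceCommand, and for which KOut keys.
    static <K> Map<Integer, Set<K>> groupKeysByOwner(Collection<K> keys, int clusterSize) {
        Map<Integer, Set<K>> byNode = new HashMap<>();
        for (K key : keys) {
            byNode.computeIfAbsent(ownerOf(key, clusterSize), n -> new HashSet<>()).add(key);
        }
        return byNode;
    }

    // Node side: reduce each requested key over the locally stored List<VOut>.
    static <K, V> Map<K, V> reduceOnNode(Set<K> keys, Map<K, List<V>> nodeStore, Reducer<K, V> reducer) {
        Map<K, V> result = new HashMap<>();
        for (K key : keys) {
            List<V> values = nodeStore.get(key);
            if (values != null) {
                result.put(key, reducer.reduce(key, values.iterator()));
            }
        }
        return result;
    }
}
```

Because the master only needs the KOut keys to compute the grouping, only nodes that actually own some key would receive a ReduceCommand in this sketch.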
The originator (the node on which the M/R is run), at this point, doesn't know the entire set of KOut values produced, so it can't determine the list of nodes to send the ReduceCommand to - it would have to send it to all nodes, right?
No, it does know all KOut values, but it does not know all VOut
values (they are on the grid)!
Thanks a lot Mircea! Do you think overall this design makes sense?
Are you concerned about the level of traffic in the phase that
migrates intermediate results?
Regards,
Vladimir