Hi Vladimir,
I'm a bit late, sorry. My comments below:
On 14 Feb 2012, at 15:19, Vladimir Blagojevic wrote:
Hey guys,
Before moving forward with the next iteration of map reduce I wanted to hear
your thoughts about the following proposal. After we agree on the
general direction I will transcribe the agreed design on a wiki page and
start implementation.
Shortcoming of current map reduce implementation
While our current map reduce implementation is more than a proof of
concept, there are several drawbacks preventing it from being an
industrial grade map reduce solution. The main drawback is the inability
of the current solution to deal with large (GB/TB) map reduce
problems. This shortcoming lies mainly in our reduce phase execution.
The reduce phase, as you might know, is currently done on a single
Infinispan master task node; the size of the map reduce problems we can
support (data size wise) is therefore limited by the working memory of a
single node.
Proposed solution
The proposed solution involves distributing execution of reduce phase
tasks across the cluster thus effectively achieving higher reduce task
parallelization and at the same time removing the above mentioned reduce
phase restriction.
Something like a hierarchy (tree) of reducers?
Effectively leveraging our consistent hashing
solution even further we can parallelize reduce phase and elevate our
map reduce solution to an industrial level. Here is how we can achieve that.
Map phase
MapReduceTask, as it currently does, will hash task input keys and group
them by execution node N they are hashed to. For each node N and its
grouped input KIn keys MapReduceTask creates a MapCombineCommand which
is migrated to an execution target node N. MapCombineCommand is similar
to current MapReduceCommand. MapCombineCommand takes an instance of a
Mapper and an instance of a Reducer, which is a combiner [1].
Feels like our
Reducer is more of a Combiner as described by [1], as it forces the result of the reduce
(KOut) to have the same type as the entries it consumes (KOut).
Don't you think that if we add the Combiner/MapCombineCommand we can loosen this
constraint and allow the Reducer to output something other than KOut?
Once loaded onto the target execution node, MapCombineCommand takes each
local KIn key and executes the Mapper method void map(KIn key, VIn value,
Collector<KOut, VOut> collector). Results are collected into a common
Collector<KOut, VOut> collector and the combine phase is initiated. A
Combiner, if specified, takes the KOut keys and immediately invokes the reduce
phase on them. The result of the mapping phase executed on each node is a
<KOut, VOut> map M. There will be one resulting map M per execution node N.
So the combiner has the same output as the Mapper - just that this time it is in the
form of a Map<KOut, VOut> instead of a Collector<KOut, VOut>?
Which is okay IMO, as at the end of the day the Combiner is just an extra polishing added
to the mapping...
At the end of combine phase instead of returning map M to the master
task node (as we currently do), we now hash each KOut in map M and group
KOut keys by the execution node N they are hashed to. Each group of KOut
keys and its VOut values, hashed to the same node, is wrapped with a new
command Migrate. Command Migrate, which is very similar to
PutKeyValueCommand, is executed on Infinispan target node N and essentially
maintains a KOut K -> List<VOut> mapping, i.e. all KOut/VOut pairs from all
executed MapCombineCommands will be collocated on the node N where KOut is
hashed to, and the value for KOut will be a list of all VOut values. We
essentially collect all VOut values under each KOut for all executed
MapCombineCommands.
This potentially involves each node sending some state to all other nodes, i.e.
(clusterSize-1)^2 RPCs. Not that this is necessarily a bad thing, as it is there to reduce
contention on a single node.
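To make the Migrate step concrete, here is a minimal sketch of how I read it. It is not the real command: the class and method names are made up, and ownerOf is just a stand-in for the consistent hash lookup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the Migrate step; none of these names exist in Infinispan.
class MigrateSketch {

    // Sender side: split one node's combined map M by the node each KOut hashes to.
    // ownerOf is an assumed stand-in for the consistent hash lookup.
    static <K, V, N> Map<N, Map<K, V>> groupByOwner(Map<K, V> m, Function<K, N> ownerOf) {
        Map<N, Map<K, V>> perNode = new HashMap<>();
        for (Map.Entry<K, V> e : m.entrySet()) {
            perNode.computeIfAbsent(ownerOf.apply(e.getKey()), n -> new HashMap<>())
                   .put(e.getKey(), e.getValue());
        }
        return perNode;
    }

    // Receiver side: append each incoming VOut to the list kept under its KOut,
    // so the target node ends up holding KOut -> List<VOut>.
    static <K, V> void applyMigrate(Map<K, List<V>> intermediate, Map<K, V> batch) {
        for (Map.Entry<K, V> e : batch.entrySet()) {
            intermediate.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                        .add(e.getValue());
        }
    }
}
```

Each sender would call groupByOwner once on its map M and ship one batch per target node, which is where the all-to-all traffic comes from.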
At this point MapCombineCommand has finished its execution; the list of
KOut keys is returned to the master node and its MapReduceTask. We do not
return VOut values as we do not need them at master task node.
MapReduceTask is ready to start with reduce phase.
If you plan to add this to a design document, I think it would be a good idea to
show an example of how the combiner would be used with the word count algorithm. You
explained this quite well here, but that would make sure we're all on the same page on
the role of combiner :-)
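Something along these lines is what I have in mind. It is only a rough sketch: the Mapper, Reducer and Collector shapes follow the signatures quoted in this thread, while the emit method name, the class name and the mapAndCombine helper are my assumptions, not existing API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical word count sketch; interfaces are simplified stand-ins.
class WordCountCombineSketch {

    interface Collector<K, V> { void emit(K key, V value); }

    interface Mapper<KIn, VIn, KOut, VOut> {
        void map(KIn key, VIn value, Collector<KOut, VOut> collector);
    }

    interface Reducer<KOut, VOut> {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    // Emits (word, 1) for every word in the value.
    static final Mapper<String, String, String, Integer> WORD_MAPPER =
        (key, value, collector) -> {
            for (String word : value.trim().split("\\s+")) {
                if (!word.isEmpty()) collector.emit(word, 1);
            }
        };

    // Sums the counts for one word; used here as the combiner.
    static final Reducer<String, Integer> SUM = (word, iter) -> {
        int sum = 0;
        while (iter.hasNext()) sum += iter.next();
        return sum;
    };

    // What a single node would do: map its local entries, then combine per KOut,
    // producing the one map M for that node.
    static Map<String, Integer> mapAndCombine(Map<String, String> localEntries) {
        Map<String, List<Integer>> collected = new HashMap<>();
        Collector<String, Integer> collector =
            (k, v) -> collected.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
        for (Map.Entry<String, String> e : localEntries.entrySet()) {
            WORD_MAPPER.map(e.getKey(), e.getValue(), collector);
        }
        Map<String, Integer> combined = new HashMap<>();
        for (Map.Entry<String, List<Integer>> e : collected.entrySet()) {
            combined.put(e.getKey(), SUM.reduce(e.getKey(), e.getValue().iterator()));
        }
        return combined;
    }
}
```

For a node holding "to be or not to be" and "be quick", the combiner leaves a single be -> 3 entry to ship instead of three separate (be, 1) pairs, which is the whole point of combining before anything crosses the network.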
Reduce phase
MapReduceTask initializes ReduceCommand with a user specified Reducer.
The KOut keys collected from the map phase are grouped by the execution
node N they are hashed to. For each node N and its grouped KOut
keys, MapReduceTask creates a ReduceCommand and sends it to the node N
where those KOut keys are hashed.
The originator (the node on which the M/R is run), at this
point, doesn't know the entire set of KOut values produced, so it can't determine
the list of nodes where to send the ReduceCommand - it would have to send it to all nodes,
right?
Once loaded on the target execution node, ReduceCommand grabs,
for each KOut key, the list of VOut values and invokes:
VOut reduce(KOut reducedKey, Iterator<VOut> iter).
A result of ReduceCommand is a map M where each key is KOut and value is
VOut. Each Infinispan execution node N returns one map M where each key
KOut is hashed to N and each VOut is KOut's reduced value.
When all ReduceCommands return to a calling node, MapReduceTask simply
combines all these M maps and returns final Map<KOut, VOut> as a result
of MapReduceTask. All intermediate KOut->List<VOut> maps left on
Infinispan cluster are then cleaned up.
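The reduce phase described above could be sketched roughly as follows. This is an illustration only: the Reducer interface mirrors the signature given above, while the class and helper names are hypothetical, and the merge assumes each KOut is reduced on exactly one node.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the distributed reduce phase; not actual Infinispan code.
class ReducePhaseSketch {

    interface Reducer<KOut, VOut> {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    // On node N: reduce every locally stored KOut -> List<VOut> entry into KOut -> VOut.
    static <K, V> Map<K, V> reduceLocal(Map<K, List<V>> intermediate, Reducer<K, V> reducer) {
        Map<K, V> result = new HashMap<>();
        for (Map.Entry<K, List<V>> e : intermediate.entrySet()) {
            result.put(e.getKey(), reducer.reduce(e.getKey(), e.getValue().iterator()));
        }
        return result;
    }

    // On the master task node: union of the per-node maps M. Assuming each KOut
    // is reduced on exactly one node, the key sets are disjoint and putAll suffices.
    static <K, V> Map<K, V> mergeResults(List<Map<K, V>> perNodeResults) {
        Map<K, V> merged = new HashMap<>();
        for (Map<K, V> m : perNodeResults) {
            merged.putAll(m);
        }
        return merged;
    }
}
```

Only the already reduced KOut/VOut pairs travel back to the master, so its memory use is bounded by the size of the final result rather than by the intermediate lists.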
[1] See section 4.3 of
http://research.google.com/archive/mapreduce.html