[infinispan-dev] Map Reduce 2.0
Vladimir Blagojevic
vblagoje at redhat.com
Tue Feb 14 10:19:09 EST 2012
Hey guys,
Before moving forward with the next iteration of map reduce I wanted to hear
your thoughts about the following proposal. After we agree on the
general direction I will transcribe the agreed design on a wiki page and
start implementation.
Shortcomings of the current map reduce implementation
While our current map reduce implementation is more than a proof of
concept, several drawbacks prevent it from being an industrial grade
map reduce solution. The main drawback is its inability to deal with
large (GB/TB scale) map reduce problems. This shortcoming stems mainly
from how we execute the reduce phase. The reduce phase, as you might
know, is currently done on a single Infinispan master task node; the
size of the map reduce problems we can support is therefore limited by
the working memory of a single node.
Proposed solution
The proposed solution distributes execution of reduce phase tasks
across the cluster, thus achieving higher reduce task parallelization
and at the same time removing the reduce phase restriction mentioned
above. By leveraging our consistent hashing solution even further we
can parallelize the reduce phase and elevate our map reduce solution to
an industrial level. Here is how we can achieve that.
Map phase
MapReduceTask, as it currently does, will hash task input keys and group
them by the execution node N they are hashed to. For each node N and its
group of input keys KIn, MapReduceTask creates a MapCombineCommand, which
is migrated to the target execution node N. MapCombineCommand is similar
to the current MapReduceCommand. MapCombineCommand takes an instance of a
Mapper and an instance of a Reducer, which acts as a combiner [1].
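To make the grouping step concrete, here is a minimal sketch in plain Java.
It is not Infinispan code: the class KeyGrouping and its methods are
hypothetical names, nodes are represented as plain strings, and a simple
modulo over hashCode() stands in for our real consistent hash.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class KeyGrouping {

    // Stand-in for the consistent hash: picks an owner by hashCode modulo
    // the node count. The real key-to-node mapping is an assumption here.
    static <K> String ownerOf(K key, List<String> nodes) {
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    // Groups keys by the node they hash to; one group per target node.
    static <K> Map<String, List<K>> groupByOwner(Iterable<K> keys, List<String> nodes) {
        Map<String, List<K>> groups = new LinkedHashMap<>();
        for (K key : keys) {
            groups.computeIfAbsent(ownerOf(key, nodes), n -> new ArrayList<>()).add(key);
        }
        return groups;
    }
}
```

Each entry of the returned map corresponds to one command to ship: the map
key is the target node and the list holds the keys that command works on.
The same grouping is reused later for KOut keys.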
Once loaded onto the target execution node, MapCombineCommand takes each
local KIn key and executes the Mapper method void map(KIn key, VIn value,
Collector<KOut, VOut> collector). Results are collected in a common
Collector<KOut, VOut> collector and the combine phase is initiated. A
Combiner, if specified, takes the KOut keys and immediately invokes the
reduce phase on them. The result of the mapping phase executed on each
node is a <KOut, VOut> map M. There will be one resulting map M per
execution node N.
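A rough sketch of what MapCombineCommand would do on a single node might
look like the following. The interfaces are declared locally to mirror the
method signatures quoted in this mail; the emit method on Collector, the
class MapCombineSketch and the method mapAndCombine are assumptions made
for the example. The sketch also assumes a combiner is always supplied.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-node map + combine step.
final class MapCombineSketch {

    interface Collector<K, V> { void emit(K key, V value); }

    interface Mapper<KIn, VIn, KOut, VOut> {
        void map(KIn key, VIn value, Collector<KOut, VOut> collector);
    }

    interface Reducer<KOut, VOut> {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    static <KIn, VIn, KOut, VOut> Map<KOut, VOut> mapAndCombine(
            Map<KIn, VIn> localEntries,
            Mapper<KIn, VIn, KOut, VOut> mapper,
            Reducer<KOut, VOut> combiner) {
        // Map phase: every emitted pair lands in one shared collector.
        Map<KOut, List<VOut>> collected = new LinkedHashMap<>();
        Collector<KOut, VOut> collector =
            (k, v) -> collected.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
        for (Map.Entry<KIn, VIn> e : localEntries.entrySet()) {
            mapper.map(e.getKey(), e.getValue(), collector);
        }
        // Combine phase: reduce the locally collected values per KOut key.
        Map<KOut, VOut> m = new LinkedHashMap<>();
        for (Map.Entry<KOut, List<VOut>> e : collected.entrySet()) {
            m.put(e.getKey(), combiner.reduce(e.getKey(), e.getValue().iterator()));
        }
        return m; // the per-node map M
    }
}
```

For a word count, the mapper would emit (word, 1) for every word in a
value and the combiner would sum the counts, so M holds one partial count
per word seen on that node.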
At the end of the combine phase, instead of returning map M to the master
task node (as we currently do), we now hash each KOut in map M and group
KOut keys by the execution node N they are hashed to. Each group of KOut
keys and their VOut values, hashed to the same node, is wrapped in a new
command, Migrate. The Migrate command, which is very similar to
PutKeyValueCommand, is executed on the Infinispan target node N and
essentially maintains a KOut -> List<VOut> mapping, i.e. all KOut/VOut
pairs from all executed MapCombineCommands will be collocated on the node
N that KOut is hashed to, and the value for KOut will be a list of all its
VOut values. We essentially collect all VOut values under each KOut
across all executed MapCombineCommands.
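The append semantics of Migrate on the receiving node could be sketched as
below. IntermediateStore and its methods are hypothetical names, and a
synchronized in-memory HashMap stands in for whatever storage the target
node would really use; the only point is that a migrated value is appended
to the list for its key rather than overwriting it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-node holder of the intermediate KOut -> List<VOut> data.
final class IntermediateStore<K, V> {

    private final Map<K, List<V>> valuesByKey = new HashMap<>();

    // Applies one Migrate: appends each incoming value to its key's list.
    // Synchronized because several MapCombineCommands may migrate at once.
    synchronized void migrate(Map<K, V> group) {
        for (Map.Entry<K, V> e : group.entrySet()) {
            valuesByKey.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
    }

    // Returns a copy of the values collected so far for the given key.
    synchronized List<V> valuesFor(K key) {
        List<V> values = valuesByKey.get(key);
        if (values == null) {
            return new ArrayList<V>();
        }
        return new ArrayList<V>(values);
    }

    // Drops all intermediate data once the task has finished.
    synchronized void clear() {
        valuesByKey.clear();
    }
}
```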
At this point MapCombineCommand has finished its execution; the list of
KOut keys is returned to the master node and its MapReduceTask. We do not
return VOut values as we do not need them at the master task node.
MapReduceTask is ready to start the reduce phase.
Reduce phase
MapReduceTask initializes ReduceCommand with a user specified Reducer.
The KOut keys collected from the map phase are grouped by the execution
node N they are hashed to. For each node N and its group of KOut keys,
MapReduceTask creates a ReduceCommand and sends it to the node N where
those KOut keys are hashed. Once loaded on the target execution node,
ReduceCommand grabs the list of VOut values for each KOut key and invokes:
VOut reduce(KOut reducedKey, Iterator<VOut> iter).
The result of a ReduceCommand is a map M where each key is a KOut and each
value is a VOut. Each Infinispan execution node N returns one map M where
each key KOut is hashed to N and each VOut is that KOut's reduced value.
When all ReduceCommands return to the calling node, MapReduceTask simply
combines all these M maps and returns the final Map<KOut, VOut> as the
result of MapReduceTask. All intermediate KOut -> List<VOut> maps left on
the Infinispan cluster are then cleaned up.
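The per-node work of ReduceCommand could be sketched roughly as follows.
ReduceSketch and reduceLocal are hypothetical names, the Reducer interface
is redeclared locally with the signature quoted above, and a plain Map
stands in for the intermediate KOut -> List<VOut> data held on the node.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-node reduce step.
final class ReduceSketch {

    interface Reducer<KOut, VOut> {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    static <KOut, VOut> Map<KOut, VOut> reduceLocal(
            Collection<KOut> assignedKeys,
            Map<KOut, List<VOut>> intermediate,
            Reducer<KOut, VOut> reducer) {
        Map<KOut, VOut> m = new LinkedHashMap<>();
        for (KOut key : assignedKeys) {
            List<VOut> values = intermediate.get(key);
            // Skip keys with no collected values on this node.
            if (values != null) {
                m.put(key, reducer.reduce(key, values.iterator()));
            }
        }
        return m; // the map M returned to the calling node
    }
}
```

Since every KOut is hashed to exactly one node, the maps returned by
different nodes have disjoint key sets, so the final merge on the calling
node never has to reconcile two values for the same key.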
[1] See section 4.3 of http://research.google.com/archive/mapreduce.html