[infinispan-dev] Map Reduce 2.0

Vladimir Blagojevic vblagoje at redhat.com
Tue Feb 14 10:19:09 EST 2012


Hey guys,

Before moving forward with the next iteration of map reduce I wanted to
hear your thoughts on the following proposal. Once we agree on the
general direction I will write up the agreed design on a wiki page and
start the implementation.


Shortcomings of the current map reduce implementation

While our current map reduce implementation is more than a proof of
concept, several drawbacks prevent it from being an industrial grade
map reduce solution. The main one is its inability to handle map reduce
problems over large data sets (GB/TB range). This shortcoming comes
mainly from how we execute the reduce phase. The reduce phase, as you
might know, is currently done on a single node, the Infinispan master
task node; the size of the map reduce problems we can support is
therefore limited by the working memory of that single node.


Proposed solution

The proposed solution distributes execution of reduce phase tasks
across the cluster, which parallelizes the reduce phase and at the same
time removes the restriction described above. By leveraging our
consistent hashing even further we can elevate our map reduce solution
to an industrial level. Here is how we can achieve that.

Map phase

MapReduceTask, as it currently does, will hash task input keys and group
them by the execution node N they are hashed to. For each node N and its
group of input KIn keys, MapReduceTask creates a MapCombineCommand which
is migrated to the target execution node N. MapCombineCommand is similar
to the current MapReduceCommand. It takes an instance of a Mapper and an
instance of a Reducer that serves as a combiner [1].
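To make the grouping step concrete, here is a minimal sketch in plain
Java. It is not Infinispan code: KeyGroupingSketch, groupByOwner and
moduloLocator are hypothetical names, and the modulo locator is only a
stand-in for our consistent hash.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of Infinispan: groups keys by owning node.
final class KeyGroupingSketch {

    // Assumption: a trivial stand-in for the consistent hash. It maps a key
    // to a node name by hash code modulo the number of nodes.
    static <K> Function<K, String> moduloLocator(List<String> nodes) {
        return key -> nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    // Groups keys by the node the locator assigns them to. One command
    // would then be created per entry of the returned map.
    static <K> Map<String, List<K>> groupByOwner(Iterable<K> keys,
                                                 Function<K, String> locator) {
        Map<String, List<K>> groups = new LinkedHashMap<>();
        for (K key : keys) {
            groups.computeIfAbsent(locator.apply(key), n -> new ArrayList<>())
                  .add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("N1", "N2");
        Map<String, List<Integer>> groups = groupByOwner(
                Arrays.asList(0, 1, 2, 3),
                KeyGroupingSketch.<Integer>moduloLocator(nodes));
        System.out.println(groups); // prints {N1=[0, 2], N2=[1, 3]}
    }
}
```

The same grouping applies later to KOut keys, so a real implementation
would presumably share this logic between the map and reduce phases.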

Once loaded on the target execution node, MapCombineCommand takes each
local KIn key and executes the Mapper method void map(KIn key, VIn value,
Collector<KOut, VOut> collector). Results are collected into a common
Collector<KOut, VOut> collector and the combine phase is initiated. A
Combiner, if specified, takes the KOut keys and immediately invokes
reduce on them. The result of the map phase executed on each node is
a <KOut, VOut> map M. There will be one resulting map M per execution
node N.
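A rough sketch of what runs on one node during map and combine, using
word count as the example. The Collector, Mapper and Reducer interfaces
below are simplified local stand-ins that only mirror the method
signatures quoted in this mail; the emit method name and the
mapAndCombine helper are assumptions. The sketch also assumes a combiner
is always supplied.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the per-node map + combine step.
final class MapCombineSketch {

    // Simplified stand-ins for the real interfaces (assumption).
    interface Collector<K, V> { void emit(K key, V value); }
    interface Mapper<KIn, VIn, KOut, VOut> {
        void map(KIn key, VIn value, Collector<KOut, VOut> collector);
    }
    interface Reducer<KOut, VOut> {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    // Runs the mapper over all local entries, then combines the collected
    // values per KOut. The returned map is the node-local map M.
    static <KIn, VIn, KOut, VOut> Map<KOut, VOut> mapAndCombine(
            Map<KIn, VIn> localEntries,
            Mapper<KIn, VIn, KOut, VOut> mapper,
            Reducer<KOut, VOut> combiner) {
        Map<KOut, List<VOut>> collected = new LinkedHashMap<>();
        Collector<KOut, VOut> collector = (k, v) ->
                collected.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
        for (Map.Entry<KIn, VIn> e : localEntries.entrySet()) {
            mapper.map(e.getKey(), e.getValue(), collector);
        }
        Map<KOut, VOut> m = new LinkedHashMap<>();
        for (Map.Entry<KOut, List<VOut>> e : collected.entrySet()) {
            m.put(e.getKey(),
                  combiner.reduce(e.getKey(), e.getValue().iterator()));
        }
        return m;
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("d1", "a b a");
        docs.put("d2", "b");
        Mapper<String, String, String, Integer> wordMapper = (key, text, c) -> {
            for (String word : text.split(" ")) {
                c.emit(word, 1);
            }
        };
        Reducer<String, Integer> sum = (key, it) -> {
            int total = 0;
            while (it.hasNext()) {
                total += it.next();
            }
            return total;
        };
        System.out.println(mapAndCombine(docs, wordMapper, sum)); // prints {a=2, b=2}
    }
}
```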

At the end of the combine phase, instead of returning map M to the master
task node (as we currently do), we now hash each KOut in map M and group
KOut keys by the execution node N they are hashed to. Each group of KOut
keys and their VOut values, hashed to the same node, is wrapped in a new
command, Migrate. Migrate, which is very similar to PutKeyValueCommand,
is executed on the target Infinispan node N and essentially maintains a
KOut -> List<VOut> mapping, i.e. all KOut/VOut pairs from all executed
MapCombineCommands will be collocated on the node N that KOut is hashed
to, and the value for KOut will be a list of all its VOut values. In
other words, we collect all VOut values under each KOut across all
executed MapCombineCommands.
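The bookkeeping Migrate does on the receiving node could look roughly
like the sketch below. MigrateSketch and its methods are hypothetical
names, and a synchronized HashMap stands in for whatever storage the
real command would use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the state a target node keeps for Migrate.
final class MigrateSketch<K, V> {

    // KOut -> List<VOut>, accumulated across all MapCombineCommands.
    private final Map<K, List<V>> intermediate = new HashMap<>();

    // Applies one migrated group: each value is appended to the list
    // already stored under its key, creating the list on first use.
    synchronized void migrate(Map<K, V> group) {
        for (Map.Entry<K, V> e : group.entrySet()) {
            intermediate.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                        .add(e.getValue());
        }
    }

    // Returns a copy of the values collected so far for the given key.
    synchronized List<V> valuesFor(K key) {
        List<V> values = intermediate.get(key);
        return values == null
                ? Collections.<V>emptyList()
                : new ArrayList<>(values);
    }

    public static void main(String[] args) {
        MigrateSketch<String, Integer> node = new MigrateSketch<>();
        node.migrate(Collections.singletonMap("a", 2)); // from one mapper node
        node.migrate(Collections.singletonMap("a", 3)); // from another one
        System.out.println(node.valuesFor("a")); // prints [2, 3]
    }
}
```

Unlike a plain put, applying the same key twice appends instead of
overwriting, which is the main difference from PutKeyValueCommand
semantics.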


At this point MapCombineCommand has finished its execution; the list of
KOut keys is returned to the master node and its MapReduceTask. We do not
return VOut values as we do not need them at the master task node.
MapReduceTask is now ready to start the reduce phase.


Reduce phase


MapReduceTask initializes ReduceCommand with a user specified Reducer.
The KOut keys collected from the map phase are grouped by the execution
node N they are hashed to. For each node N and its group of KOut keys,
MapReduceTask creates a ReduceCommand and sends it to the node N that
those KOut keys are hashed to. Once loaded on the target execution node,
ReduceCommand grabs the list of VOut values for each KOut key and
invokes: VOut reduce(KOut reducedKey, Iterator<VOut> iter).

The result of a ReduceCommand is a map M where each key is KOut and each
value is VOut. Each Infinispan execution node N returns one map M in
which every key KOut is hashed to N and every VOut is that KOut's reduced
value.
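As an illustration of the node-local reduce step, here is a small sketch
under the same caveats as before: ReduceSketch and reduceLocal are
hypothetical names, the Reducer interface is a local stand-in mirroring
the signature above, and a plain Map plays the role of the intermediate
KOut -> List<VOut> storage.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of what a ReduceCommand does on its target node.
final class ReduceSketch {

    // Simplified stand-in for the real Reducer interface (assumption).
    interface Reducer<KOut, VOut> {
        VOut reduce(KOut reducedKey, Iterator<VOut> iter);
    }

    // For every key assigned to this node, looks up the collected values
    // and reduces them to one value. Keys with no local values are skipped.
    static <KOut, VOut> Map<KOut, VOut> reduceLocal(
            Collection<KOut> keys,
            Map<KOut, List<VOut>> intermediate,
            Reducer<KOut, VOut> reducer) {
        Map<KOut, VOut> result = new LinkedHashMap<>();
        for (KOut key : keys) {
            List<VOut> values = intermediate.get(key);
            if (values != null) {
                result.put(key, reducer.reduce(key, values.iterator()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> intermediate = new LinkedHashMap<>();
        intermediate.put("a", Arrays.asList(2, 3));
        intermediate.put("b", Arrays.asList(1));
        Reducer<String, Integer> sum = (key, it) -> {
            int total = 0;
            while (it.hasNext()) {
                total += it.next();
            }
            return total;
        };
        Map<String, Integer> m =
                reduceLocal(Arrays.asList("a", "b"), intermediate, sum);
        System.out.println(m); // prints {a=5, b=1}
    }
}
```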

When all ReduceCommands have returned to the calling node, MapReduceTask
simply combines all these M maps and returns the final Map<KOut, VOut> as
the result of MapReduceTask. All intermediate KOut -> List<VOut> maps left
on the Infinispan cluster are then cleaned up.
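The final combine step on the master node is small enough to sketch in
full. MergeSketch is a hypothetical name. Since each KOut is hashed to
exactly one node, the per-node maps should be disjoint; the check for
duplicates is an extra sanity check of this sketch, not something the
proposal requires. Cleanup of the intermediate maps is not shown.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of merging per-node reduce results on the master.
final class MergeSketch {

    // Merges the per-node result maps into the final result. Fails fast if
    // the same key shows up from two nodes, which should not happen when
    // every key is reduced on exactly one node.
    static <K, V> Map<K, V> merge(Iterable<Map<K, V>> perNodeResults) {
        Map<K, V> merged = new LinkedHashMap<>();
        for (Map<K, V> nodeResult : perNodeResults) {
            for (Map.Entry<K, V> e : nodeResult.entrySet()) {
                if (merged.containsKey(e.getKey())) {
                    throw new IllegalStateException(
                            "Key reduced on more than one node: " + e.getKey());
                }
                merged.put(e.getKey(), e.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Integer> fromN1 = Collections.singletonMap("a", 5);
        Map<String, Integer> fromN2 = Collections.singletonMap("b", 1);
        System.out.println(merge(Arrays.asList(fromN1, fromN2))); // prints {a=5, b=1}
    }
}
```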


[1] See section 4.3 of http://research.google.com/archive/mapreduce.html

