[infinispan-dev] Map Reduce 2.0

Mircea Markus mircea.markus at jboss.com
Tue Mar 20 07:38:05 EDT 2012


Hi Vladimir,

I'm a bit late, sorry. My comments below:

On 14 Feb 2012, at 15:19, Vladimir Blagojevic wrote:

> Hey guys,
> 
> Before moving forward with next iteration of map reduce I wanted to hear 
> your thoughts about the following proposal. After we agree on the 
> general direction I will transcribe the agreed design on a wiki page and 
> start implementation.
> 
> 
> Shortcoming of current map reduce implementation
> 
> While our current map reduce implementation is more than a proof of a 
> concept there are several drawbacks preventing it from being an 
> industrial grade map reduce solution. The main drawback is the inability 
> of the current solution to deal with a large data (in GB/TB) map reduce 
> problems. This shortcoming is mainly around our reduce phase execution. 
> Reduce phase, as you might know, is currently done on a single 
> Infinispan master task node; reduce phase of map reduce problems we can 
> support (data size wise) is therefore shrunk to a working memory of a 
> single node.
> 
> 
> Proposed solution
> 
> The proposed solution involves distributing execution of reduce phase 
> tasks across the cluster thus effectively achieving higher reduce task 
> parallelization and at the same time removing the above mentioned reduce 
> phase restriction.
Something like a hierarchy (tree) of reducers?
> Effectively leveraging our consistent hashing 
> solution even further we can parallelize reduce phase and elevate our 
> map reduce solution to an industrial level. Here is how we can achieve that.
> 
> Map phase
> 
> MapReduceTask, as it currently does, will hash task input keys and group 
> them by execution node N they are hashed to. For each node N and its 
> grouped input KIn keys MapReduceTask creates a MapCombineCommand which 
> is migrated to an execution target node N. MapCombineCommand is similar 
> to current MapReduceCommand. MapCombineCommand takes an instance of a 
> Mapper and an instance of a Reducer, which is a combiner [1].
Feels like our Reducer is more of an Combiner as described by [1], as it enforce the result of the reduce (KOut) to have the same type as the entries it consumes (KOut).
Don't you think that if we add the Combiner/MapCombineCommand we can loosen up this constraint and allow the Reducer to output something else than KOut?

> 
> Once loaded into target execution node MapCombineCommand takes each 
> local KIn key and executes Mapper method  void map(KIn key, VIn value, 
> Collector<KOut, VOut> collector). Results are collected to a common 
> Collector<KOut, VOut> collector and combine phase is initiated. A 
> Combiner, if specified, takes KOut keys and imediatelly invokes reduce 
> phase on keys. The result of mapping phase executed on each node is 
> <KOut, VOut> map M. There will be one resulting M map per execution node N.
So the combiner has the same output as the Mapper - just that this time it is in the form of an Map<KOut, VOut> instead of a Collector<Kout,Vout>?
Which is okay IMO, as at the end of the day the Combiner is just an extra polishing added to the mapping...
> 
> At the end of combine phase instead of returning map M to the master 
> task node (as we currently do), we now hash each KOut in map M and group 
> KOut keys by the execution node N they are hashed to. Each group of KOut 
> keys and its VOut values, hashed to the same node, is wrapped with a new 
> command Migrate. Command Migrate, which is very similar to 
> PutKeyValueCommand,executed on Infinispan target node N esentially 
> maintains KOut K -> List<VOut> mapping, i.e all KOut/VOut pairs from all 
> executed MapCombineCommands will be collocated on a node N where KOut is 
> hashed to and value for KOut will be a list of all VOut values. We 
> essentially collect all VOut values under each KOut for all executed 
> MapCombineCommands.

This potentially involves each node sending some state to all other nodes, i.e. (clusterSize-1)^2 RPCs. Not that this is necessarily a bad thing, as it is there to reduce contention on a single node.

> At this point MapCombineCommand has finished its execution; list of KOut 
> keys is returned to a master node and its MapReduceTask. We do not 
> return VOut values as we do not need them at master task node. 
> MapReduceTask is ready to start with reduce phase.
> 
If you plan to add this to a design document, I think it would be a good idea to show an example of how the combiner would be used with the word numbering algorithm. You explained this quite well here, but that would make sure we're all on the same page on the role of combiner :-) 

> Reduce phase
> 
> 
> MapReduceTask initializes ReduceCommand with a user specified Reducer. 
> For each key KOut collected from a map phase we group them by execution 
> node N they are hashed to. For each node N and its grouped input KOut 
> keys MapReduceTask creates a ReduceCommand and sends it to a node N 
> where KOut keys are hashed.
the originator (node on which the M/R is run), at this point, doesn't know the entire set of KOut values produces, so it can't determine the list of nodes where to send the ReduceCommand - it would have to send it to all nodes, right?
> Once loaded on target execution node, 
> ReduceCommand for each KOut key grabs list of values VOut and invokes: 
> VOut reduce(KOut reducedKey, Iterator<VOut> iter).
> 
> A result of ReduceCommand is a map M where each key is KOut and value is 
> VOut. Each Infinispan execution node N returns one map M where each key 
> KOut is hashed to N and each VOut is KOut's reduced value.
> 
> When all ReduceCommands return to a calling node, MapReduceTask simply 
> combines all these M maps and returns final Map<KOut, VOut> as a result 
> of MapReduceTask. All intermediate KOut->List<VOut> maps left on 
> Infinispan cluster are then cleaned up.
> 
> 
> [1] See section 4.3 of http://research.google.com/archive/mapreduce.html






More information about the infinispan-dev mailing list