[infinispan-dev] Map Reduce 2.0

Manik Surtani manik at jboss.org
Mon Feb 20 13:43:18 EST 2012


On 14 Feb 2012, at 15:19, Vladimir Blagojevic wrote:

> Hey guys,
> 
> Before moving forward with next iteration of map reduce I wanted to hear 
> your thoughts about the following proposal. After we agree on the 
> general direction I will transcribe the agreed design on a wiki page and 
> start implementation.
> 
> 
> Shortcoming of current map reduce implementation
> 
> While our current map reduce implementation is more than a proof of a 
> concept there are several drawbacks preventing it from being an 
> industrial grade map reduce solution. The main drawback is the inability 
> of the current solution to deal with a large data (in GB/TB) map reduce 
> problems. This shortcoming is mainly around our reduce phase execution. 
> Reduce phase, as you might know, is currently done on a single 
> Infinispan master task node; reduce phase of map reduce problems we can 
> support (data size wise) is therefore shrunk to a working memory of a 
> single node.

I was under the impression reduce is distributed too?  Don't we do the mapping on each node, then a first-pass reduce on each node too, before streaming results back to the caller node?

> 
> 
> Proposed solution
> 
> The proposed solution involves distributing execution of reduce phase 
> tasks across the cluster thus effectively achieving higher reduce task 
> parallelization and at the same time removing the above mentioned reduce 
> phase restriction. Effectively leveraging our consistent hashing 
> solution even further we can parallelize reduce phase and elevate our 
> map reduce solution to an industrial level. Here is how we can achieve that.
> 
> Map phase
> 
> MapReduceTask, as it currently does, will hash task input keys and group 
> them by execution node N they are hashed to. For each node N and its 
> grouped input KIn keys MapReduceTask creates a MapCombineCommand which 
> is migrated to an execution target node N. MapCombineCommand is similar 
> to current MapReduceCommand. MapCombineCommand takes an instance of a 
> Mapper and an instance of a Reducer, which is a combiner [1].
> 
> Once loaded into target execution node MapCombineCommand takes each 
> local KIn key and executes Mapper method  void map(KIn key, VIn value, 
> Collector<KOut, VOut> collector). Results are collected to a common 
> Collector<KOut, VOut> collector and combine phase is initiated. A 
> Combiner, if specified, takes KOut keys and imediatelly invokes reduce 
> phase on keys. The result of mapping phase executed on each node is 
> <KOut, VOut> map M. There will be one resulting M map per execution node N.
> 
> At the end of combine phase instead of returning map M to the master 
> task node (as we currently do), we now hash each KOut in map M and group 
> KOut keys by the execution node N they are hashed to. Each group of KOut 
> keys and its VOut values, hashed to the same node, is wrapped with a new 
> command Migrate. Command Migrate, which is very similar to 
> PutKeyValueCommand,executed on Infinispan target node N esentially 
> maintains KOut K -> List<VOut> mapping, i.e all KOut/VOut pairs from all 
> executed MapCombineCommands will be collocated on a node N where KOut is 
> hashed to and value for KOut will be a list of all VOut values. We 
> essentially collect all VOut values under each KOut for all executed 
> MapCombineCommands.
> 
> 
> At this point MapCombineCommand has finished its execution; list of KOut 
> keys is returned to a master node and its MapReduceTask. We do not 
> return VOut values as we do not need them at master task node. 
> MapReduceTask is ready to start with reduce phase.
> 
> 
> Reduce phase
> 
> 
> MapReduceTask initializes ReduceCommand with a user specified Reducer. 
> For each key KOut collected from a map phase we group them by execution 
> node N they are hashed to. For each node N and its grouped input KOut 
> keys MapReduceTask creates a ReduceCommand and sends it to a node N 
> where KOut keys are hashed. Once loaded on target execution node, 
> ReduceCommand for each KOut key grabs list of values VOut and invokes: 
> VOut reduce(KOut reducedKey, Iterator<VOut> iter).
> 
> A result of ReduceCommand is a map M where each key is KOut and value is 
> VOut. Each Infinispan execution node N returns one map M where each key 
> KOut is hashed to N and each VOut is KOut's reduced value.
> 
> When all ReduceCommands return to a calling node, MapReduceTask simply 
> combines all these M maps and returns final Map<KOut, VOut> as a result 
> of MapReduceTask. All intermediate KOut->List<VOut> maps left on 
> Infinispan cluster are then cleaned up.

This all makes sense as well, however one problem with the consistent hash approach is that it is prone to change when there is a topology change.  How would you deal with that?  Would you maintain a history of consistent hashes?

Cheers
Manik


--
Manik Surtani
manik at jboss.org
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org






More information about the infinispan-dev mailing list