[infinispan-dev] Map Reduce 2.0

Vladimir Blagojevic vblagoje at redhat.com
Tue May 1 14:39:29 EDT 2012


Thanks everyone for the feedback. I will maintain the 
design/requirements document at 
https://community.jboss.org/wiki/Infinispan60-MapReduceEnhancements

If anything remains unclear, do tell me. Keep the feedback coming.

Regards,
Vladimir

P.S. I will also create another document related to enhancements to both 
distributed executors and map reduce - mainly fail-over, topology 
awareness, task interruption/cancellation, etc.


On 12-02-14 10:19 AM, Vladimir Blagojevic wrote:
> Hey guys,
>
> Before moving forward with next iteration of map reduce I wanted to hear
> your thoughts about the following proposal. After we agree on the
> general direction I will transcribe the agreed design on a wiki page and
> start implementation.
>
>
> Shortcoming of current map reduce implementation
>
> While our current map reduce implementation is more than a proof of
> concept, there are several drawbacks preventing it from being an
> industrial-grade map reduce solution. The main drawback is the inability
> of the current solution to deal with large-data (GB/TB range) map reduce
> problems. This shortcoming is mainly in our reduce phase execution. The
> reduce phase, as you might know, is currently executed on a single
> Infinispan master task node; the size of the map reduce problems we can
> support (data-wise) is therefore limited by the working memory of a
> single node.
>
>
> Proposed solution
>
> The proposed solution distributes execution of reduce phase tasks across
> the cluster, thus achieving higher reduce task parallelization and at the
> same time removing the above-mentioned reduce phase restriction. By
> leveraging our consistent hashing solution even further, we can
> parallelize the reduce phase and elevate our map reduce solution to an
> industrial level. Here is how we can achieve that.
>
> Map phase
>
> MapReduceTask, as it currently does, will hash task input keys and group
> them by the execution node N they are hashed to. For each node N and its
> grouped input KIn keys, MapReduceTask creates a MapCombineCommand which
> is migrated to the execution target node N. MapCombineCommand is similar
> to the current MapReduceCommand. MapCombineCommand takes an instance of a
> Mapper and an instance of a Reducer, which acts as a combiner [1].
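>
> For reference, the user-facing contracts involved are roughly the
> following sketch, based on the signatures quoted in this proposal (the
> exact package and the Serializable bound are assumptions on my part):
>
>    public interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
>       // called once for every KIn/VIn entry local to the execution node
>       void map(KIn key, VIn value, Collector<KOut, VOut> collector);
>    }
>
>    public interface Reducer<KOut, VOut> extends Serializable {
>       // called once per KOut key, over all VOut values collected for it
>       VOut reduce(KOut reducedKey, Iterator<VOut> iter);
>    }
>
>    public interface Collector<KOut, VOut> {
>       void emit(KOut key, VOut value);
>    }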
>
> Once loaded onto the target execution node, MapCombineCommand takes each
> local KIn key and executes the Mapper method void map(KIn key, VIn value,
> Collector<KOut, VOut> collector). Results are collected into a common
> Collector<KOut, VOut>, and the combine phase is initiated. The combiner,
> if specified, takes the KOut keys and immediately invokes the reduce
> phase on those keys. The result of the mapping phase executed on each
> node is a <KOut, VOut> map M. There will be one resulting map M per
> execution node N.
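>
> As a hypothetical word-count illustration of the above (WordCountMapper
> and WordCountReducer are made-up example names, not part of the proposal):
>
>    // emits (word, 1) for every word found in a cached value
>    public class WordCountMapper implements Mapper<String, String, String, Integer> {
>       @Override
>       public void map(String key, String value, Collector<String, Integer> c) {
>          for (String word : value.split("\\s+"))
>             c.emit(word, 1);
>       }
>    }
>
>    // sums counts; usable both as the combiner on each mapping node and as the final Reducer
>    public class WordCountReducer implements Reducer<String, Integer> {
>       @Override
>       public Integer reduce(String word, Iterator<Integer> counts) {
>          int sum = 0;
>          while (counts.hasNext())
>             sum += counts.next();
>          return sum;
>       }
>    }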
>
> At the end of the combine phase, instead of returning map M to the master
> task node (as we currently do), we hash each KOut in map M and group the
> KOut keys by the execution node N they are hashed to. Each group of KOut
> keys and its VOut values, hashed to the same node, is wrapped with a new
> command, Migrate. The Migrate command, which is very similar to
> PutKeyValueCommand, is executed on the Infinispan target node N and
> essentially maintains a KOut K -> List<VOut> mapping, i.e. all KOut/VOut
> pairs from all executed MapCombineCommands will be collocated on the node
> N where KOut is hashed to, and the value for KOut will be a list of all
> its VOut values. We essentially collect all VOut values under each KOut
> for all executed MapCombineCommands.
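>
> In pseudo-Java, the grouping done before sending Migrate commands could
> look like the sketch below, assumed to sit inside a method generic in
> KOut and VOut (Address, MigrateCommand, hashOwnerOf and send are
> illustrative placeholders, not the actual Infinispan API):
>
>    // group combined results by the node that each KOut key hashes to
>    Map<Address, Map<KOut, List<VOut>>> byOwner = new HashMap<Address, Map<KOut, List<VOut>>>();
>    for (Map.Entry<KOut, List<VOut>> entry : combinedResults.entrySet()) {
>       Address owner = hashOwnerOf(entry.getKey());   // node this KOut is hashed to
>       Map<KOut, List<VOut>> group = byOwner.get(owner);
>       if (group == null) {
>          group = new HashMap<KOut, List<VOut>>();
>          byOwner.put(owner, group);
>       }
>       group.put(entry.getKey(), entry.getValue());
>    }
>    // one Migrate command per target node, carrying only the pairs hashed to it
>    for (Map.Entry<Address, Map<KOut, List<VOut>>> target : byOwner.entrySet())
>       send(target.getKey(), new MigrateCommand(target.getValue()));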
>
>
> At this point MapCombineCommand has finished its execution; the list of
> KOut keys is returned to the master node and its MapReduceTask. We do not
> return VOut values, as we do not need them at the master task node.
> MapReduceTask is then ready to start the reduce phase.
>
>
> Reduce phase
>
>
> MapReduceTask initializes a ReduceCommand with the user-specified Reducer.
> The KOut keys collected from the map phase are grouped by the execution
> node N they are hashed to. For each node N and its grouped input KOut
> keys, MapReduceTask creates a ReduceCommand and sends it to the node N
> where those KOut keys are hashed. Once loaded on the target execution
> node, ReduceCommand grabs the list of VOut values for each KOut key and
> invokes: VOut reduce(KOut reducedKey, Iterator<VOut> iter).
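>
> A sketch of what a ReduceCommand might do on its target node (the
> intermediateCache, assignedKeys and reducer names are illustrative):
>
>    // each KOut -> List<VOut> entry was left behind by the Migrate commands
>    Map<KOut, VOut> result = new HashMap<KOut, VOut>();
>    for (KOut key : assignedKeys) {
>       List<VOut> values = intermediateCache.get(key);
>       result.put(key, reducer.reduce(key, values.iterator()));
>    }
>    return result;   // one map M per execution node, returned to the master task node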
>
> The result of a ReduceCommand is a map M where each key is a KOut and
> each value is its VOut. Each Infinispan execution node N returns one map
> M where each KOut key is hashed to N and each VOut is KOut's reduced value.
>
> When all ReduceCommands return to the calling node, MapReduceTask simply
> combines all these M maps and returns the final Map<KOut, VOut> as the
> result of the MapReduceTask. All intermediate KOut -> List<VOut> maps
> left on the Infinispan cluster are then cleaned up.
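>
> From the user's point of view, invoking the whole thing would still look
> much like today's API; combinedWith() is my assumption about how a
> combiner would be supplied under this proposal, and the cache name and
> classes below are the made-up word-count example from above:
>
>    Cache<String, String> cache = cacheManager.getCache("wordInput");
>    MapReduceTask<String, String, String, Integer> task =
>          new MapReduceTask<String, String, String, Integer>(cache);
>    Map<String, Integer> wordCounts = task
>          .mappedWith(new WordCountMapper())
>          .combinedWith(new WordCountReducer())   // assumed combiner hook
>          .reducedWith(new WordCountReducer())
>          .execute();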
>
>
> [1] See section 4.3 of http://research.google.com/archive/mapreduce.html


