[infinispan-dev] Map Reduce 2.0

Manik Surtani manik at jboss.org
Thu May 31 10:06:24 EDT 2012


Some thoughts:

1. Failover policy.

* Does this rely on being run in a CDI environment?  (RandomNodeTaskFailoverPolicy has a DistributedExecutorService @Injected).  If so, then -1.  While it *should work* in a CDI environment, it shouldn't *require* CDI.

* Do you plan to include any canned failover policies?  If so, what are these?

* Shouldn't DistributedTaskFailoverPolicy be parameterised to return the same type as DistributedFuture.get() rather than an untyped Object? (Rough sketch below.)
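
To make that concrete, here's a rough sketch of the kind of parameterisation I have in mind - the method and context names are purely illustrative, not a proposed signature:

   // illustrative only: the generic parameter is the point, the names are not
   public interface DistributedTaskFailoverPolicy<T> {

      // invoked when the node executing the task fails; returns the recomputed
      // result, typed consistently with DistributedFuture<T>.get()
      T failover(FailoverContext<T> failoverContext) throws Exception;
   }

That way the compiler, rather than a cast, ties the failover result back to the task's future.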

2.  Task mapping policy

* So the existing policies of "execute everywhere" and "execute on data owner" (and possibly a new one, "execute on PRIMARY data owner") would all implement this policy interface? (Sketch below.)
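
For illustration only (names are placeholders, not a proposed API), I'd expect something along these lines, with the canned behaviours as implementations:

   // placeholder sketch of a single task mapping policy interface
   public interface DistributedTaskExecutionPolicy {
      // pick the nodes a task part should run on, given the task's input keys
      Collection<Address> selectExecutionNodes(Collection<Object> inputKeys,
                                               DistributionManager dm);
   }

   // canned implementations: ExecuteEverywherePolicy, ExecuteOnDataOwnerPolicy,
   // and possibly ExecuteOnPrimaryDataOwnerPolicy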

Cheers
Manik

On 1 May 2012, at 19:39, Vladimir Blagojevic wrote:

> Thanks everyone for the feedback. I will maintain the 
> design/requirements document at 
> https://community.jboss.org/wiki/Infinispan60-MapReduceEnhancements
> 
> If anything remains unclear, do tell me. Keep the feedback coming.
> 
> Regards,
> Vladimir
> 
> P.S. I will also create another document covering enhancements to 
> both distributed executors and map reduce - mainly fail-over, 
> topology awareness, task interruption/cancellation, etc.
> 
> 
> On 12-02-14 10:19 AM, Vladimir Blagojevic wrote:
>> Hey guys,
>> 
>> Before moving forward with the next iteration of map reduce I wanted
>> to hear your thoughts on the following proposal. Once we agree on the
>> general direction I will transcribe the agreed design to a wiki page
>> and start the implementation.
>> 
>> 
>> Shortcomings of the current map reduce implementation
>> 
>> While our current map reduce implementation is more than a proof of
>> concept, there are several drawbacks preventing it from being an
>> industrial grade map reduce solution. The main drawback is that the
>> current solution cannot handle large (GB/TB scale) map reduce
>> problems. This shortcoming lies mainly in how the reduce phase is
>> executed: the reduce phase, as you might know, is currently performed
>> on a single Infinispan master task node, so the size of map reduce
>> problems we can support is limited by the working memory of that
>> single node.
>> 
>> 
>> Proposed solution
>> 
>> The proposed solution distributes the execution of reduce phase tasks
>> across the cluster, achieving higher reduce task parallelization and
>> at the same time removing the reduce phase restriction mentioned
>> above. By leveraging our consistent hashing solution even further we
>> can parallelize the reduce phase and elevate our map reduce solution
>> to an industrial level. Here is how we can achieve that.
>> 
>> Map phase
>> 
>> MapReduceTask, as it currently does, will hash the task's input keys
>> and group them by the execution node N they hash to. For each node N
>> and its group of input KIn keys, MapReduceTask creates a
>> MapCombineCommand which is migrated to the target execution node N.
>> MapCombineCommand is similar to the current MapReduceCommand; it takes
>> an instance of a Mapper and an instance of a Reducer, which acts as a
>> combiner [1].
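>> 
>> As a rough sketch of that grouping step (variable and helper names are
>> illustrative; the point is only the use of the consistent hash):
>> 
>>    // group the task's input keys by the node that owns them; locate()
>>    // returns the owners from the consistent hash, the first is primary
>>    Map<Address, List<KIn>> keysByNode = new HashMap<Address, List<KIn>>();
>>    DistributionManager dm = cache.getAdvancedCache().getDistributionManager();
>>    for (KIn key : inputKeys) {
>>       Address owner = dm.locate(key).get(0);
>>       List<KIn> group = keysByNode.get(owner);
>>       if (group == null) {
>>          group = new ArrayList<KIn>();
>>          keysByNode.put(owner, group);
>>       }
>>       group.add(key);
>>    }
>>    // one MapCombineCommand is then created and shipped per keysByNode entry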
>> 
>> Once loaded onto the target execution node, MapCombineCommand takes
>> each local KIn key and executes the Mapper method void map(KIn key,
>> VIn value, Collector<KOut, VOut> collector). Results are collected
>> into a common Collector<KOut, VOut> and the combine phase is
>> initiated. A Combiner, if specified, takes the KOut keys and
>> immediately invokes the reduce phase on them. The result of the
>> mapping phase executed on each node is a <KOut, VOut> map M. There
>> will be one resulting map M per execution node N.
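>> 
>> A minimal sketch of that per-node map + combine step (the local
>> collector accessor below is made up for illustration; the Mapper and
>> Reducer signatures are the ones above):
>> 
>>    // run the Mapper over this node's local keys
>>    for (KIn key : localKeys) {
>>       mapper.map(key, cache.get(key), collector);  // collector gathers KOut -> [VOut...]
>>    }
>>    // combine: reduce each KOut's local values before anything leaves the node
>>    Map<KOut, VOut> combined = new HashMap<KOut, VOut>();
>>    for (Map.Entry<KOut, List<VOut>> e : collector.collectedValues().entrySet()) {
>>       combined.put(e.getKey(), combiner.reduce(e.getKey(), e.getValue().iterator()));
>>    }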
>> 
>> At the end of the combine phase, instead of returning map M to the
>> master task node (as we currently do), we hash each KOut in map M and
>> group the KOut keys by the execution node N they hash to. Each group
>> of KOut keys and its VOut values, hashed to the same node, is wrapped
>> in a new command, Migrate. The Migrate command, which is very similar
>> to PutKeyValueCommand, is executed on the Infinispan target node N and
>> essentially maintains a KOut K -> List<VOut> mapping, i.e. all
>> KOut/VOut pairs from all executed MapCombineCommands are collocated on
>> the node N where KOut hashes to, and the value for KOut is the list of
>> all VOut values. We essentially collect all VOut values under each
>> KOut across all executed MapCombineCommands.
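>> 
>> A sketch of the receiving side of Migrate (the intermediate structure
>> name is illustrative; the point is that values are appended, never
>> overwritten):
>> 
>>    // on the node where kOut hashes to: append this VOut to the list kept
>>    // under kOut, so all VOut values for kOut end up collocated here
>>    List<VOut> values = intermediateValues.get(kOut);
>>    if (values == null) {
>>       values = new ArrayList<VOut>();
>>       intermediateValues.put(kOut, values);
>>    }
>>    values.add(vOut);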
>> 
>> 
>> At this point MapCombineCommand has finished its execution; the list
>> of KOut keys is returned to the master node and its MapReduceTask. We
>> do not return VOut values, as we do not need them at the master task
>> node. MapReduceTask is then ready to start the reduce phase.
>> 
>> 
>> Reduce phase
>> 
>> 
>> MapReduceTask initializes a ReduceCommand with the user specified
>> Reducer. The KOut keys collected from the map phase are grouped by the
>> execution node N they hash to. For each node N and its group of input
>> KOut keys, MapReduceTask creates a ReduceCommand and sends it to the
>> node N where those KOut keys are hashed. Once loaded on the target
>> execution node, ReduceCommand grabs the list of VOut values for each
>> KOut key and invokes: VOut reduce(KOut reducedKey, Iterator<VOut> iter).
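>> 
>> Roughly (variable names illustrative), the per-node reduce loop would be:
>> 
>>    // on the node owning these KOut keys: reduce each key's collocated values
>>    Map<KOut, VOut> result = new HashMap<KOut, VOut>();
>>    for (KOut key : keysToReduce) {
>>       List<VOut> values = intermediateValues.get(key);  // collocated here by Migrate
>>       result.put(key, reducer.reduce(key, values.iterator()));
>>    }
>>    // result is the map M returned to the master task node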
>> 
>> The result of a ReduceCommand is a map M where each key is a KOut and
>> each value is a VOut. Each Infinispan execution node N returns one map
>> M containing the KOut keys that hash to N, each mapped to its reduced
>> VOut value.
>> 
>> When all ReduceCommands return to the calling node, MapReduceTask
>> simply combines all these M maps and returns the final Map<KOut, VOut>
>> as the result of MapReduceTask. All intermediate KOut -> List<VOut>
>> maps left on the Infinispan cluster are then cleaned up.
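>> 
>> The final merge at the master is then a simple union of the per-node
>> maps (sketch; no two nodes return the same KOut, since each KOut is
>> reduced only on the node it hashes to):
>> 
>>    Map<KOut, VOut> finalResult = new HashMap<KOut, VOut>();
>>    for (Map<KOut, VOut> m : perNodeResults) {
>>       finalResult.putAll(m);
>>    }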
>> 
>> 
>> [1] See section 4.3 of http://research.google.com/archive/mapreduce.html

--
Manik Surtani
manik at jboss.org
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org