[infinispan-dev] MapReduce limitations and suggestions.

Radim Vansa rvansa at redhat.com
Tue Feb 18 08:36:01 EST 2014


Thanks a lot for these explanations, guys (Dan and Evangelos). I was
confused by the nomenclature in Hadoop/Infinispan vs. the wiki/what I
had learned in the past. I was thinking of M/R as

       node1       |       node2       |
-------------------|-------------------|
  K1,V1  |  K2,V2  |  K3,V3  |  K4,V4  |
    |        |     |    |        |     |
    v        v     |    v        v     |  MAP
   Foo      null   |   Bar      Goo    |
---------------------------------------|
      \            |     \       /     |  LOCAL
       Foo         |      BarGoo       |  REDUCE
        |          |         |         |
---------------------------------------|
         \                  /          |  GLOBAL
              FooBarGoo                |  REDUCE
---------------------------------------|

But now I understand that the model introduced here is somewhat different.
I was advocating parallel Map-Combine, but I understand that now you're
trying to solve the problem in the reduce phase.

Thanks again

Radim

On 02/18/2014 01:17 PM, Evangelos Vazaios wrote:
> On 02/18/2014 01:40 PM, Dan Berindei wrote:
>> On Tue, Feb 18, 2014 at 12:21 PM, Evangelos Vazaios <vagvaz at gmail.com> wrote:
>>
>>> Hi Radim,
>>>
>>> Since Hadoop is the most popular implementation of MapReduce, I will
>>> give a brief overview of how it works and then I'll provide an example
>>> where the reducers must run over the whole list of values with the
>>> same key.
>>>
>>> Hadoop MR overview.
>>>
>>> MAP
>>>
>>> 1) Input file(s) are split into pieces of 64MB
>>> 2) For each split, Hadoop creates one map task and then assigns the
>>> task to a cluster node
>>> 3) The splits are read as key,value pairs and the map function of the
>>> Mapper is called. The mapper can output an arbitrary number of
>>> intermediate key,value pairs (a rough mapper sketch follows these steps)
>>> 4) The output from the mapper is stored in a buffer in memory. After a
>>> certain threshold is reached, the pairs are sorted by key and, if there
>>> is a combiner, it is run on the pairs that have the same key. Then, the
>>> output is flushed to local disk.
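>>>
>>> To make steps 3-4 concrete, here is a rough mapper sketch using the
>>> org.apache.hadoop.mapreduce API (a plain word-count mapper, purely for
>>> illustration; the class name is made up and it is not the sentiment
>>> example further down):
>>>
>>> import java.io.IOException;
>>> import org.apache.hadoop.io.LongWritable;
>>> import org.apache.hadoop.io.Text;
>>> import org.apache.hadoop.mapreduce.Mapper;
>>>
>>> // Reads the split line by line and emits (word, 1) pairs; one input
>>> // pair can produce any number of intermediate pairs.
>>> public class WordCountMapper
>>>       extends Mapper<LongWritable, Text, Text, LongWritable> {
>>>    private static final LongWritable ONE = new LongWritable(1);
>>>    private final Text word = new Text();
>>>
>>>    @Override
>>>    protected void map(LongWritable offset, Text line, Context context)
>>>          throws IOException, InterruptedException {
>>>       for (String token : line.toString().split("\\s+")) {
>>>          if (token.isEmpty())
>>>             continue;
>>>          word.set(token);
>>>          context.write(word, ONE);   // buffered, sorted, maybe combined
>>>       }
>>>    }
>>> }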
>>>
>> Ok, so Hadoop runs the combiner more or less concurrently with the mappers.
>>
>> I'm curious if there are any M/R tasks that benefit from sorting the
>> keys here; we just put the intermediate values in a Map<IK, Coll<IV>>. We
>> could do about the same by passing this map (or rather each entry in the
>> map) to the combiner when it reaches a certain threshold, but I'm not
>> convinced about the need to sort it.
>>
> Well, there are algorithms that make use of it. A graph algorithm can
> take advantage of it: the graph is split into k partitions and each
> partition is assigned to one Mapper and Reducer. Mappers compute the
> outgoing messages and output them to reducers. Then, reducers can read
> the partition file sequentially to update the vertices. This is just one
> use case that came to mind.
>>> SHUFFLE
>>>
>>> Hadoop decides which Reducer should process each key by running a
>>> partitioner. The default partitioner decides in the following way:
>>> reducer = intermediateKey.hashCode() % numberOfReducers
>>> Finally, the intermediate key,value pairs are sent to the reducers.
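>>>
>>> Roughly, what the default partitioner does boils down to the following
>>> (the class name is illustrative; the mask against Integer.MAX_VALUE
>>> keeps the result non-negative for negative hash codes):
>>>
>>> import org.apache.hadoop.mapreduce.Partitioner;
>>>
>>> // Essentially the behaviour of the default hash partitioner.
>>> public class DefaultStylePartitioner<K, V> extends Partitioner<K, V> {
>>>    @Override
>>>    public int getPartition(K key, V value, int numReduceTasks) {
>>>       return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
>>>    }
>>> }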
>>>
>> Is this algorithm set in stone, in that some M/R tasks rely on it? In our
>> impl, the user could use grouping to direct a set of intermediate keys to
>> the same node for reducing, but otherwise the reducing node is more or less
>> random.
>>
> The default partitioner does exactly that; check the actual code for
> Hadoop 1.2.1 here:
> http://goo.gl/he9yHO
>>> REDUCE
>>>
>>> 1) The reducer sorts all key,value pairs by key and then groups the
>>> values with the same key. As a result, reducers receive their keys sorted.
>>>
>> I guess this sorting is only relevant if the reduce phase happens on a
>> single thread, on a single node? If the reduce happens in parallel, the
>> ordering is going to be lost anyway.
> Each reduce task is run on a single thread, but you can run more than
> one reduce task on a given node. The key ordering will not be lost. The
> values are not ordered in any way. Moreover, the call to the reducer is
> reduce(Key key, Iterable<Value> values); I cannot think of a way that the
> order is lost.
>>
>>> 2) For each Key,List<Value>, the reduce function of the reducer is
>>> called. The reducer can also emit an arbitrary number of key,value pairs.
>>>
>> We limit the reducer (and the combiner) to emit a single value, which is
>> paired with the input key. We may need to lift this restriction, if only to
>> make porting/adapting tasks easier.
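>>
>> For reference, the contract in question looks roughly like this (quoting
>> from memory, so check the org.infinispan.distexec.mapreduce sources for
>> the authoritative version); note the single VOut return value, in
>> contrast to Hadoop's Reducer, which can write any number of pairs to its
>> context:
>>
>> public interface Reducer<KOut, VOut> extends java.io.Serializable {
>>    // called once per intermediate key, returns exactly one value
>>    VOut reduce(KOut reducedKey, java.util.Iterator<VOut> iter);
>> }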
>>
>>
>>> Additionally, Hadoop lets you customize almost every aspect of the job,
>>> from how the input is split and read as key,value pairs to how it is
>>> partitioned and sorted.
>>>
>> Does that mean you can sort the values as well? I was thinking of each
>> reduce() call as independent, and then only the order of values for one
>> intermediate key would be relevant. I guess some tasks may require keeping
>> state across all the reduce() calls and then the order of keys matters, but
>> then the reduce phase can't be parallelized, either across the cluster or
>> on a single node.
> I was not very clear here. You can set the partitioner for a specific
> job. You may also set the key comparator and, as a result, change the way
> that intermediate keys are sorted. Additionally, one can change how keys
> are grouped into one reduce call by setting the GroupComparator class. A
> simple example would be sales(date,amount), where you want to create
> totals for each month of the year,
> so the key is (year,month) and the value is amount.
> By overriding the key class's hashCode function you can send all the
> intermediate pairs with the same year to the same reducer,
>
> and then you can set the groupComparator to group together all the
> values with the same year (see the sketch below).
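>
> A rough sketch of that (year,month) example (all class names are made up
> for illustration, and this is from memory rather than tested code):
>
> // The key carries (year, month); hashCode() uses only the year, so the
> // default hash partitioner sends every month of a year to the same reducer.
> public class YearMonthKey
>       implements org.apache.hadoop.io.WritableComparable<YearMonthKey> {
>    public int year;
>    public int month;
>
>    public void write(java.io.DataOutput out) throws java.io.IOException {
>       out.writeInt(year);
>       out.writeInt(month);
>    }
>
>    public void readFields(java.io.DataInput in) throws java.io.IOException {
>       year = in.readInt();
>       month = in.readInt();
>    }
>
>    // full sort order: by year, then by month
>    public int compareTo(YearMonthKey o) {
>       return year != o.year ? Integer.compare(year, o.year)
>                             : Integer.compare(month, o.month);
>    }
>
>    @Override
>    public int hashCode() {
>       return year;   // partition by year only
>    }
>
>    @Override
>    public boolean equals(Object o) {
>       return o instanceof YearMonthKey
>             && ((YearMonthKey) o).year == year
>             && ((YearMonthKey) o).month == month;
>    }
> }
>
> // Groups all keys with the same year into a single reduce() call,
> // regardless of month; wired in the driver with
> // job.setGroupingComparatorClass(YearGroupingComparator.class).
> public class YearGroupingComparator
>       extends org.apache.hadoop.io.WritableComparator {
>    public YearGroupingComparator() {
>       super(YearMonthKey.class, true);
>    }
>
>    @Override
>    public int compare(org.apache.hadoop.io.WritableComparable a,
>                       org.apache.hadoop.io.WritableComparable b) {
>       return Integer.compare(((YearMonthKey) a).year,
>                              ((YearMonthKey) b).year);
>    }
> }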
>
> Cheers,
> Evangelos
>
>
>>> A simple example is a group-by computing an average over the grouped
>>> values. Let the dataset be webpages (url,domain,sentiment), and say we
>>> want to compute the average sentiment for each domain in the dataset.
>>> Then the mapper will run, for each webpage wp:
>>> map(wp.url, wp):
>>>    emit(wp.domain, wp.sentiment)
>>>
>>> and in the reducer:
>>> reduce(domain,Iterable<Double> values):
>>>    counter = 0
>>>    sum = 0
>>>    while(values.hasNext())
>>>      counter++;
>>>      sum += values.next()
>>>    emit(domain,sum/counter)
>>>
>>> I know that this approach is not optimized, but I wanted to give a
>>> simple example.
>>>
>> I think it can also be optimized to use a combiner, if we emit a (domain,
>> counter, sum) tuple :)
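>>
>> Something along these lines (an illustrative sketch, not code from either
>> project): the mapper emits (domain, (count=1, sum=sentiment)), the
>> combiner and the reducer both add the pairs element-wise, which is
>> associative and therefore safe to run locally before the shuffle, and the
>> average sum/count is computed once at the very end:
>>
>> public class SumCount implements java.io.Serializable {
>>    public final long count;
>>    public final double sum;
>>
>>    public SumCount(long count, double sum) {
>>       this.count = count;
>>       this.sum = sum;
>>    }
>>
>>    // element-wise addition, used by both the combiner and the reducer
>>    public SumCount plus(SumCount other) {
>>       return new SumCount(count + other.count, sum + other.sum);
>>    }
>>
>>    // taken once, after the global reduce
>>    public double average() {
>>       return sum / count;
>>    }
>> }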
>>
>>
>>> Dan, do only the values for one intermediate key have to be in memory,
>>> or do all the intermediate key,value pairs that are assigned to one
>>> reducer have to be in memory?
>>>
>> With the default configuration, all the key/value pairs assigned to one
>> reducer must be in memory. But one can define the __tmpMapReduce cache in
>> the configuration and configure eviction with a cache store (note that
>> because of how our eviction works, the actual container size is at least
>> concurrencyLevel rounded up to the next power of 2). The problem is that
>> there is only one configuration for all the M/R tasks [1].
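>>
>> For illustration, defining that cache programmatically would look roughly
>> like the following (builder method names quoted from memory, so
>> double-check them against the version you are on; the eviction limit and
>> store location are arbitrary, and cacheManager is assumed to be the
>> running EmbeddedCacheManager):
>>
>> import org.infinispan.configuration.cache.ConfigurationBuilder;
>> import org.infinispan.eviction.EvictionStrategy;
>>
>> ConfigurationBuilder builder = new ConfigurationBuilder();
>> builder.eviction()
>>           .strategy(EvictionStrategy.LIRS)
>>           .maxEntries(1 << 20)               // cap on in-memory entries
>>        .persistence()
>>           .addSingleFileStore()
>>           .location("/tmp/mr-intermediate"); // evicted entries spill here
>> // A single definition is shared by all M/R tasks, hence ISPN-4021 below.
>> cacheManager.defineConfiguration("__tmpMapReduce", builder.build());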
>>
>> Note that because we only run the combiner after the mapping phase is
>> complete, we do need to keep in memory all the results of the mapping phase
>> from that node (those are not stored in a cache). I've created an issue in
>> JIRA for this [2].
>>
>> Cheers
>> Dan
>>
>> [1] https://issues.jboss.org/browse/ISPN-4021
>> [2] https://issues.jboss.org/browse/ISPN-4022
>>
>>
>>
>>> Cheers,
>>> Evangelos
>>>
>>> On 02/18/2014 11:59 AM, Dan Berindei wrote:
>>>> Radim, this is how our M/R algorithm works (Hadoop may do it
>>>> differently; a rough API sketch follows the list):
>>>> * The mapping phase generates a Map<IntKey, Collection<IntValue>> on each
>>>> node (Int meaning intermediate).
>>>> * In the combine (local reduce) phase, a combine operation takes as input
>>>> an IntKey and a Collection<IntValue> with only the values that were
>>>> produced on that node.
>>>> * In the (global) reduce phase, all the intermediate values for each key
>>>> are merged, and a reduce operation takes an intermediate key and a
>>>> sequence of *all* the intermediate values generated for that key. These
>>>> reduce operations are completely independent, so each intermediate key
>>>> can be mapped to a different node (distributed reduce), while still
>>>> having access to all the intermediate values at once.
>>>> * In the end, the collator takes the Map<IntKey, IntValue> from the
>>>> reduce phase and produces a single value.
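>>>>
>>>> Roughly how those phases map onto the API (method names quoted from
>>>> memory, so treat this as a sketch; the mapper, combiner, reducer and
>>>> collator arguments are placeholders supplied by the caller):
>>>>
>>>> import org.infinispan.Cache;
>>>> import org.infinispan.distexec.mapreduce.Collator;
>>>> import org.infinispan.distexec.mapreduce.MapReduceTask;
>>>> import org.infinispan.distexec.mapreduce.Mapper;
>>>> import org.infinispan.distexec.mapreduce.Reducer;
>>>>
>>>> public class MapReduceSketch {
>>>>    // mappedWith() = mapping phase, combinedWith() = local reduce,
>>>>    // reducedWith() = global reduce, Collator = final single value.
>>>>    public static <KIn, VIn, KOut, VOut, R> R run(
>>>>          Cache<KIn, VIn> cache,
>>>>          Mapper<KIn, VIn, KOut, VOut> mapper,
>>>>          Reducer<KOut, VOut> combiner,
>>>>          Reducer<KOut, VOut> reducer,
>>>>          Collator<KOut, VOut, R> collator) {
>>>>       return new MapReduceTask<KIn, VIn, KOut, VOut>(cache)
>>>>             .mappedWith(mapper)
>>>>             .combinedWith(combiner)
>>>>             .reducedWith(reducer)
>>>>             .execute(collator);
>>>>    }
>>>> }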
>>>>
>>>> If a combiner can be used, then I believe it can also be run in parallel
>>>> with a LinkedBlockingQueue between the mapper and the combiner. But
>>>> sometimes the reduce algorithm can only be run on the entire collection
>>>> of values (e.g. if you want to find the median, or a percentile).
>>>>
>>>> The limitation we have now is that in the reduce phase, the entire list
>>>> of values for one intermediate key must be in memory at once. I think
>>>> Hadoop only loads a block of intermediate values in memory at once, and
>>>> can even sort the intermediate values (with a user-supplied comparison
>>>> function) so that the reduce function can work on a sorted list without
>>>> loading the values in memory itself.
>>>>
>>>> Cheers
>>>> Dan
>>


-- 
Radim Vansa <rvansa at redhat.com>
JBoss DataGrid QA


