[infinispan-dev] MapReduce limitations and suggestions.

Evangelos Vazaios vagvaz at gmail.com
Tue Feb 18 07:17:34 EST 2014


On 02/18/2014 01:40 PM, Dan Berindei wrote:
> On Tue, Feb 18, 2014 at 12:21 PM, Evangelos Vazaios <vagvaz at gmail.com>wrote:
> 
>> Hi Radim,
>>
>> Since Hadoop is the most popular implementation of MapReduce I will give
>> a brief overview of how it works and then I'll provide with an example
>> where the reducers must run over the whole list of values with the same
>> key.
>>
>> Hadoop MR overview.
>>
>> MAP
>>
>> 1) Input file(s) are split into pieces of 64MB
>> 2) For each split hadoop creates one map task and then assign the task
>> to a cluster node
>> 3) The splits are read as key,value pairs and the map function of Mapper
>> is called. The mapper can output arbitrary number of intermediate
>> key,value pairs
>> 4) the output from the mapper is stored in a buffer in memory. After a
>> certain threshold is reached the pairs are sorted by key and if there is
>> a combiner it is run on the pairs that have the same key. Then, the
>> output is flushed on the HDFS.
>>
> 
> Ok, so Hadoop runs the combiner more or less concurrently with the mappers.
> 
> I'm curious if there are any M/R tasks that benefit from the sorting the
> keys here, we just put the intermediate values in a Map<IK, Coll<IV>>. We
> could do about the same by passing this map (or rather each entry in the
> map) to the combiner when it reaches a certain threshold, but I'm not
> convinced about the need to sort it.
> 
Well there are algorithms that make use of it. Implementing a graph
algorithm can take use of it.Where the graph is split into k partitions
and each partition is assigned to one Mapper and Reducer. Mappers
compute the outgoing messages and output them to reducers. Then,
reducers can read the partition file sequentially to update the
vertices. This is just one use case that came to my mind.
> 
>> SHUFFLE
>>
>> hadoop decides the Reducer that should process each key by running a
>> partitioner. The default partitioner decides with the following way:
>> reducer = intermidKey.hashCode() % numberOfReducer
>> Finally, the intermediate key,value pairs are sent to the reducers
>>
> 
> Is this algorithm set in stone, in that some M/R tasks rely on it? In our
> impl, the user could use grouping to direct a set of intermediate keys to
> the same node for reducing, but otherwise the reducing node is more or less
> random.
> 
The default partitioner does exactly that check the actual code for
hadoop 1.2.1 here
http://goo.gl/he9yHO
> 
>> REDUCE
>>
>> 1) Reducer sorts all key,value pairs by key and then groups the values
>> with the same key. As a result reducers receive their keys sorted.
>>
> 
> I guess this sorting is only relevant if the reduce phase happens on a
> single thread, on a single node? If the reduce happens in parallel, the
> ordering is going to be lost anyway.
Each reduce task is run on a single thread, but you can run more than
one reduce tasks on a given node. The key ordering will not be lost. The
values are not ordered in any way. Moreover, the call to the reducer is
reduce(Key key, Iterable<Value> values) I cannot think of a way that the
order is lost.
> 
> 
>> 2) for each Key,List<Value> the reduce function of the reducer is
>> called. Reducer can also emit arbitrary number of key,value pairs
>>
> 
> We limit the reducer (and the combiner) to emit a single value, which is
> paired with the input key. We may need to lift this restriction, if only to
> make porting/adapting tasks easier.
> 
> 
>>
>> Additionally, hadoop lets you customize almost every aspect of the code
>> run from how the input is split and read as key value pairs to how it is
>> partitioned and sorted.
>>
> 
> Does that mean you can sort the values as well? I was thinking of each
> reduce() call as independent, and then only the order of values for one
> intermediate key would be relevant. I guess some tasks may require keeping
> state across all the reduce() calls and then the order of key matters, but
> then the reduce phase can't be parallelized, either across the cluster or
> on a single node.

I was not very clear here. You can set the partitioner for a specific
job. You may also set the key comparator, as a result change the way
that intermediate keys are sorted. Additionally, one can change how keys
are grouped into one reduce call by setting the GroupComparator class. A
simple example would be to have sales(date,amount) and you want to
create  totals for each month of the year.
so for the key: (year,month) and value: amount.
by overriding the keyClass hashCode function you can send all the
intermediate pairs with the same year to the same reducer

and then you can set the groupComparator to group together all the
values with the same year.

Cheers,
Evangelos


> 
>> A simple example is group by and computing an average over the grouped
>> values. Let the dataset be webpages (url,domain,sentiment) and we want
>> to compute the average sentiment for each domain in the dataset then the
>> mapper for each webpages wp. will run
>> map(wp.url,wp):
>>   emit(wp.domain,wp.sentiment)
>>
>> and in reducer:
>> reduce(domain,Iterable<Double> values):
>>   counter = 0
>>   sum = 0
>>   while(values.hasNext())
>>     counter++;
>>     sum += values.next()
>>   emit(domain,sum/counter)
>>
>> I know that this approach is not optimized. But, I wanted give a simple
>> example.
>>
> 
> I think it can also be optimized to use a combiner, if we emit a (domain,
> counter, sum) tuple :)

> 
> 
> 
>> Dan, only the the values for one intermediate key must be in memory? or
>> all the intermediate key,value pairs  that are assigned to one reducer
>> must be in memory?
>>
> 
> With the default configuration, all the key/value pairs assigned to one
> reducer must be in memory. But one can define the __tmpMapReduce cache in
> the configuration and configure eviction with a cache store (note that
> because of how our eviction works, the actual container size is at least
> concurrencyLevel rounded up to the next power of 2). The problem is that
> there is only one configuration for all the M/R tasks [1].
> 
> Note that because we only run the combiner after the mapping phase is
> complete, we do need to keep in memory all the results of the mapping phase
> from that node (those are not stored in a cache). I've created an issue in
> JIRA for this [2].
> 
> Cheers
> Dan
> 
> [1] https://issues.jboss.org/browse/ISPN-4021
> [2] https://issues.jboss.org/browse/ISPN-4022
> 
> 
> 
>> Cheers,
>> Evangelos
>>
>> On 02/18/2014 11:59 AM, Dan Berindei wrote:
>>> Radim, this is how our M/R algorithm works (Hadoop may do it
>> differently):
>>>
>>> * The mapping phase generates a Map<IntKey, Collection<IntValue>> on each
>>> node (Int meaning intermediate).
>>> * In the combine (local reduce) phase, a combine operation takes as input
>>> an IntKey and a Collection<IntValue> with only the values that were
>>> produced on that node.
>>> * In the (global) reduce phase, all the intermediate values for each key
>>> are merged, and a reduce operation takes an intermediate key and a
>> sequence
>>> of *all* the intermediate values generated for that key. These reduce
>>> operations are completely independent, so each intermediate key can be
>>> mapped to a different node (distributed reduce), while still having
>> access
>>> to all the intermediate values at once.
>>> * In the end, the collator takes the Map<IntKey, IntValue> from the
>> reduce
>>> phase and produces a single value.
>>>
>>> If a combiner can be used, then I believe it can also be run in parallel
>>> with a LinkedBlockingQueue between the mapper and the combiner. But
>>> sometimes the reduce algorithm can only be run on the entire collection
>> of
>>> values (e.g if you want to find the median, or a percentile).
>>>
>>> The limitation we have now is that in the reduce phase, the entire list
>> of
>>> values for one intermediate key must be in memory at once. I think Hadoop
>>> only loads a block of intermediate values in memory at once, and can even
>>> sort the intermediate values (with a user-supplied comparison function)
>> so
>>> that the reduce function can work on a sorted list without loading the
>>> values in memory itself.
>>>
>>> Cheers
>>> Dan
>>
> 
> 
> 
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> 



More information about the infinispan-dev mailing list