Hi Radim,
Since Hadoop is the most popular implementation of MapReduce I will give
a brief overview of how it works and then I'll provide with an example
where the reducers must run over the whole list of values with the same key.
Hadoop MR overview.
MAP
1) Input file(s) are split into pieces of 64MB
2) For each split hadoop creates one map task and then assign the task
to a cluster node
3) The splits are read as key,value pairs and the map function of Mapper
is called. The mapper can output arbitrary number of intermediate
key,value pairs
4) the output from the mapper is stored in a buffer in memory. After a
certain threshold is reached the pairs are sorted by key and if there is
a combiner it is run on the pairs that have the same key. Then, the
output is flushed on the HDFS.
SHUFFLE
hadoop decides the Reducer that should process each key by running a
partitioner. The default partitioner decides with the following way:
reducer = intermidKey.hashCode() % numberOfReducer
Finally, the intermediate key,value pairs are sent to the reducers
REDUCE
1) Reducer sorts all key,value pairs by key and then groups the values
with the same key. As a result reducers receive their keys sorted.
2) for each Key,List<Value> the reduce function of the reducer is
called. Reducer can also emit arbitrary number of key,value pairs
Additionally, hadoop lets you customize almost every aspect of the code
run from how the input is split and read as key value pairs to how it is
partitioned and sorted.
A simple example is group by and computing an average over the grouped
values. Let the dataset be webpages (url,domain,sentiment) and we want
to compute the average sentiment for each domain in the dataset then the
mapper for each webpages wp. will run
map(wp.url,wp):
emit(wp.domain,wp.sentiment)
and in reducer:
reduce(domain,Iterable<Double> values):
counter = 0
sum = 0
while(values.hasNext())
counter++;
sum += values.next()
emit(domain,sum/counter)
I know that this approach is not optimized. But, I wanted give a simple
example.
Dan, only the the values for one intermediate key must be in memory? or
all the intermediate key,value pairs that are assigned to one reducer
must be in memory?
Cheers,
Evangelos
On 02/18/2014 11:59 AM, Dan Berindei wrote:
> Radim, this is how our M/R algorithm works (Hadoop may do it differently):
>
> * The mapping phase generates a Map<IntKey, Collection<IntValue>> on each
> node (Int meaning intermediate).
> * In the combine (local reduce) phase, a combine operation takes as input
> an IntKey and a Collection<IntValue> with only the values that were
> produced on that node.
> * In the (global) reduce phase, all the intermediate values for each key
> are merged, and a reduce operation takes an intermediate key and a sequence
> of *all* the intermediate values generated for that key. These reduce
> operations are completely independent, so each intermediate key can be
> mapped to a different node (distributed reduce), while still having access
> to all the intermediate values at once.
> * In the end, the collator takes the Map<IntKey, IntValue> from the reduce
> phase and produces a single value.
>
> If a combiner can be used, then I believe it can also be run in parallel
> with a LinkedBlockingQueue between the mapper and the combiner. But
> sometimes the reduce algorithm can only be run on the entire collection of
> values (e.g if you want to find the median, or a percentile).
>
> The limitation we have now is that in the reduce phase, the entire list of
> values for one intermediate key must be in memory at once. I think Hadoop
> only loads a block of intermediate values in memory at once, and can even
> sort the intermediate values (with a user-supplied comparison function) so
> that the reduce function can work on a sorted list without loading the
> values in memory itself.
>
> Cheers
> Dan