[infinispan-dev] MapReduce limitations and suggestions.

Dan Berindei dan.berindei at gmail.com
Tue Feb 18 06:40:49 EST 2014

On Tue, Feb 18, 2014 at 12:21 PM, Evangelos Vazaios <vagvaz at gmail.com> wrote:

> Hi Radim,
> Since Hadoop is the most popular implementation of MapReduce, I will give
> a brief overview of how it works and then I'll provide an example
> where the reducers must run over the whole list of values with the same
> key.
> Hadoop MR overview.
> 1) Input file(s) are split into pieces of 64MB
> 2) For each split Hadoop creates one map task and then assigns the task
> to a cluster node
> 3) The splits are read as key,value pairs and the map function of Mapper
> is called. The mapper can output an arbitrary number of intermediate
> key,value pairs
> 4) The output from the mapper is stored in a buffer in memory. After a
> certain threshold is reached the pairs are sorted by key and if there is
> a combiner it is run on the pairs that have the same key. Then, the
> output is flushed to HDFS.

Ok, so Hadoop runs the combiner more or less concurrently with the mappers.

I'm curious whether any M/R tasks benefit from sorting the keys here. We
just put the intermediate values in a Map<IK, Coll<IV>>. We could do about
the same by passing this map (or rather each entry in the map) to the
combiner when it reaches a certain threshold, but I'm not convinced about
the need to sort it.
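
A minimal sketch of the threshold idea above, without any sorting. The class
name, the per-key threshold and the single-value combiner signature are all
assumptions made for illustration; this is not the Infinispan implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical collector: buffers mapper output per key and combines early. */
final class ThresholdCombiningCollector<K, V> {
    // Unsorted buffer, playing the role of Map<IK, Coll<IV>> from the text.
    private final Map<K, List<V>> buffer = new HashMap<>();
    // Assumption: the threshold counts buffered values for a single key.
    private final int threshold;
    // Assumption: the combiner folds a key's values into exactly one value.
    private final BiFunction<K, List<V>, V> combiner;

    ThresholdCombiningCollector(int threshold, BiFunction<K, List<V>, V> combiner) {
        if (threshold < 2) {
            throw new IllegalArgumentException("threshold must be at least 2");
        }
        this.threshold = threshold;
        this.combiner = combiner;
    }

    /** Called by the mapper for every intermediate key/value pair. */
    void emit(K key, V value) {
        List<V> values = buffer.computeIfAbsent(key, k -> new ArrayList<>());
        values.add(value);
        if (values.size() >= threshold) {
            // Replace the buffered values for this key with their combined value.
            V combined = combiner.apply(key, values);
            values.clear();
            values.add(combined);
        }
    }

    /** What is left for the reduce phase once mapping has finished. */
    Map<K, List<V>> intermediateValues() {
        return buffer;
    }
}
```

With a summing combiner and a threshold of 3, emitting 1, 2, 3 and 4 for the
same key leaves only two buffered values (6 and 4) instead of four. This only
works when the combiner is safe to apply repeatedly to partial results.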

> Hadoop decides which reducer should process each key by running a
> partitioner. The default partitioner decides in the following way:
> reducer = intermidKey.hashCode() % numberOfReducer
> Finally, the intermediate key,value pairs are sent to the reducers
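
The formula quoted above can be sketched as follows. The class and method
names are made up for illustration. One detail worth noting: in Java a plain
`%` returns a negative index for a negative hashCode(), so the second variant
masks the sign bit first (as far as I know, Hadoop's HashPartitioner does the
same).

```java
/** Hypothetical illustration of hash partitioning of intermediate keys. */
final class HashPartitionSketch {

    /** The formula exactly as quoted; may return a negative reducer index. */
    static int naivePartition(Object key, int numberOfReducers) {
        return key.hashCode() % numberOfReducers;
    }

    /** Clears the sign bit first, so the result is always in [0, numberOfReducers). */
    static int partition(Object key, int numberOfReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numberOfReducers;
    }
}
```

For example, Integer.valueOf(-7) has hashCode() -7, so the naive variant
yields -3 for 4 reducers while the masked variant yields 1.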

Is this algorithm set in stone, in that some M/R tasks rely on it? In our
impl, the user could use grouping to direct a set of intermediate keys to
the same node for reducing, but otherwise the reducing node is more or less

> 1) Reducer sorts all key,value pairs by key and then groups the values
> with the same key. As a result reducers receive their keys sorted.

I guess this sorting is only relevant if the reduce phase happens on a
single thread, on a single node? If the reduce happens in parallel, the
ordering is going to be lost anyway.

> 2) For each Key,List<Value> the reduce function of the reducer is
> called. The reducer can also emit an arbitrary number of key,value pairs

We limit the reducer (and the combiner) to emit a single value, which is
paired with the input key. We may need to lift this restriction, if only to
make porting/adapting tasks easier.

> Additionally, Hadoop lets you customize almost every aspect of the code
> that is run, from how the input is split and read as key,value pairs to
> how it is partitioned and sorted.

Does that mean you can sort the values as well? I was thinking of each
reduce() call as independent, and then only the order of values for one
intermediate key would be relevant. I guess some tasks may require keeping
state across all the reduce() calls and then the order of keys matters, but
then the reduce phase can't be parallelized, either across the cluster or
on a single node.

> A simple example is a group-by that computes an average over the grouped
> values. Let the dataset be webpages (url,domain,sentiment); we want to
> compute the average sentiment for each domain in the dataset. The mapper
> for each webpage wp will run
> map(wp.url,wp):
>   emit(wp.domain,wp.sentiment)
> and in reducer:
> reduce(domain,Iterator<Double> values):
>   counter = 0
>   sum = 0
>   while(values.hasNext())
>     counter++;
>     sum += values.next()
>   emit(domain,sum/counter)
> I know that this approach is not optimized, but I wanted to give a simple
> example.

I think it can also be optimized to use a combiner, if we emit a (domain,
counter, sum) tuple :)
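
Roughly what I mean, as a sketch. All the names here are hypothetical. The
point is that averages cannot be merged directly, but (counter, sum) pairs
can, so the same merge step can serve as both combiner and reducer.

```java
/** Hypothetical sketch: averaging with a combiner via (count, sum) partials. */
final class AverageWithCombiner {

    /** Partial aggregate for one domain: number of sentiments seen and their sum. */
    static final class CountAndSum {
        final long count;
        final double sum;

        CountAndSum(long count, double sum) {
            this.count = count;
            this.sum = sum;
        }

        CountAndSum plus(CountAndSum other) {
            return new CountAndSum(count + other.count, sum + other.sum);
        }

        double average() {
            return sum / count;
        }
    }

    /** Map side: each webpage contributes (1, sentiment) for its domain. */
    static CountAndSum map(double sentiment) {
        return new CountAndSum(1, sentiment);
    }

    /** Used as combiner (per node) and as reducer (globally): addition is associative. */
    static CountAndSum merge(Iterable<CountAndSum> partials) {
        CountAndSum total = new CountAndSum(0, 0.0);
        for (CountAndSum partial : partials) {
            total = total.plus(partial);
        }
        return total;
    }
}
```

The final average is computed only once, from the globally merged pair, e.g.
merge(...).average() in the reducer or in a collator.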

> Dan, must only the values for one intermediate key be in memory? Or
> must all the intermediate key,value pairs that are assigned to one
> reducer be in memory?

With the default configuration, all the key/value pairs assigned to one
reducer must be in memory. But one can define the __tmpMapReduce cache in
the configuration and configure eviction with a cache store (note that
because of how our eviction works, the actual container size is at least
concurrencyLevel rounded up to the next power of 2). The problem is that
there is only one configuration for all the M/R tasks [1].
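
To make the sizing note concrete, here is a small arithmetic sketch. It only
models the lower bound described above; the helper names are hypothetical,
and treating a concurrencyLevel that is already a power of 2 as unchanged is
my assumption. It does not call any Infinispan API.

```java
/** Hypothetical arithmetic for the minimum container size described above. */
final class EvictionSizing {

    /** Smallest power of 2 that is >= n (valid for n up to 2^30). */
    static int nextPowerOfTwo(int n) {
        if (n <= 1) {
            return 1;
        }
        return Integer.highestOneBit(n - 1) << 1;
    }

    /**
     * Assumption: the container never holds fewer entries than the rounded
     * concurrencyLevel, whatever maxEntries was requested.
     */
    static int minimumContainerSize(int maxEntries, int concurrencyLevel) {
        return Math.max(maxEntries, nextPowerOfTwo(concurrencyLevel));
    }
}
```

So with concurrencyLevel 33, asking for 10 entries would still leave room for
at least 64 under this model.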

Note that because we only run the combiner after the mapping phase is
complete, we do need to keep in memory all the results of the mapping phase
from that node (those are not stored in a cache). I've created an issue in
JIRA for this [2].


[1] https://issues.jboss.org/browse/ISPN-4021
[2] https://issues.jboss.org/browse/ISPN-4022

> Cheers,
> Evangelos
> On 02/18/2014 11:59 AM, Dan Berindei wrote:
> > Radim, this is how our M/R algorithm works (Hadoop may do it
> > differently):
> >
> > * The mapping phase generates a Map<IntKey, Collection<IntValue>> on each
> > node (Int meaning intermediate).
> > * In the combine (local reduce) phase, a combine operation takes as input
> > an IntKey and a Collection<IntValue> with only the values that were
> > produced on that node.
> > * In the (global) reduce phase, all the intermediate values for each key
> > are merged, and a reduce operation takes an intermediate key and a
> > sequence of *all* the intermediate values generated for that key. These
> > reduce operations are completely independent, so each intermediate key
> > can be mapped to a different node (distributed reduce), while still
> > having access to all the intermediate values at once.
> > * In the end, the collator takes the Map<IntKey, IntValue> from the
> > reduce phase and produces a single value.
> >
> > If a combiner can be used, then I believe it can also be run in parallel
> > with a LinkedBlockingQueue between the mapper and the combiner. But
> > sometimes the reduce algorithm can only be run on the entire collection
> > of values (e.g. if you want to find the median, or a percentile).
> >
> > The limitation we have now is that in the reduce phase, the entire list
> > of values for one intermediate key must be in memory at once. I think
> > Hadoop only loads a block of intermediate values in memory at once, and
> > can even sort the intermediate values (with a user-supplied comparison
> > function) so that the reduce function can work on a sorted list without
> > loading the values in memory itself.
> >
> > Cheers
> > Dan
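
The phases described in the quoted message can be sketched in a single JVM
as below, using the median case where no combiner is possible. Everything
here (class name, helper methods, the choice of collator) is hypothetical and
only simulates nodes as lists; it is not how Infinispan distributes the work.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical single-JVM walk through map, merge, reduce and collate. */
final class PhasesSketch {

    /** Convenience factory for a (domain, sentiment) input record. */
    static Map.Entry<String, Double> page(String domain, double sentiment) {
        return new AbstractMap.SimpleEntry<>(domain, sentiment);
    }

    /** Mapping phase on one node: Map<IntKey, Collection<IntValue>>. */
    static Map<String, List<Double>> mapPhase(List<Map.Entry<String, Double>> pages) {
        Map<String, List<Double>> out = new HashMap<>();
        for (Map.Entry<String, Double> p : pages) {
            out.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return out;
    }

    /** Before the global reduce: gather all values for each key from every node. */
    static Map<String, List<Double>> mergeNodes(List<Map<String, List<Double>>> perNode) {
        Map<String, List<Double>> merged = new TreeMap<>();
        for (Map<String, List<Double>> node : perNode) {
            for (Map.Entry<String, List<Double>> e : node.entrySet()) {
                merged.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
            }
        }
        return merged;
    }

    /** Reduce for one key: the median needs the entire list of values in memory. */
    static double median(List<Double> values) {
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int n = sorted.size();
        return n % 2 == 1
                ? sorted.get(n / 2)
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    /** Collator: turns the reduced map into a single value (key with the highest median). */
    static String collate(Map<String, Double> reduced) {
        String best = null;
        for (Map.Entry<String, Double> e : reduced.entrySet()) {
            if (best == null || e.getValue() > reduced.get(best)) {
                best = e.getKey();
            }
        }
        return best;
    }

    /** Runs all phases; each inner list stands for the data held by one node. */
    static String run(List<List<Map.Entry<String, Double>>> nodes) {
        List<Map<String, List<Double>>> perNode = new ArrayList<>();
        for (List<Map.Entry<String, Double>> node : nodes) {
            perNode.add(mapPhase(node));
        }
        Map<String, Double> reduced = new TreeMap<>();
        for (Map.Entry<String, List<Double>> e : mergeNodes(perNode).entrySet()) {
            reduced.put(e.getKey(), median(e.getValue()));
        }
        return collate(reduced);
    }
}
```

Taking a per-node median first and then the median of those would give a
different (wrong) answer in general, which is why this reduce has to see
every value for a key at once.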