[infinispan-dev] MapReduce limitations and suggestions.

Dan Berindei dan.berindei at gmail.com
Tue Feb 18 09:39:04 EST 2014


On Tue, Feb 18, 2014 at 2:17 PM, Evangelos Vazaios <vagvaz at gmail.com> wrote:

> On 02/18/2014 01:40 PM, Dan Berindei wrote:
> > On Tue, Feb 18, 2014 at 12:21 PM, Evangelos Vazaios <vagvaz at gmail.com
> >wrote:
> >
> >> Hi Radim,
> >>
> >> Since Hadoop is the most popular implementation of MapReduce I will give
> >> a brief overview of how it works and then I'll provide with an example
> >> where the reducers must run over the whole list of values with the same
> >> key.
> >>
> >> Hadoop MR overview.
> >>
> >> MAP
> >>
> >> 1) Input file(s) are split into pieces of 64MB
> >> 2) For each split hadoop creates one map task and then assign the task
> >> to a cluster node
> >> 3) The splits are read as key,value pairs and the map function of Mapper
> >> is called. The mapper can output arbitrary number of intermediate
> >> key,value pairs
> >> 4) the output from the mapper is stored in a buffer in memory. After a
> >> certain threshold is reached the pairs are sorted by key and if there is
> >> a combiner it is run on the pairs that have the same key. Then, the
> >> output is flushed on the HDFS.
> >>
> >
> > Ok, so Hadoop runs the combiner more or less concurrently with the
> mappers.
> >
> > I'm curious if there are any M/R tasks that benefit from the sorting the
> > keys here, we just put the intermediate values in a Map<IK, Coll<IV>>. We
> > could do about the same by passing this map (or rather each entry in the
> > map) to the combiner when it reaches a certain threshold, but I'm not
> > convinced about the need to sort it.
> >
> Well there are algorithms that make use of it. Implementing a graph
> algorithm can take use of it.Where the graph is split into k partitions
> and each partition is assigned to one Mapper and Reducer. Mappers
> compute the outgoing messages and output them to reducers. Then,
> reducers can read the partition file sequentially to update the
> vertices. This is just one use case that came to my mind.
>

I thought the partitioning only happens during the shuffle phase, and
mappers/combiners don't know about partitions at all?
I understand that reducers may need the intermediary keys to be sorted, I'm
asking about the combiners, since even if the keys from one block are
sorted, the complete list of keys they receive is not sorted (unless a new
combiner is created for each input block).

>
> >
> >> SHUFFLE
> >>
> >> hadoop decides the Reducer that should process each key by running a
> >> partitioner. The default partitioner decides with the following way:
> >> reducer = intermidKey.hashCode() % numberOfReducer
> >> Finally, the intermediate key,value pairs are sent to the reducers
> >>
> >
> > Is this algorithm set in stone, in that some M/R tasks rely on it? In our
> > impl, the user could use grouping to direct a set of intermediate keys to
> > the same node for reducing, but otherwise the reducing node is more or
> less
> > random.
> >
> The default partitioner does exactly that check the actual code for
> hadoop 1.2.1 here
> http://goo.gl/he9yHO
>

So API documentation doesn't specify it, but users still rely on this
particular behaviour?

BTW, is there always one reducer one each node, or can there be multiple
reducers on each node? If it's the latter, it should be relatively easy to
model this in Infinispan using grouping. If it's the former, I'm not so
sure...


> >
> >> REDUCE
> >>
> >> 1) Reducer sorts all key,value pairs by key and then groups the values
> >> with the same key. As a result reducers receive their keys sorted.
> >>
> >
> > I guess this sorting is only relevant if the reduce phase happens on a
> > single thread, on a single node? If the reduce happens in parallel, the
> > ordering is going to be lost anyway.
> Each reduce task is run on a single thread, but you can run more than
> one reduce tasks on a given node. The key ordering will not be lost. The
> values are not ordered in any way. Moreover, the call to the reducer is
> reduce(Key key, Iterable<Value> values) I cannot think of a way that the
> order is lost.
> >
>

Right, the call to the reducer is with a single key, but I'm assuming the
order of the calls matters (e.g. because the reduces keeps some internal
state across reduce() calls), otherwise there's no point in sorting the
keys. Calling the same reducer from multiple threads (like we do) would
definitely mess up the order of the calls.

ATM we only have one reducer per node, which can be called from multiple
threads, but it shouldn't be too hard to allow multiple reducers per node
and to run each of them in a single thread.


> >
> >> 2) for each Key,List<Value> the reduce function of the reducer is
> >> called. Reducer can also emit arbitrary number of key,value pairs
> >>
> >
> > We limit the reducer (and the combiner) to emit a single value, which is
> > paired with the input key. We may need to lift this restriction, if only
> to
> > make porting/adapting tasks easier.
> >
> >
> >>
> >> Additionally, hadoop lets you customize almost every aspect of the code
> >> run from how the input is split and read as key value pairs to how it is
> >> partitioned and sorted.
> >>
> >
> > Does that mean you can sort the values as well? I was thinking of each
> > reduce() call as independent, and then only the order of values for one
> > intermediate key would be relevant. I guess some tasks may require
> keeping
> > state across all the reduce() calls and then the order of key matters,
> but
> > then the reduce phase can't be parallelized, either across the cluster or
> > on a single node.
>
> I was not very clear here. You can set the partitioner for a specific
> job. You may also set the key comparator, as a result change the way
> that intermediate keys are sorted. Additionally, one can change how keys
> are grouped into one reduce call by setting the GroupComparator class. A
> simple example would be to have sales(date,amount) and you want to
> create  totals for each month of the year.
> so for the key: (year,month) and value: amount.
> by overriding the keyClass hashCode function you can send all the
> intermediate pairs with the same year to the same reducer
>
> and then you can set the groupComparator to group together all the
> values with the same year.
>

You mean set the groupComparator to group together all the values with the
same month? I don't think so, because the key is already (year, month). But
if you wanted to collect the totals for each year you could just use the
year as the intermediary key. So I don't quite understand how your example
is supposed to work.

Besides, each reduce() call receives just one key, if you have keys (2013,
1) and (2013, 2) and the groupComparator decides they should map to the
same group, which key does the reducer see? I think a regular equals()
should be good enough for us here, since we already need equals() in order
to put the intermediary keys in the intermediary cache.

Cheers
Dan



>
> Cheers,
> Evangelos
>
>
> >
> >> A simple example is group by and computing an average over the grouped
> >> values. Let the dataset be webpages (url,domain,sentiment) and we want
> >> to compute the average sentiment for each domain in the dataset then the
> >> mapper for each webpages wp. will run
> >> map(wp.url,wp):
> >>   emit(wp.domain,wp.sentiment)
> >>
> >> and in reducer:
> >> reduce(domain,Iterable<Double> values):
> >>   counter = 0
> >>   sum = 0
> >>   while(values.hasNext())
> >>     counter++;
> >>     sum += values.next()
> >>   emit(domain,sum/counter)
> >>
> >> I know that this approach is not optimized. But, I wanted give a simple
> >> example.
> >>
> >
> > I think it can also be optimized to use a combiner, if we emit a (domain,
> > counter, sum) tuple :)
>
> >
> >
> >
> >> Dan, only the the values for one intermediate key must be in memory? or
> >> all the intermediate key,value pairs  that are assigned to one reducer
> >> must be in memory?
> >>
> >
> > With the default configuration, all the key/value pairs assigned to one
> > reducer must be in memory. But one can define the __tmpMapReduce cache in
> > the configuration and configure eviction with a cache store (note that
> > because of how our eviction works, the actual container size is at least
> > concurrencyLevel rounded up to the next power of 2). The problem is that
> > there is only one configuration for all the M/R tasks [1].
> >
> > Note that because we only run the combiner after the mapping phase is
> > complete, we do need to keep in memory all the results of the mapping
> phase
> > from that node (those are not stored in a cache). I've created an issue
> in
> > JIRA for this [2].
> >
> > Cheers
> > Dan
> >
> > [1] https://issues.jboss.org/browse/ISPN-4021
> > [2] https://issues.jboss.org/browse/ISPN-4022
> >
> >
> >
> >> Cheers,
> >> Evangelos
> >>
> >> On 02/18/2014 11:59 AM, Dan Berindei wrote:
> >>> Radim, this is how our M/R algorithm works (Hadoop may do it
> >> differently):
> >>>
> >>> * The mapping phase generates a Map<IntKey, Collection<IntValue>> on
> each
> >>> node (Int meaning intermediate).
> >>> * In the combine (local reduce) phase, a combine operation takes as
> input
> >>> an IntKey and a Collection<IntValue> with only the values that were
> >>> produced on that node.
> >>> * In the (global) reduce phase, all the intermediate values for each
> key
> >>> are merged, and a reduce operation takes an intermediate key and a
> >> sequence
> >>> of *all* the intermediate values generated for that key. These reduce
> >>> operations are completely independent, so each intermediate key can be
> >>> mapped to a different node (distributed reduce), while still having
> >> access
> >>> to all the intermediate values at once.
> >>> * In the end, the collator takes the Map<IntKey, IntValue> from the
> >> reduce
> >>> phase and produces a single value.
> >>>
> >>> If a combiner can be used, then I believe it can also be run in
> parallel
> >>> with a LinkedBlockingQueue between the mapper and the combiner. But
> >>> sometimes the reduce algorithm can only be run on the entire collection
> >> of
> >>> values (e.g if you want to find the median, or a percentile).
> >>>
> >>> The limitation we have now is that in the reduce phase, the entire list
> >> of
> >>> values for one intermediate key must be in memory at once. I think
> Hadoop
> >>> only loads a block of intermediate values in memory at once, and can
> even
> >>> sort the intermediate values (with a user-supplied comparison function)
> >> so
> >>> that the reduce function can work on a sorted list without loading the
> >>> values in memory itself.
> >>>
> >>> Cheers
> >>> Dan
> >>
> >
> >
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/infinispan-dev/attachments/20140218/a1c60195/attachment-0001.html 


More information about the infinispan-dev mailing list