[infinispan-dev] MapReduce limitations and suggestions.

Dan Berindei dan.berindei at gmail.com
Wed Feb 19 08:22:23 EST 2014


On Tue, Feb 18, 2014 at 5:33 PM, Evangelos Vazaios <vagvaz at gmail.com> wrote:

>
> On 02/18/2014 04:39 PM, Dan Berindei wrote:
> > On Tue, Feb 18, 2014 at 2:17 PM, Evangelos Vazaios <vagvaz at gmail.com>
> wrote:
> >
> >> On 02/18/2014 01:40 PM, Dan Berindei wrote:
> >>> On Tue, Feb 18, 2014 at 12:21 PM, Evangelos Vazaios <vagvaz at gmail.com
> >>> wrote:
> >>>
> >>>> Hi Radim,
> >>>>
> >>>> Since Hadoop is the most popular implementation of MapReduce I will
> give
> >>>> a brief overview of how it works and then I'll provide with an example
> >>>> where the reducers must run over the whole list of values with the
> same
> >>>> key.
> >>>>
> >>>> Hadoop MR overview.
> >>>>
> >>>> MAP
> >>>>
> >>>> 1) Input file(s) are split into pieces of 64MB
> >>>> 2) For each split hadoop creates one map task and then assign the task
> >>>> to a cluster node
> >>>> 3) The splits are read as key,value pairs and the map function of
> Mapper
> >>>> is called. The mapper can output arbitrary number of intermediate
> >>>> key,value pairs
>

I forgot to ask about this... we already have the entries stored as
key,value pairs, so we expect the data to be already in the cache. That
means there is no ordering in the inputs, and the mapper can't rely on
sequential inputs to be related. Would you consider that to be a reasonable
expectation?


> >>>> 4) the output from the mapper is stored in a buffer in memory. After a
> >>>> certain threshold is reached the pairs are sorted by key and if there
> is
> >>>> a combiner it is run on the pairs that have the same key. Then, the
> >>>> output is flushed on the HDFS.
> >>>>
> >>>
> >>> Ok, so Hadoop runs the combiner more or less concurrently with the
> >> mappers.
> >>>
> >>> I'm curious if there are any M/R tasks that benefit from the sorting
> the
> >>> keys here, we just put the intermediate values in a Map<IK, Coll<IV>>.
> We
> >>> could do about the same by passing this map (or rather each entry in
> the
> >>> map) to the combiner when it reaches a certain threshold, but I'm not
> >>> convinced about the need to sort it.
> >>>
> >> Well there are algorithms that make use of it. Implementing a graph
> >> algorithm can take use of it.Where the graph is split into k partitions
> >> and each partition is assigned to one Mapper and Reducer. Mappers
> >> compute the outgoing messages and output them to reducers. Then,
> >> reducers can read the partition file sequentially to update the
> >> vertices. This is just one use case that came to my mind.
> >>
> >
> > I thought the partitioning only happens during the shuffle phase, and
> > mappers/combiners don't know about partitions at all?
> > I understand that reducers may need the intermediary keys to be sorted,
> I'm
> > asking about the combiners, since even if the keys from one block are
> > sorted, the complete list of keys they receive is not sorted (unless a
> new
> > combiner is created for each input block).
> You are absolutely right partitioning happens during the shuffle phase
> and mappers/combiners do not know about partitions. Did I say something
> different?
> >
>

My initial question was whether there is a real need to sort the keys
before calling the combiner. So when you presented the example with the
graph being split in k partitions, I got a bit confused and I thought
combiners might know about partitions, too.


>  >>
> >>>
> >>>> SHUFFLE
> >>>>
> >>>> hadoop decides the Reducer that should process each key by running a
> >>>> partitioner. The default partitioner decides with the following way:
> >>>> reducer = intermidKey.hashCode() % numberOfReducer
> >>>> Finally, the intermediate key,value pairs are sent to the reducers
> >>>>
> >>>
> >>> Is this algorithm set in stone, in that some M/R tasks rely on it? In
> our
> >>> impl, the user could use grouping to direct a set of intermediate keys
> to
> >>> the same node for reducing, but otherwise the reducing node is more or
> >> less
> >>> random.
> >>>
> >> The default partitioner does exactly that check the actual code for
> >> hadoop 1.2.1 here
> >> http://goo.gl/he9yHO
> >>
> >
> > So API documentation doesn't specify it, but users still rely on this
> > particular behaviour?
> >
> > BTW, is there always one reducer one each node, or can there be multiple
> > reducers on each node? If it's the latter, it should be relatively easy
> to
> > model this in Infinispan using grouping. If it's the former, I'm not so
> > sure...
> >
> Actually, the configuration of the MapReduce job (MapReduce task in
> infinispan) defines the number of reducers and is programmatically
> configurable. The short answer to your answer is the latter multiple
> Reduce tasks are assigned to nodes almost equally.
>

Ok, partitioning sounds like something we could do in Infinispan.
Partitioning seems like a pretty big deal in Hadoop M/R descriptions, so
implementing it should be quite useful.

>
> >>>
> >>>> REDUCE
> >>>>
> >>>> 1) Reducer sorts all key,value pairs by key and then groups the values
> >>>> with the same key. As a result reducers receive their keys sorted.
> >>>>
> >>>
> >>> I guess this sorting is only relevant if the reduce phase happens on a
> >>> single thread, on a single node? If the reduce happens in parallel, the
> >>> ordering is going to be lost anyway.
> >> Each reduce task is run on a single thread, but you can run more than
> >> one reduce tasks on a given node. The key ordering will not be lost. The
> >> values are not ordered in any way. Moreover, the call to the reducer is
> >> reduce(Key key, Iterable<Value> values) I cannot think of a way that the
> >> order is lost.
> >>>
> >>
> >
> > Right, the call to the reducer is with a single key, but I'm assuming the
> > order of the calls matters (e.g. because the reduces keeps some internal
> > state across reduce() calls), otherwise there's no point in sorting the
> > keys. Calling the same reducer from multiple threads (like we do) would
> > definitely mess up the order of the calls.
> >
> > ATM we only have one reducer per node, which can be called from multiple
> > threads, but it shouldn't be too hard to allow multiple reducers per node
> > and to run each of them in a single thread.
> >
> I belive the sorting is done in order to group the values with same key
> since there are large data stored on files the easiest way to group is
> to sort and then group values with the same keys.
>

Yeah, I realized that my idea of keeping state between reduce() calls is
kind of tricky to use, because you'd have to insert a sentinel value in
each partition, and make sure that after the sorting the sentinel value
will come last, in order to flush the final results to the output. I see
Hadoop does offer some stuff to keep global state, like counters, so
perhaps it's not even necessary.


> >
> >>>
> >>>> 2) for each Key,List<Value> the reduce function of the reducer is
> >>>> called. Reducer can also emit arbitrary number of key,value pairs
> >>>>
> >>>
> >>> We limit the reducer (and the combiner) to emit a single value, which
> is
> >>> paired with the input key. We may need to lift this restriction, if
> only
> >> to
> >>> make porting/adapting tasks easier.
> >>>
> >>>
> >>>>
> >>>> Additionally, hadoop lets you customize almost every aspect of the
> code
> >>>> run from how the input is split and read as key value pairs to how it
> is
> >>>> partitioned and sorted.
> >>>>
> >>>
> >>> Does that mean you can sort the values as well? I was thinking of each
> >>> reduce() call as independent, and then only the order of values for one
> >>> intermediate key would be relevant. I guess some tasks may require
> >> keeping
> >>> state across all the reduce() calls and then the order of key matters,
> >> but
> >>> then the reduce phase can't be parallelized, either across the cluster
> or
> >>> on a single node.
> >>
> >> I was not very clear here. You can set the partitioner for a specific
> >> job. You may also set the key comparator, as a result change the way
> >> that intermediate keys are sorted. Additionally, one can change how keys
> >> are grouped into one reduce call by setting the GroupComparator class. A
> >> simple example would be to have sales(date,amount) and you want to
> >> create  totals for each month of the year.
> >> so for the key: (year,month) and value: amount.
> >> by overriding the keyClass hashCode function you can send all the
> >> intermediate pairs with the same year to the same reducer
> >>
> >> and then you can set the groupComparator to group together all the
> >> values with the same year.
> >>
> >
> > You mean set the groupComparator to group together all the values with
> the
> > same month? I don't think so, because the key is already (year, month).
> But
> > if you wanted to collect the totals for each year you could just use the
> > year as the intermediary key. So I don't quite understand how your
> example
> > is supposed to work.
> Well you can do that as well, but I meant to group all the months of the
> same year in one reduce call. The idea is that you want to receive in
> one reduce the values for one year and the values for that year to be
> sorted by month.
>

Ok, I didn't get it because I was looking at the problem from the other way
around: if I'd want the values to be sorted, I'd include the month in the
value and configure sorting for the values. But with Hadoop's streaming
model it's probably easier to always sort by the keys.


> >
> > Besides, each reduce() call receives just one key, if you have keys
> (2013,
> > 1) and (2013, 2) and the groupComparator decides they should map to the
> > same group, which key does the reducer see? I think a regular equals()
> > should be good enough for us here, since we already need equals() in
> order
> > to put the intermediary keys in the intermediary cache.
> >
>
> I am not be very good with examples you can check this
>
> https://www.inkling.com/read/hadoop-definitive-guide-tom-white-3rd/chapter-8/sorting
> .
> It is more or less the same problem with different setting.
>

Sorry, I didn't get too much from that example either, I gave up after the
second "registering is fun" popup :)

One last question: with Hadoop I imagine it's quite easy to leave the
results of the M/R job on the distributed FS and start a new job to M/R
from that. Do you think it would be important to offer something similar in
Infinispan (i.e. put the result of the reducers in a cache instead of
returning it to the user)?


> Cheers
> > Dan
> >
> >
> Cheers
> Evangelos
> >
> >>
> >> Cheers,
> >> Evangelos
> >>
> >>
> >>>
> >>>> A simple example is group by and computing an average over the grouped
> >>>> values. Let the dataset be webpages (url,domain,sentiment) and we want
> >>>> to compute the average sentiment for each domain in the dataset then
> the
> >>>> mapper for each webpages wp. will run
> >>>> map(wp.url,wp):
> >>>>   emit(wp.domain,wp.sentiment)
> >>>>
> >>>> and in reducer:
> >>>> reduce(domain,Iterable<Double> values):
> >>>>   counter = 0
> >>>>   sum = 0
> >>>>   while(values.hasNext())
> >>>>     counter++;
> >>>>     sum += values.next()
> >>>>   emit(domain,sum/counter)
> >>>>
> >>>> I know that this approach is not optimized. But, I wanted give a
> simple
> >>>> example.
> >>>>
> >>>
> >>> I think it can also be optimized to use a combiner, if we emit a
> (domain,
> >>> counter, sum) tuple :)
> >>
> >>>
> >>>
> >>>
> >>>> Dan, only the the values for one intermediate key must be in memory?
> or
> >>>> all the intermediate key,value pairs  that are assigned to one reducer
> >>>> must be in memory?
> >>>>
> >>>
> >>> With the default configuration, all the key/value pairs assigned to one
> >>> reducer must be in memory. But one can define the __tmpMapReduce cache
> in
> >>> the configuration and configure eviction with a cache store (note that
> >>> because of how our eviction works, the actual container size is at
> least
> >>> concurrencyLevel rounded up to the next power of 2). The problem is
> that
> >>> there is only one configuration for all the M/R tasks [1].
> >>>
> >>> Note that because we only run the combiner after the mapping phase is
> >>> complete, we do need to keep in memory all the results of the mapping
> >> phase
> >>> from that node (those are not stored in a cache). I've created an issue
> >> in
> >>> JIRA for this [2].
> >>>
> >>> Cheers
> >>> Dan
> >>>
> >>> [1] https://issues.jboss.org/browse/ISPN-4021
> >>> [2] https://issues.jboss.org/browse/ISPN-4022
> >>>
> >>>
> >>>
> >>>> Cheers,
> >>>> Evangelos
> >>>>
> >>>> On 02/18/2014 11:59 AM, Dan Berindei wrote:
> >>>>> Radim, this is how our M/R algorithm works (Hadoop may do it
> >>>> differently):
> >>>>>
> >>>>> * The mapping phase generates a Map<IntKey, Collection<IntValue>> on
> >> each
> >>>>> node (Int meaning intermediate).
> >>>>> * In the combine (local reduce) phase, a combine operation takes as
> >> input
> >>>>> an IntKey and a Collection<IntValue> with only the values that were
> >>>>> produced on that node.
> >>>>> * In the (global) reduce phase, all the intermediate values for each
> >> key
> >>>>> are merged, and a reduce operation takes an intermediate key and a
> >>>> sequence
> >>>>> of *all* the intermediate values generated for that key. These reduce
> >>>>> operations are completely independent, so each intermediate key can
> be
> >>>>> mapped to a different node (distributed reduce), while still having
> >>>> access
> >>>>> to all the intermediate values at once.
> >>>>> * In the end, the collator takes the Map<IntKey, IntValue> from the
> >>>> reduce
> >>>>> phase and produces a single value.
> >>>>>
> >>>>> If a combiner can be used, then I believe it can also be run in
> >> parallel
> >>>>> with a LinkedBlockingQueue between the mapper and the combiner. But
> >>>>> sometimes the reduce algorithm can only be run on the entire
> collection
> >>>> of
> >>>>> values (e.g if you want to find the median, or a percentile).
> >>>>>
> >>>>> The limitation we have now is that in the reduce phase, the entire
> list
> >>>> of
> >>>>> values for one intermediate key must be in memory at once. I think
> >> Hadoop
> >>>>> only loads a block of intermediate values in memory at once, and can
> >> even
> >>>>> sort the intermediate values (with a user-supplied comparison
> function)
> >>>> so
> >>>>> that the reduce function can work on a sorted list without loading
> the
> >>>>> values in memory itself.
> >>>>>
> >>>>> Cheers
> >>>>> Dan
> >>>>
> >>>
> >>>
> >>>
> >>> _______________________________________________
> >>> infinispan-dev mailing list
> >>> infinispan-dev at lists.jboss.org
> >>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >>>
> >>
> >> _______________________________________________
> >> infinispan-dev mailing list
> >> infinispan-dev at lists.jboss.org
> >> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >>
> >
> >
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/infinispan-dev/attachments/20140219/6690d65d/attachment-0001.html 


More information about the infinispan-dev mailing list