[infinispan-dev] MapReduce limitations and suggestions.

Evangelos Vazaios vagvaz at gmail.com
Tue Feb 18 05:21:30 EST 2014


Hi Radim,

Since Hadoop is the most popular implementation of MapReduce I will give
a brief overview of how it works and then I'll provide with an example
where the reducers must run over the whole list of values with the same key.

Hadoop MR overview.

MAP

1) Input file(s) are split into pieces of 64MB
2) For each split hadoop creates one map task and then assign the task
to a cluster node
3) The splits are read as key,value pairs and the map function of Mapper
is called. The mapper can output arbitrary number of intermediate
key,value pairs
4) the output from the mapper is stored in a buffer in memory. After a
certain threshold is reached the pairs are sorted by key and if there is
a combiner it is run on the pairs that have the same key. Then, the
output is flushed on the HDFS.

SHUFFLE

hadoop decides the Reducer that should process each key by running a
partitioner. The default partitioner decides with the following way:
reducer = intermidKey.hashCode() % numberOfReducer
Finally, the intermediate key,value pairs are sent to the reducers

REDUCE

1) Reducer sorts all key,value pairs by key and then groups the values
with the same key. As a result reducers receive their keys sorted.
2) for each Key,List<Value> the reduce function of the reducer is
called. Reducer can also emit arbitrary number of key,value pairs

Additionally, hadoop lets you customize almost every aspect of the code
run from how the input is split and read as key value pairs to how it is
partitioned and sorted.

A simple example is group by and computing an average over the grouped
values. Let the dataset be webpages (url,domain,sentiment) and we want
to compute the average sentiment for each domain in the dataset then the
mapper for each webpages wp. will run
map(wp.url,wp):
  emit(wp.domain,wp.sentiment)

and in reducer:
reduce(domain,Iterable<Double> values):
  counter = 0
  sum = 0
  while(values.hasNext())
    counter++;	
    sum += values.next()
  emit(domain,sum/counter)

I know that this approach is not optimized. But, I wanted give a simple
example.
Dan, only the the values for one intermediate key must be in memory? or
all the intermediate key,value pairs  that are assigned to one reducer
must be in memory?

Cheers,
Evangelos

On 02/18/2014 11:59 AM, Dan Berindei wrote:
> Radim, this is how our M/R algorithm works (Hadoop may do it differently):
> 
> * The mapping phase generates a Map<IntKey, Collection<IntValue>> on each
> node (Int meaning intermediate).
> * In the combine (local reduce) phase, a combine operation takes as input
> an IntKey and a Collection<IntValue> with only the values that were
> produced on that node.
> * In the (global) reduce phase, all the intermediate values for each key
> are merged, and a reduce operation takes an intermediate key and a sequence
> of *all* the intermediate values generated for that key. These reduce
> operations are completely independent, so each intermediate key can be
> mapped to a different node (distributed reduce), while still having access
> to all the intermediate values at once.
> * In the end, the collator takes the Map<IntKey, IntValue> from the reduce
> phase and produces a single value.
> 
> If a combiner can be used, then I believe it can also be run in parallel
> with a LinkedBlockingQueue between the mapper and the combiner. But
> sometimes the reduce algorithm can only be run on the entire collection of
> values (e.g if you want to find the median, or a percentile).
> 
> The limitation we have now is that in the reduce phase, the entire list of
> values for one intermediate key must be in memory at once. I think Hadoop
> only loads a block of intermediate values in memory at once, and can even
> sort the intermediate values (with a user-supplied comparison function) so
> that the reduce function can work on a sorted list without loading the
> values in memory itself.
> 
> Cheers
> Dan
> 
> 
> 
> On Tue, Feb 18, 2014 at 10:59 AM, Radim Vansa <rvansa at redhat.com> wrote:
> 
>> Hi Etienne,
>>
>> how does the requirement for all data provided to Reducer as a whole
>> work for distributed caches? There you'd get only a subset of the whole
>> mapped set on each node (afaik each node maps the nodes locally and
>> performs a reduction before executing the "global" reduction). Or are
>> these M/R jobs applicable only to local caches?
>> I have to admit I have only a limited knowledge of M/R, could you give
>> me an example where the algorithm works in distributed environment and
>> still cannot be parallelized?
>>
>> Thanks
>>
>> Radim
>>
>> On 02/17/2014 09:18 AM, Etienne Riviere wrote:
>>> Hi Radim,
>>>
>>> I might misunderstand your suggestion but many M/R jobs actually require
>> to run the two phases one after the other, and henceforth to store the
>> intermediate results somewhere. While some may slightly reduce intermediate
>> memory usage by using a combiner function (e.g., the word-count example), I
>> don't see how we can avoid intermediate storage altogether.
>>>
>>> Thanks,
>>> Etienne (leads project -- as Evangelos who initiated the thread)
>>>
>>> On 17 Feb 2014, at 08:48, Radim Vansa <rvansa at redhat.com> wrote:
>>>
>>>> I think that the intermediate cache is not required at all. The M/R
>>>> algorithm itself can (and should!) run with memory occupied by the
>>>> result of reduction. The current implementation with Map first and
>>>> Reduce after that will always have these problems, using a cache for
>>>> temporary caching the result is only a workaround.
>>>>
>>>> The only situation when temporary cache could be useful is when the
>>>> result grows linearly (or close to that or even more) with the amount of
>>>> reduced entries. This would be the case for groupBy producing Map<Color,
>>>> List<Entry>> from all entries in cache. Then the task does not scale and
>>>> should be redesigned anyway, but flushing the results into cache backed
>>>> by cache store could help.
>>>>
>>>> Radim
>>>>
>>>> On 02/14/2014 04:54 PM, Vladimir Blagojevic wrote:
>>>>> Tristan,
>>>>>
>>>>> Actually they are not addressed in this pull request but the feature
>>>>> where custom output cache is used instead of results being returned is
>>>>> next in the implementation pipeline.
>>>>>
>>>>> Evangelos, indeed, depending on a reducer function all intermediate
>>>>> KOut/VOut pairs might be moved to a single node. How would custom cache
>>>>> help in this case?
>>>>>
>>>>> Regards,
>>>>> Vladimir
>>>>>
>>>>>
>>>>> On 2/14/2014, 10:16 AM, Tristan Tarrant wrote:
>>>>>> Hi Evangelos,
>>>>>>
>>>>>> you might be interested in looking into a current pull request which
>>>>>> addresses some (all?) of these issues
>>>>>>
>>>>>> https://github.com/infinispan/infinispan/pull/2300
>>>>>>
>>>>>> Tristan
>>>>>>
>>>>>> On 14/02/2014 16:10, Evangelos Vazaios wrote:
>>>>>>> Hello everyone,
>>>>>>>
>>>>>>> I started using the MapReduce implementation of Infinispan and I came
>>>>>>> across some possible limitations. Thus,  I want to make some
>> suggestions
>>>>>>> about the MapReduce (MR) implementation of Infinispan.
>>>>>>> Depending on the algorithm,  there might be some memory problems,
>>>>>>> especially for intermediate results.
>>>>>>> An example of such a case is  group by. Suppose that we have a
>> cluster
>>>>>>> of 2 nodes with 2 GB  available. Let a distributed cache, where
>> simple
>>>>>>> car objects (id,brand,colour) are stored and the total size of data
>> is
>>>>>>> 3.5GB. If all objects have the same colour , then all 3.5 GB would
>> go to
>>>>>>> only one reducer, as a result an OutOfMemoryException will be thrown.
>>>>>>>
>>>>>>> To overcome these limitations, I propose to add as parameter the
>> name of
>>>>>>> the intermediate cache to be used. This will enable the creation of a
>>>>>>> custom configured cache that deals with the memory limitations.
>>>>>>>
>>>>>>> Another feature that I would like to have is to set the name of the
>>>>>>> output cache. The reasoning behind this is similar to the one
>> mentioned
>>>>>>> above.
>>>>>>>
>>>>>>> I wait for your thoughts on these two suggestions.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Evangelos
>>>>>>> _______________________________________________
>>>>>>> infinispan-dev mailing list
>>>>>>> infinispan-dev at lists.jboss.org
>>>>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> _______________________________________________
>>>>>> infinispan-dev mailing list
>>>>>> infinispan-dev at lists.jboss.org
>>>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>> _______________________________________________
>>>>> infinispan-dev mailing list
>>>>> infinispan-dev at lists.jboss.org
>>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>
>>>> --
>>>> Radim Vansa <rvansa at redhat.com>
>>>> JBoss DataGrid QA
>>>>
>>>> _______________________________________________
>>>> infinispan-dev mailing list
>>>> infinispan-dev at lists.jboss.org
>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>
>>> _______________________________________________
>>> infinispan-dev mailing list
>>> infinispan-dev at lists.jboss.org
>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>
>>
>> --
>> Radim Vansa <rvansa at redhat.com>
>> JBoss DataGrid QA
>>
>> _______________________________________________
>> infinispan-dev mailing list
>> infinispan-dev at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>
> 
> 
> 
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> 



More information about the infinispan-dev mailing list