<div dir="ltr"><br><div class="gmail_extra"><br><br><div class="gmail_quote">On Tue, Feb 18, 2014 at 12:21 PM, Evangelos Vazaios <span dir="ltr">&lt;<a href="mailto:vagvaz@gmail.com" target="_blank">vagvaz@gmail.com</a>&gt;</span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">Hi Radim,<br>
<br>
Since Hadoop is the most popular implementation of MapReduce I will give<br>
a brief overview of how it works and then I&#39;ll provide with an example<br>
where the reducers must run over the whole list of values with the same key.<br>
<br>
Hadoop MR overview.<br>
<br>
MAP<br>
<br>
1) Input file(s) are split into pieces of 64MB<br>
2) For each split hadoop creates one map task and then assign the task<br>
to a cluster node<br>
3) The splits are read as key,value pairs and the map function of Mapper<br>
is called. The mapper can output arbitrary number of intermediate<br>
key,value pairs<br>
4) the output from the mapper is stored in a buffer in memory. After a<br>
certain threshold is reached the pairs are sorted by key and if there is<br>
a combiner it is run on the pairs that have the same key. Then, the<br>
output is flushed on the HDFS.<br></blockquote><div><br></div><div><div>Ok, so Hadoop runs the combiner more or less concurrently with the mappers.<br><br></div>I&#39;m curious if there are any M/R tasks that benefit from the sorting the keys here, we just put the intermediate values in a Map&lt;IK, Coll&lt;IV&gt;&gt;. We could do about the same by passing this map (or rather each entry in the map) to the combiner when it reaches a certain threshold, but I&#39;m not convinced about the need to sort it.<br>

</div><br><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<br>
SHUFFLE<br>
<br>
hadoop decides the Reducer that should process each key by running a<br>
partitioner. The default partitioner decides with the following way:<br>
reducer = intermidKey.hashCode() % numberOfReducer<br>
Finally, the intermediate key,value pairs are sent to the reducers<br></blockquote><div><br></div><div>Is this algorithm set in stone, in that some M/R tasks rely on it? In our impl, the user could use grouping to direct a set of intermediate keys to the same node for reducing, but otherwise the reducing node is more or less random. <br>

<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<br>
REDUCE<br>
<br>
1) Reducer sorts all key,value pairs by key and then groups the values<br>
with the same key. As a result reducers receive their keys sorted.<br></blockquote><div><br></div><div>I guess this sorting is only relevant if the reduce phase happens on a single thread, on a single node? If the reduce happens in parallel, the ordering is going to be lost anyway.<br>

</div><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
2) for each Key,List&lt;Value&gt; the reduce function of the reducer is<br>
called. Reducer can also emit arbitrary number of key,value pairs<br></blockquote><div><br></div><div>We limit the reducer (and the combiner) to emit a single value, which is paired with the input key. We may need to lift this restriction, if only to make porting/adapting tasks easier.<br>

 <br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<br>
Additionally, hadoop lets you customize almost every aspect of the code<br>
run from how the input is split and read as key value pairs to how it is<br>
partitioned and sorted.<br></blockquote><div><br></div><div>Does that mean you can sort the values as well? I was thinking of each reduce() call as independent, and then only the order of values for one intermediate key would be relevant. I guess some tasks may require keeping state across all the reduce() calls and then the order of key matters, but then the reduce phase can&#39;t be parallelized, either across the cluster or on a single node.<br>

<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<br>
A simple example is group by and computing an average over the grouped<br>
values. Let the dataset be webpages (url,domain,sentiment) and we want<br>
to compute the average sentiment for each domain in the dataset then the<br>
mapper for each webpages wp. will run<br>
map(wp.url,wp):<br>
  emit(wp.domain,wp.sentiment)<br>
<br>
and in reducer:<br>
reduce(domain,Iterable&lt;Double&gt; values):<br>
  counter = 0<br>
  sum = 0<br>
  while(values.hasNext())<br>
    counter++;<br>
    sum += values.next()<br>
  emit(domain,sum/counter)<br>
<br>
I know that this approach is not optimized. But, I wanted give a simple<br>
example.<br></blockquote><div><br></div><div>I think it can also be optimized to use a combiner, if we emit a (domain, counter, sum) tuple :)<br><br></div><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">


Dan, only the the values for one intermediate key must be in memory? or<br>
all the intermediate key,value pairs  that are assigned to one reducer<br>
must be in memory?<br></blockquote><div><br></div><div>With the default configuration, all the key/value pairs assigned to one reducer must be in memory. But one can define the __tmpMapReduce cache in the configuration and configure eviction with a cache store (note that because of how our eviction works, the actual container size is at least concurrencyLevel rounded up to the next power of 2). The problem is that there is only one configuration for all the M/R tasks [1].<br>

<br></div><div>Note that because we only run the combiner after the mapping phase is complete, we do need to keep in memory all the results of the mapping phase from that node (those are not stored in a cache). I&#39;ve created an issue in JIRA for this [2].<br>

</div><div><br>Cheers<br>Dan<br><br>[1] <a href="https://issues.jboss.org/browse/ISPN-4021">https://issues.jboss.org/browse/ISPN-4021</a><br>[2] <a href="https://issues.jboss.org/browse/ISPN-4022">https://issues.jboss.org/browse/ISPN-4022</a><br>

<br><br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<br>
Cheers,<br>
Evangelos<br>
<div><div class="h5"><br>
On 02/18/2014 11:59 AM, Dan Berindei wrote:<br>
&gt; Radim, this is how our M/R algorithm works (Hadoop may do it differently):<br>
&gt;<br>
&gt; * The mapping phase generates a Map&lt;IntKey, Collection&lt;IntValue&gt;&gt; on each<br>
&gt; node (Int meaning intermediate).<br>
&gt; * In the combine (local reduce) phase, a combine operation takes as input<br>
&gt; an IntKey and a Collection&lt;IntValue&gt; with only the values that were<br>
&gt; produced on that node.<br>
&gt; * In the (global) reduce phase, all the intermediate values for each key<br>
&gt; are merged, and a reduce operation takes an intermediate key and a sequence<br>
&gt; of *all* the intermediate values generated for that key. These reduce<br>
&gt; operations are completely independent, so each intermediate key can be<br>
&gt; mapped to a different node (distributed reduce), while still having access<br>
&gt; to all the intermediate values at once.<br>
&gt; * In the end, the collator takes the Map&lt;IntKey, IntValue&gt; from the reduce<br>
&gt; phase and produces a single value.<br>
&gt;<br>
&gt; If a combiner can be used, then I believe it can also be run in parallel<br>
&gt; with a LinkedBlockingQueue between the mapper and the combiner. But<br>
&gt; sometimes the reduce algorithm can only be run on the entire collection of<br>
&gt; values (e.g if you want to find the median, or a percentile).<br>
&gt;<br>
&gt; The limitation we have now is that in the reduce phase, the entire list of<br>
&gt; values for one intermediate key must be in memory at once. I think Hadoop<br>
&gt; only loads a block of intermediate values in memory at once, and can even<br>
&gt; sort the intermediate values (with a user-supplied comparison function) so<br>
&gt; that the reduce function can work on a sorted list without loading the<br>
&gt; values in memory itself.<br>
&gt;<br>
&gt; Cheers<br>
&gt; Dan<br></div></div></blockquote></div><br></div></div>