<div dir="ltr"><div>I am really sorry for the ridiculously late response.  I will describe briefly our 1st year and our current approach.<br></div><br>1st year approach.<br><div>During the first year, we used infinispan MR to implement our operators. Most of our operators were Map-only (for example project,filter) and for these we did not use the intermediate cache. For all the other operators (join,group by) we used the collector interface. Our reducers always returned null and the actual output was written to another cache, because we had a workflow of operators.<br><br></div><div>Current approach<br></div><div>At the moment we do not use replaced MR, with two dist calls one for the map and another for the reduce phase. The intermediate data are stored in a cache (Cache&lt;K,List&lt;V&gt;&gt;). At some point we would like to change to a delta aware cache. We changed from the MR to dist calls, because we want to run MR tasks across multiple micro-clouds and the synchronization of Mappers and reducers it would be more complicated than monitoring the execution of independent dist calls ( 1 for each micro-cloud). The intermediate data are written to a ensemble cache ( a LEADS cache), which spans multiple micro-clouds.<br></div><div><br>In general, I find it quite useful to be able to &quot;consistently&quot; (without missing data that are already inside) iterate over the values of a cache.<br></div><div><br><br></div><div><div><div class="gmail_extra"><br><div class="gmail_quote">On Wed, Oct 15, 2014 at 7:41 PM, Emmanuel Bernard <span dir="ltr">&lt;<a href="mailto:emmanuel@hibernate.org" target="_blank">emmanuel@hibernate.org</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div style="word-wrap:break-word"><br><div><span class=""><div>On 13 Oct 2014, at 10:45, Dan Berindei &lt;<a href="mailto:dan.berindei@gmail.com" target="_blank">dan.berindei@gmail.com</a>&gt; wrote:</div><br><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><br><div class="gmail_quote">On Fri, Oct 10, 2014 at 6:49 PM, Emmanuel Bernard <span dir="ltr">&lt;<a href="mailto:emmanuel@hibernate.org" target="_blank">emmanuel@hibernate.org</a>&gt;</span> wrote:<br><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">When wrestling with the subject, here is what I had in mind.<br>
<br>
The M/R coordinator node sends the M task per segment on the node where<br>
the segment is primary.<br></blockquote><div><br></div><div>What&#39;s M? Is it just a shorthand for &quot;map&quot;, or is it a new parameter that controls the number of map/combine tasks sent at once?</div></div></div></div></blockquote><div><br></div></span>M is short for Map. Sorry.</div><div><span class=""><br><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><div class="gmail_quote"><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
Each &quot;per-segment&quot; M task is executed and is offered the way to push<br>
intermediary results in a temp cache.<br></blockquote><div><br></div><div>Just to be clear, the user-provided mapper and combiner don&#39;t know anything about the intermediary cache (which doesn&#39;t have to be temporary, if it&#39;s shared by all M/R tasks). They only interact with the Collector interface.</div><div>The map/combine task on the other hand is our code, and it deals with the intermediary cache directly.</div></div></div></div></blockquote><div><br></div></span><div>Interesting, Evangelos, do you actually use the collector interface or actual explicit intermediary caches in your approach.</div><div>If that’s the collector interface, I guess that’s easier to hide that sharding business. <br></div></div></div></blockquote><div> </div><div>We use explicit caches, but should that functionality become available, we could possibly revert back to Infinspan MR.<br></div><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div style="word-wrap:break-word"><div><div></div><span class=""><br><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><div class="gmail_quote"><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
The intermediary results are stored with a composite key [imtermKey-i, seg-j].<br>
The M/R coordinator waits for all M tasks to return. If one does not<br>
(timeout, rehash), the following happens:<br></blockquote><div><br></div><div>We can&#39;t allow time out map tasks, or they will keep writing to the intermediate cache in parallel with the retried tasks. So the originator has to wait for a response from each node to which it sent a map task.</div></div></div></div></blockquote><div><br></div></span><div>OK. I guess the originator can see that a node is out of the cluster though and act accordingly.</div><span class=""><br><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><div class="gmail_quote"><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
- delete [intermKey-i, seg-i] (that operation could be handled by the<br>
  new per-segment M before the map task is effectively started)<br>
- ship the M task for that segment-i to the new primary owner of<br>
  segment-i<br>
<br>
When all M tasks are received the Reduce phase will read all [intermKey-i, *]<br>
keys and reduce them.<br></blockquote><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">Note that if the reduction phase is itself distributed, we could apply<br>
the same key per segment and shipping split for these.<br></blockquote><div><br></div><div>Sure, we have to retry reduce tasks when the primary owner changes, and it makes sense to retry as little as possible.<br></div><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<br>
Again the tricky part is to expose the ability to write to intermediary<br>
caches per segment without exposing segments per se as well as let<br>
someone see a concatenated view if intermKey-i from all segments subkeys<br>
during reduction.<br></blockquote><div><br></div><div>Writing to and reading from the intermediate cache is already abstracted from user code (in the Mapper and Reducer interfaces). So we don&#39;t need to worry about exposing extra details to the user.</div><div> <br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<br>
Thoughts?<br>
<br>
Dan, I did not quite get what alternative approach you wanted to<br>
propose. Care to respin it for a slow brain? :)<br></blockquote><div><br></div><div>I think where we differ is that I don&#39;t think user code needs to know about how we store the intermediate values and what we retry, as long as their mappers/combiners/reducers don&#39;t have side effects.</div></div></div></div></blockquote><div><br></div></span><div>Right but my understanding from the LEADS guys was that they had side effects on their M/Rs. Waiting for Evangelos to speak up.</div><span class=""><br></span></div></div></blockquote><div>Should that be available for MapReduce, and the underlying ensemble 
cache can correctly handle one of  the strategies described above, we might be 
able to change back to Infinispan MR.  <br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div style="word-wrap:break-word"><div><span class=""><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><div class="gmail_quote"><div><br></div><div>Otherwise I was thinking on the same lines: send 1 map/combine task for each segment (maybe with a cap on the number of segments being processed at the same time on each node), split the intermediate values per input segment, cancel+retry each map task if the topology changes and the executing node is no longer an owner. If the reduce phase is distributed, run 1 reduce task per segment as well, and cancel+retry the reduce task if the executing node is no longer an owner.</div><div><br></div><div>I had some ideas about assigning each map/combine phase a UUID and making the intermediate keys [intermKey, seg, mctask] to allow the originator to retry a map/combine task without waiting for the previous one to finish, but I don&#39;t think I mentioned that before :)</div></div></div></div></blockquote><div><br></div></span><div>Nice touch, that fixes the rogue node / timeout problem.</div><span class=""><br></span></div></div></blockquote><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div style="word-wrap:break-word"><div><span class=""><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><div class="gmail_quote"><div>There are also some details that I&#39;m worried about:</div><div><br></div><div>1) If the reduce phase is distributed, and the intermediate cache is non-transactional, any topology change in the intermediate cache will require us to retry all the map/combine tasks that were running at the time on any node (even if some nodes did not detect the topology change yet). So it would make sense to limit the number of map/combine tasks that are processed at one time, in order to limit the amount of tasks we retry (OR require the intermediate cache to be transactional).</div></div></div></div></blockquote><div><br></div></span>I am not fully following that. What matters in the end it seems is for the originator to detect a topology change and discard things accordingly, no? If the other nodes are slaves of that originator for the purpose of that M/R, we are good.</div><div><span class=""><br><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><div class="gmail_quote"><div><br></div><div>2) Running a separate map/combine task for each segment is not really an option until we implement the the segment-aware data container and cache stores. Without that change, it will make everything much slower, because of all the extra iterations for each segment.</div><div><br></div></div></div></div></blockquote><div><br></div></span><div>See my other email about physically merging down the per segment work into a per node work when you ship that work.</div><span class=""><br><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><div class="gmail_quote"><div>3) And finally, all this will be overkill when the input cache is small, and the time needed to process the data is comparable to the time needed to send all those extra RPCs.</div></div></div></div></blockquote><blockquote type="cite"><div dir="ltr"><div class="gmail_extra"><div class="gmail_quote"><div><br></div><div>So I&#39;m thinking it might be better to adopt Vladimir&#39;s suggestion to retry everything if we detect a topology change in the input and/or intermediate cache at the end of the M/R task, at least in the first phase. </div></div></div></div></blockquote><div><br></div></span></div></div></blockquote><div>It would also be an overkill to restart everything MR task if the volume of data is large. <br></div><div>I would propose a solution using the distributed iterator and that it would not miss data whenever a topology change happens.<br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex"><div style="word-wrap:break-word"><div><span class=""><div></div></span><div>You half lost but I think that with my proposal to physically merge the RPC calls per node instead of per segment, that problem would be alleviated.</div><span class=""><font color="#888888"><div><br></div><div>Emmanuel</div></font></span></div></div><br>_______________________________________________<br>
infinispan-dev mailing list<br>
<a href="mailto:infinispan-dev@lists.jboss.org">infinispan-dev@lists.jboss.org</a><br>
<a href="https://lists.jboss.org/mailman/listinfo/infinispan-dev" target="_blank">https://lists.jboss.org/mailman/listinfo/infinispan-dev</a><br></blockquote></div><br></div><div class="gmail_extra">Cheers,<br></div><div class="gmail_extra">Evangelos<br></div></div></div></div>