<div dir="ltr"><br><div class="gmail_extra"><br><br><div class="gmail_quote">On Mon, Feb 24, 2014 at 10:55 PM, Vladimir Blagojevic <span dir="ltr">&lt;<a href="mailto:vblagoje@redhat.com" target="_blank">vblagoje@redhat.com</a>&gt;</span> wrote:<br>

<blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">See inline<br>
<div class="">On 2/24/2014, 12:57 PM, Mircea Markus wrote:<br>
&gt; On Feb 19, 2014, at 8:45 PM, Vladimir Blagojevic &lt;<a href="mailto:vblagoje@redhat.com">vblagoje@redhat.com</a>&gt; wrote:<br>
&gt;<br>
&gt;&gt; Hey guys,<br>
&gt;&gt;<br>
&gt;&gt; As some of you might know we have received additional requirements from<br>
&gt;&gt; community and internally to add a few things to dist.executors and<br>
&gt;&gt; map/reduce API. On distributed executors front we need to enable<br>
&gt;&gt; distributed executors to store results into cache directly rather than<br>
&gt;&gt; returning them to invoker [1]. As soon as we introduce this API we also<br>
&gt;&gt; need a asyc. mechanism to allow notifications of subtask<br>
&gt;&gt; completion/failure.<br>
&gt; I think we need both in at the same time :-)<br>
</div>Yes, that is what I actually meant. Poor wording.<br></blockquote><div><br></div><div>Do we really need special support for distributed tasks to write results to another cache? We already allow a task to do<br><br>

cache.getCacheManager().getCache(&quot;outputCache&quot;).put(k, v)<br><br></div><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<div class="">&gt;<br>
&gt;&gt; I was thinking we add a concept of<br>
&gt;&gt; DistributedTaskExecutionListener which can be specified in<br>
&gt;&gt; DistributedTaskBuilder:<br>
&gt;&gt;<br>
&gt;&gt; DistributedTaskBuilder&lt;T&gt;<br>
&gt;&gt; executionListener(DistributedTaskExecutionListener&lt;K, T&gt; listener);<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt; We needed DistributedTaskExecutionListener anyway. All distributed tasks<br>
&gt;&gt; might use some feedback about task progress, completion/failure and on.<br>
&gt;&gt; My proposal is roughly:<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt; public interface DistributedTaskExecutionListener&lt;K, T&gt; {<br>
&gt;&gt;<br>
&gt;&gt;     void subtaskSent(Address node, Set&lt;K&gt; inputKeys);<br>
&gt;&gt;     void subtaskFailed(Address node, Set&lt;K&gt; inputKeys, Exception e);<br>
&gt;&gt;     void subtaskSucceded(Address node, Set&lt;K&gt; inputKeys, T result);<br>
&gt;&gt;     void allSubtasksCompleted();<br>
&gt;&gt;<br>
&gt;&gt; }<br>
&gt;&gt;<br>
&gt;&gt; So much for that.<br>
&gt; I think this it would make sense to add this logic for monitoring, + additional info such as average execution time etc. I&#39;m not sure if this is a generally useful API though, unless there were people asking for it already?<br>


</div>Ok, noted. If you remember any references about this let me know and<br>
I&#39;ll incorporate what people actually asked for rather than guess.<br></blockquote><div><br></div><div>Ok, let&#39;s wait until we get some actual requests from users then. TBH I don&#39;t think distributed tasks with subtasks are something that users care about. E.g. with Map/Reduce the reduce tasks are not subtasks of the map/combine tasks, so this API wouldn&#39;t help.<br>

<br></div><div></div><div>Hadoop has a Reporter interface that allows you to report &quot;ticks&quot; and increment counters, maybe we should add something like that instead?<br> <br></div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">


<div><div class="h5"><br>
&gt;<br>
&gt;&gt; If tasks do not use input keys these parameters would<br>
&gt;&gt; be emply sets. Now for [1] we need to add additional methods to<br>
&gt;&gt; DistributedExecutorService. We can not specify result cache in<br>
&gt;&gt; DistributedTaskBuilder as we are still bound to only submit methods in<br>
&gt;&gt; DistributedExecutorService that return futures and we don&#39;t want that.<br>
&gt;&gt; We need two new void methods:<br>
&gt;&gt;<br>
&gt;&gt; &lt;T, K&gt; void submitEverywhere(DistributedTask&lt;T&gt; task,<br>
&gt;&gt; Cache&lt;DistExecResultKey&lt;K&gt;, T&gt; result);<br>
&gt;&gt; &lt;T, K &gt; void submitEverywhere(DistributedTask&lt;T&gt; task,<br>
&gt;&gt; Cache&lt;DistExecResultKey&lt;K&gt;, T&gt; result, K... input);<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt; Now, why bother with DistExecResultKey? Well we have tasks that use<br>
&gt;&gt; input keys and tasks that don&#39;t. So results cache could only be keyed by<br>
&gt;&gt; either keys or execution address, or combination of those two.<br>
&gt;&gt; Therefore, DistExecResultKey could be something like:<br>
&gt;&gt;<br>
&gt;&gt; public interface DistExecResultKey&lt;K&gt; {<br>
&gt;&gt;<br>
&gt;&gt;     Address getExecutionAddress();<br>
&gt;&gt;     K getKey();<br>
&gt;&gt;<br>
&gt;&gt; }<br>
&gt;&gt;<br>
&gt;&gt; If you have a better idea how to address this aspect let us know. So<br>
&gt;&gt; much for distributed executors.<br>
&gt;&gt;<br></div></div></blockquote><div><br></div><div>I think we should allow each distributed task to deal with output in its own way, the existing API should be enough.<br></div><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">

<div><div class="h5">
&gt;&gt;<br>
&gt;&gt; For map/reduce we also have to enable storing of map reduce task results<br>
&gt;&gt; into cache [2] and allow users to specify custom cache for intermediate<br>
&gt;&gt; results[3]. Part of task [2] is to allow notification about map/reduce<br>
&gt;&gt; task progress and completion. Just as in dist.executor I would add<br>
&gt;&gt; MapReduceTaskExecutionListener interface:<br>
&gt;&gt;<br>
&gt;&gt;<br>
&gt;&gt; public interface MapReduceTaskExecutionListener {<br>
&gt;&gt;<br>
&gt;&gt;     void mapTaskInitialized(Address executionAddress);<br>
&gt;&gt;     void mapTaskSucceeded(Address executionAddress);<br>
&gt;&gt;     void mapTaskFailed(Address executionTarget, Exception cause);<br>
&gt;&gt;     void mapPhaseCompleted();<br>
&gt;&gt;<br>
&gt;&gt;     void reduceTaskInitialized(Address executionAddress);<br>
&gt;&gt;     void reduceTaskSucceeded(Address executionAddress);<br>
&gt;&gt;     void reduceTaskFailed(Address address, Exception cause);<br>
&gt;&gt;     void reducePhaseCompleted();<br>
&gt;&gt;<br>
&gt;&gt; }<br>
&gt; IMO - in the first stage at leas - I would rather use a simpler (Notifying)Future, on which the user can wait till the computation happens: it&#39;s simpler and more aligned with the rest of our async API.<br>
&gt;<br>
</div></div>What do you mean? We already have futures in MapReduceTask API. This API<br>
is more fine grained and allows monitoring/reporting of task progress.<br>
Please clarify.<br></blockquote><div><br></div><div>I&#39;m not sure about the usefulness of an API like this either... if the intention is to allow the user to collect statistics about duration of various phases, then I think exposing the durations via MapReduceTasks would be better.<br>

</div><div> </div><blockquote class="gmail_quote" style="margin:0px 0px 0px 0.8ex;border-left:1px solid rgb(204,204,204);padding-left:1ex">
<div class=""><br>
&gt;&gt; while MapReduceTask would have an additional method:<br>
&gt;&gt;<br>
&gt;&gt; public void execute(Cache&lt;KOut, VOut&gt; resultsCache);<br>
&gt; you could overload it with cache name only method.<br>
</div>Yeah, good idea. Same for usingIntermediateCache? I actually asked you<br>
this here <a href="https://issues.jboss.org/browse/ISPN-4021" target="_blank">https://issues.jboss.org/browse/ISPN-4021</a><br></blockquote><div><br></div><div>+1 to allow a cache name only. For the intermediate cache I don&#39;t think it makes sense to allow a Cache version at all.<br>

</div><div> <br></div></div><br></div></div>