[infinispan-dev] Further dist.exec and M/R API improvements

Vladimir Blagojevic vblagoje at redhat.com
Tue Feb 25 10:09:29 EST 2014


On 2/25/2014, 7:33 AM, Dan Berindei wrote:
>
>
> Do we really need special support for distributed tasks to write 
> results to another cache? We already allow a task to do
>
> cache.getCacheManager().getCache("outputCache").put(k, v)
Yeah, very good point, Dan. Thanks for being the sanity check. Mircea?

>
>     >
>     >> I was thinking we add a concept of
>     >> DistributedTaskExecutionListener which can be specified in
>     >> DistributedTaskBuilder:
>     >>
>     >> DistributedTaskBuilder<T>
>     >> executionListener(DistributedTaskExecutionListener<K, T> listener);
>     >>
>     >>
>     >> We needed DistributedTaskExecutionListener anyway. All
>     distributed tasks
>     >> might use some feedback about task progress, completion/failure
>     and so on.
>     >> My proposal is roughly:
>     >>
>     >>
>     >> public interface DistributedTaskExecutionListener<K, T> {
>     >>
>     >>     void subtaskSent(Address node, Set<K> inputKeys);
>     >>     void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
>     >>     void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
>     >>     void allSubtasksCompleted();
>     >>
>     >> }
>     >>
>     >> So much for that.
>     > I think it would make sense to add this logic for
>     monitoring, + additional info such as average execution time etc.
>     I'm not sure if this is a generally useful API though, unless
>     there were people asking for it already?
>     Ok, noted. If you remember any references about this let me know and
>     I'll incorporate what people actually asked for rather than guess.
>
>
> Ok, let's wait until we get some actual requests from users then. TBH 
> I don't think distributed tasks with subtasks are something that users 
> care about. E.g. with Map/Reduce the reduce tasks are not subtasks of 
> the map/combine tasks, so this API wouldn't help.
>
> Hadoop has a Reporter interface that allows you to report "ticks" and 
> increment counters, maybe we should add something like that instead?

By "subtask" I just mean one part of a distributed task initiated 
through dist.executors. This interface (perhaps extended a bit with 
ideas from Reporter) could be used both for monitoring and for more 
application-specific logic such as task re-execution.
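
To make the re-execution use case concrete, here is a rough sketch of a 
listener that remembers which input keys belong to failed subtasks. It 
is only an illustration under assumptions: the listener interface is the 
one proposed in this thread (it does not exist in the codebase), Address 
is a local placeholder for the real transport type, and 
RetryTrackingListener with its accessor methods is a made-up name.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Placeholder for Infinispan's Address type so the sketch compiles on its own.
interface Address {}

// The listener as proposed in this thread; it is a proposal, not an existing API.
interface DistributedTaskExecutionListener<K, T> {
    void subtaskSent(Address node, Set<K> inputKeys);
    void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
    void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
    void allSubtasksCompleted();
}

// Hypothetical listener: counts progress and remembers keys whose subtask failed.
final class RetryTrackingListener<K, T> implements DistributedTaskExecutionListener<K, T> {
    private final Set<K> failedKeys = new HashSet<>();
    private int sent;
    private int succeeded;
    private boolean completed;

    @Override
    public synchronized void subtaskSent(Address node, Set<K> inputKeys) {
        sent++;
    }

    @Override
    public synchronized void subtaskFailed(Address node, Set<K> inputKeys, Exception e) {
        failedKeys.addAll(inputKeys); // candidates for re-execution
    }

    @Override
    public synchronized void subtaskSucceeded(Address node, Set<K> inputKeys, T result) {
        succeeded++;
        failedKeys.removeAll(inputKeys); // a successful retry clears earlier failures
    }

    @Override
    public synchronized void allSubtasksCompleted() {
        completed = true;
    }

    // Snapshot of the keys that still need to be resubmitted.
    synchronized Set<K> keysToRetry() {
        return Collections.unmodifiableSet(new HashSet<>(failedKeys));
    }

    synchronized int sentCount() { return sent; }
    synchronized int succeededCount() { return succeeded; }
    synchronized boolean isCompleted() { return completed; }
}
```

The application would resubmit keysToRetry() once allSubtasksCompleted() 
fires; whether the executor or the application owns that retry policy is 
left open here.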


>
>
> I think we should allow each distributed task to deal with output in 
> its own way, the existing API should be enough.

Yes, I can see your point. Mircea?
>
>
>     >> public interface MapReduceTaskExecutionListener {
>     >>
>     >>     void mapTaskInitialized(Address executionAddress);
>     >>     void mapTaskSucceeded(Address executionAddress);
>     >>     void mapTaskFailed(Address executionTarget, Exception cause);
>     >>     void mapPhaseCompleted();
>     >>
>     >>     void reduceTaskInitialized(Address executionAddress);
>     >>     void reduceTaskSucceeded(Address executionAddress);
>     >>     void reduceTaskFailed(Address address, Exception cause);
>     >>     void reducePhaseCompleted();
>     >>
>     >> }
>     > IMO - in the first stage at least - I would rather use a simpler
>     (Notifying)Future, on which the user can wait till the computation
>     happens: it's simpler and more aligned with the rest of our async API.
>     >
>     What do you mean? We already have futures in MapReduceTask API.
>     This API
>     is more fine grained and allows monitoring/reporting of task progress.
>     Please clarify.
>
>
> I'm not sure about the usefulness of an API like this either... if the 
> intention is to allow the user to collect statistics about duration of 
> various phases, then I think exposing the durations via MapReduceTasks 
> would be better.
How would you design that API, Dan? Something other than a 
listener/callback interface?
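
For comparison, one possible shape for the non-listener alternative is a 
small timings object that the task fills in and hands back on request. 
This is a guess at what "exposing the durations" could mean, not an 
agreed design: PhaseTimings, its Phase enum and both methods are 
hypothetical names.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical holder for per-phase durations; not part of the Infinispan API.
final class PhaseTimings {
    enum Phase { MAP, REDUCE }

    private final Map<Phase, Long> nanos = new EnumMap<>(Phase.class);

    // Runs the given phase body and accumulates how long it took.
    synchronized void time(Phase phase, Runnable body) {
        long start = System.nanoTime();
        try {
            body.run();
        } finally {
            nanos.merge(phase, System.nanoTime() - start, Long::sum);
        }
    }

    // Returns the accumulated duration of a phase, or 0 if it never ran.
    synchronized long duration(Phase phase, TimeUnit unit) {
        return unit.convert(nanos.getOrDefault(phase, 0L), TimeUnit.NANOSECONDS);
    }
}
```

With something like this the caller polls after the fact instead of 
receiving callbacks, which is simpler but gives no progress reporting 
while the task is still running.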

>
>     >> while MapReduceTask would have an additional method:
>     >>
>     >> public void execute(Cache<KOut, VOut> resultsCache);
>     > you could overload it with cache name only method.
>     Yeah, good idea. Same for usingIntermediateCache? I actually asked you
>     this here https://issues.jboss.org/browse/ISPN-4021
>
>
> +1 to allow a cache name only. For the intermediate cache I don't 
> think it makes sense to allow a Cache version at all.
Ok good. Deal.
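
Roughly, the name-only overload would just resolve the cache and 
delegate to the Cache-based method. The sketch below shows that 
delegation with plain maps standing in for caches so it runs on its own; 
NamedCaches and ResultWritingTask are hypothetical names, and the real 
methods would take Cache<KOut, VOut> and look the cache up through the 
cache manager.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a cache manager: named caches modeled as plain maps.
final class NamedCaches {
    private final Map<String, Map<String, Integer>> caches = new HashMap<>();

    // Returns the cache with the given name, creating it on first use.
    Map<String, Integer> getCache(String name) {
        return caches.computeIfAbsent(name, n -> new HashMap<>());
    }
}

// Hypothetical task that already holds its reduced results and only has to store them.
final class ResultWritingTask {
    private final NamedCaches manager;
    private final Map<String, Integer> reduced;

    ResultWritingTask(NamedCaches manager, Map<String, Integer> reduced) {
        this.manager = manager;
        this.reduced = reduced;
    }

    // Variant taking the results cache itself.
    void execute(Map<String, Integer> resultsCache) {
        resultsCache.putAll(reduced);
    }

    // Name-only overload: resolve the cache by name, then delegate.
    void execute(String resultsCacheName) {
        execute(manager.getCache(resultsCacheName));
    }
}
```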


Thanks,
Vladimir