On Feb 25, 2014, at 3:09 PM, Vladimir Blagojevic <vblagoje(a)redhat.com> wrote:
On 2/25/2014, 7:33 AM, Dan Berindei wrote:
>
>
> Do we really need special support for distributed tasks to write results to another
> cache? We already allow a task to do
>
> cache.getCacheManager().getCache("outputCache").put(k, v)
Yeah, very good point Dan. Thanks for being the sanity check. Mircea?
+1
>
>
> >
> >> I was thinking we add a concept of
> >> DistributedTaskExecutionListener which can be specified in
> >> DistributedTaskBuilder:
> >>
> >> DistributedTaskBuilder<T>
> >> executionListener(DistributedTaskExecutionListener<K, T> listener);
> >>
> >>
> >> We needed DistributedTaskExecutionListener anyway. All distributed tasks
> >> might use some feedback about task progress, completion/failure and so on.
> >> My proposal is roughly:
> >>
> >>
> >> public interface DistributedTaskExecutionListener<K, T> {
> >>
> >> void subtaskSent(Address node, Set<K> inputKeys);
> >> void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
> >> void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
> >> void allSubtasksCompleted();
> >>
> >> }
> >>
> >> So much for that.
> > I think it would make sense to add this logic for monitoring, plus additional
> > info such as average execution time etc. I'm not sure if this is a generally useful
> > API though, unless there were people asking for it already?
> Ok, noted. If you remember any references about this let me know and
> I'll incorporate what people actually asked for rather than guess.
>
> Ok, let's wait until we get some actual requests from users then. TBH I don't
> think distributed tasks with subtasks are something that users care about. E.g. with
> Map/Reduce the reduce tasks are not subtasks of the map/combine tasks, so this API
> wouldn't help.
>
> Hadoop has a Reporter interface that allows you to report "ticks" and
> increment counters, maybe we should add something like that instead?
The subtask I am referring to here is just to denote part of the distributed task
initiated using dist.executors. This interface (maybe extended a bit with ideas from
Reporter) could be used for both monitoring and more application specific logic about task
re-execution and so on.
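
A rough sketch of how such a listener could serve both purposes, to make the idea concrete. Everything in it is an assumption for illustration only: SubtaskListener mirrors the interface proposed above but uses plain Strings instead of Address, and TaskCounters and RetryTrackingListener are made-up names loosely modelled on the counter idea from Hadoop's Reporter. None of this is existing Infinispan API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the proposed listener; node addresses are plain Strings.
interface SubtaskListener<K, T> {
    void subtaskSent(String node, Set<K> inputKeys);
    void subtaskFailed(String node, Set<K> inputKeys, Exception e);
    void subtaskSucceeded(String node, Set<K> inputKeys, T result);
    void allSubtasksCompleted();
}

// Hypothetical Reporter-style named counters, safe to increment from several threads.
final class TaskCounters {
    private final Map<String, AtomicLong> counts = new ConcurrentHashMap<>();

    void increment(String name) {
        counts.computeIfAbsent(name, n -> new AtomicLong()).incrementAndGet();
    }

    long get(String name) {
        AtomicLong c = counts.get(name);
        return c == null ? 0L : c.get();
    }
}

// Counts subtask outcomes (monitoring) and remembers the key sets of failed
// subtasks so the application can decide to re-execute them.
final class RetryTrackingListener<K, T> implements SubtaskListener<K, T> {
    final TaskCounters counters = new TaskCounters();
    final List<Set<K>> keysToRetry = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void subtaskSent(String node, Set<K> inputKeys) {
        counters.increment("sent");
    }

    @Override
    public void subtaskFailed(String node, Set<K> inputKeys, Exception e) {
        counters.increment("failed");
        keysToRetry.add(inputKeys);
    }

    @Override
    public void subtaskSucceeded(String node, Set<K> inputKeys, T result) {
        counters.increment("succeeded");
    }

    @Override
    public void allSubtasksCompleted() {
        counters.increment("completed");
    }
}

class ListenerSketch {
    public static void main(String[] args) {
        RetryTrackingListener<String, Integer> listener = new RetryTrackingListener<>();
        listener.subtaskSent("nodeA", Collections.singleton("k1"));
        listener.subtaskFailed("nodeA", Collections.singleton("k1"), new RuntimeException("node left"));
        listener.allSubtasksCompleted();
        System.out.println("failed subtasks: " + listener.counters.get("failed"));
        System.out.println("keys to retry: " + listener.keysToRetry);
    }
}
```

The counters cover the monitoring side, while keysToRetry is the kind of application specific state a caller could use to resubmit work for the failed keys.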
>
>
> I think we should allow each distributed task to deal with output in its own way, the
> existing API should be enough.
Yes, I can see your point. Mircea?
+1 user driven features
>
>
> >> public interface MapReduceTaskExecutionListener {
> >>
> >> void mapTaskInitialized(Address executionAddress);
> >> void mapTaskSucceeded(Address executionAddress);
> >> void mapTaskFailed(Address executionTarget, Exception cause);
> >> void mapPhaseCompleted();
> >>
> >> void reduceTaskInitialized(Address executionAddress);
> >> void reduceTaskSucceeded(Address executionAddress);
> >> void reduceTaskFailed(Address address, Exception cause);
> >> void reducePhaseCompleted();
> >>
> >> }
> > IMO - in the first stage at least - I would rather use a simpler
> > (Notifying)Future, on which the user can wait till the computation happens: it's
> > simpler and more aligned with the rest of our async API.
> >
> What do you mean? We already have futures in MapReduceTask API. This API
> is more fine grained and allows monitoring/reporting of task progress.
> Please clarify.
ah right, wasn't aware of MapReduceTask.executeAsynchronously() :-) That's what I
was after.
>
> I'm not sure about the usefulness of an API like this either... if the intention
> is to allow the user to collect statistics about duration of various phases, then I think
> exposing the durations via MapReduceTasks would be better.
How would you design that API Dan? Something other than listener/callback interface?
Functionally, what I was having in mind was JMX stats for the MapReduce tasks in general:
like average execution time, count etc. Also the ability to cancel a running task through
JMX/JON would be nice. I don't think we need to expose this to the user through the
MapReduceTaskExecutionListener above, though.
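
To illustrate the kind of JMX stats meant here, a minimal sketch using only the standard javax.management API. The names TaskStatsMXBean, TaskStats and recordExecution, as well as the ObjectName, are assumptions made up for this example and are not how Infinispan exposes its statistics. Cancelling a running task is left out.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class MapReduceStatsSketch {

    // Management interface: JMX treats it as an MXBean because the name ends
    // in "MXBean"; it has to be public to be accepted by the MBean server.
    public interface TaskStatsMXBean {
        long getExecutionCount();
        double getAverageExecutionTimeMillis();
    }

    // Hypothetical stats holder that a task runner would update after each run.
    public static final class TaskStats implements TaskStatsMXBean {
        private final AtomicLong count = new AtomicLong();
        private final AtomicLong totalNanos = new AtomicLong();

        // Record one finished execution and how long it took.
        public void recordExecution(long durationNanos) {
            totalNanos.addAndGet(durationNanos);
            count.incrementAndGet();
        }

        @Override
        public long getExecutionCount() {
            return count.get();
        }

        @Override
        public double getAverageExecutionTimeMillis() {
            long n = count.get();
            // Count and total are read separately, so the value is approximate
            // while executions are being recorded concurrently.
            return n == 0 ? 0.0 : totalNanos.get() / 1_000_000.0 / n;
        }
    }

    public static void main(String[] args) throws Exception {
        TaskStats stats = new TaskStats();
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Made-up object name for the sketch
        ObjectName name = new ObjectName("org.example.sketch:type=MapReduceTaskStats");
        server.registerMBean(stats, name);

        stats.recordExecution(2_000_000L);
        stats.recordExecution(4_000_000L);

        // Read the attributes back the way a JMX console would
        System.out.println(server.getAttribute(name, "ExecutionCount"));
        System.out.println(server.getAttribute(name, "AverageExecutionTimeMillis"));
    }
}
```

Nothing in this sketch touches the task API itself, which is the point: the numbers are available to operators through JMX without a user-facing listener.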
>
>
> >> while MapReduceTask would have an additional method:
> >>
> >> public void execute(Cache<KOut, VOut> resultsCache);
> > you could overload it with cache name only method.
> Yeah, good idea. Same for usingIntermediateCache? I actually asked you
> this here
https://issues.jboss.org/browse/ISPN-4021
>
> +1 to allow a cache name only. For the intermediate cache I don't think it makes
> sense to allow a Cache version at all.
Ok good. Deal.
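
For the record, the shape of the name-only overload could look roughly like the sketch below. It uses plain maps as stand-ins so that it runs on its own: NamedCaches plays the role of the cache manager and ResultWritingTask the role of the task, and both are hypothetical names for illustration, not the MapReduceTask implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a cache manager: each named cache is a concurrent map.
final class NamedCaches {
    private final Map<String, Map<?, ?>> caches = new ConcurrentHashMap<>();

    // Returns the cache with the given name, creating it on first use.
    @SuppressWarnings("unchecked")
    <K, V> Map<K, V> getCache(String name) {
        return (Map<K, V>) caches.computeIfAbsent(name, n -> new ConcurrentHashMap<>());
    }
}

// Hypothetical task whose results are already computed; only the output step is shown.
final class ResultWritingTask<KOut, VOut> {
    private final NamedCaches cacheManager;
    private final Map<KOut, VOut> computedResults;

    ResultWritingTask(NamedCaches cacheManager, Map<KOut, VOut> computedResults) {
        this.cacheManager = cacheManager;
        this.computedResults = computedResults;
    }

    // Variant taking the results cache itself.
    void execute(Map<KOut, VOut> resultsCache) {
        resultsCache.putAll(computedResults);
    }

    // Name-only overload: look the cache up by name, then delegate.
    void execute(String resultsCacheName) {
        execute(cacheManager.<KOut, VOut>getCache(resultsCacheName));
    }
}

class ExecuteOverloadSketch {
    public static void main(String[] args) {
        NamedCaches manager = new NamedCaches();
        Map<String, Integer> wordCounts = new HashMap<>();
        wordCounts.put("infinispan", 3);

        new ResultWritingTask<String, Integer>(manager, wordCounts).execute("resultsCache");

        Map<String, Integer> results = manager.getCache("resultsCache");
        System.out.println(results);
    }
}
```

The name-only variant just resolves the cache and delegates, so both overloads share the same output path.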
Thanks,
Vladimir
Cheers,
--
Mircea Markus
Infinispan lead (www.infinispan.org)