[infinispan-dev] Further dist.exec and M/R API improvements

Dan Berindei dan.berindei at gmail.com
Tue Feb 25 07:33:32 EST 2014


On Mon, Feb 24, 2014 at 10:55 PM, Vladimir Blagojevic
<vblagoje at redhat.com>wrote:

> See inline
> On 2/24/2014, 12:57 PM, Mircea Markus wrote:
> > On Feb 19, 2014, at 8:45 PM, Vladimir Blagojevic <vblagoje at redhat.com>
> wrote:
> >
> >> Hey guys,
> >>
> >> As some of you might know we have received additional requirements from
> >> community and internally to add a few things to dist.executors and
> >> map/reduce API. On distributed executors front we need to enable
> >> distributed executors to store results into cache directly rather than
> >> returning them to invoker [1]. As soon as we introduce this API we also
> >> need a asyc. mechanism to allow notifications of subtask
> >> completion/failure.
> > I think we need both in at the same time :-)
> Yes, that is what I actually meant. Poor wording.
>

Do we really need special support for distributed tasks to write results to
another cache? We already allow a task to do

cache.getCacheManager().getCache("outputCache").put(k, v)



> >
> >> I was thinking we add a concept of
> >> DistributedTaskExecutionListener which can be specified in
> >> DistributedTaskBuilder:
> >>
> >> DistributedTaskBuilder<T>
> >> executionListener(DistributedTaskExecutionListener<K, T> listener);
> >>
> >>
> >> We needed DistributedTaskExecutionListener anyway. All distributed tasks
> >> might use some feedback about task progress, completion/failure and on.
> >> My proposal is roughly:
> >>
> >>
> >> public interface DistributedTaskExecutionListener<K, T> {
> >>
> >>     void subtaskSent(Address node, Set<K> inputKeys);
> >>     void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
> >>     void subtaskSucceded(Address node, Set<K> inputKeys, T result);
> >>     void allSubtasksCompleted();
> >>
> >> }
> >>
> >> So much for that.
> > I think this it would make sense to add this logic for monitoring, +
> additional info such as average execution time etc. I'm not sure if this is
> a generally useful API though, unless there were people asking for it
> already?
> Ok, noted. If you remember any references about this let me know and
> I'll incorporate what people actually asked for rather than guess.
>

Ok, let's wait until we get some actual requests from users then. TBH I
don't think distributed tasks with subtasks are something that users care
about. E.g. with Map/Reduce the reduce tasks are not subtasks of the
map/combine tasks, so this API wouldn't help.

Hadoop has a Reporter interface that allows you to report "ticks" and
increment counters, maybe we should add something like that instead?


>
> >
> >> If tasks do not use input keys these parameters would
> >> be emply sets. Now for [1] we need to add additional methods to
> >> DistributedExecutorService. We can not specify result cache in
> >> DistributedTaskBuilder as we are still bound to only submit methods in
> >> DistributedExecutorService that return futures and we don't want that.
> >> We need two new void methods:
> >>
> >> <T, K> void submitEverywhere(DistributedTask<T> task,
> >> Cache<DistExecResultKey<K>, T> result);
> >> <T, K > void submitEverywhere(DistributedTask<T> task,
> >> Cache<DistExecResultKey<K>, T> result, K... input);
> >>
> >>
> >> Now, why bother with DistExecResultKey? Well we have tasks that use
> >> input keys and tasks that don't. So results cache could only be keyed by
> >> either keys or execution address, or combination of those two.
> >> Therefore, DistExecResultKey could be something like:
> >>
> >> public interface DistExecResultKey<K> {
> >>
> >>     Address getExecutionAddress();
> >>     K getKey();
> >>
> >> }
> >>
> >> If you have a better idea how to address this aspect let us know. So
> >> much for distributed executors.
> >>
>

I think we should allow each distributed task to deal with output in its
own way, the existing API should be enough.


>  >>
> >> For map/reduce we also have to enable storing of map reduce task results
> >> into cache [2] and allow users to specify custom cache for intermediate
> >> results[3]. Part of task [2] is to allow notification about map/reduce
> >> task progress and completion. Just as in dist.executor I would add
> >> MapReduceTaskExecutionListener interface:
> >>
> >>
> >> public interface MapReduceTaskExecutionListener {
> >>
> >>     void mapTaskInitialized(Address executionAddress);
> >>     void mapTaskSucceeded(Address executionAddress);
> >>     void mapTaskFailed(Address executionTarget, Exception cause);
> >>     void mapPhaseCompleted();
> >>
> >>     void reduceTaskInitialized(Address executionAddress);
> >>     void reduceTaskSucceeded(Address executionAddress);
> >>     void reduceTaskFailed(Address address, Exception cause);
> >>     void reducePhaseCompleted();
> >>
> >> }
> > IMO - in the first stage at leas - I would rather use a simpler
> (Notifying)Future, on which the user can wait till the computation happens:
> it's simpler and more aligned with the rest of our async API.
> >
> What do you mean? We already have futures in MapReduceTask API. This API
> is more fine grained and allows monitoring/reporting of task progress.
> Please clarify.
>

I'm not sure about the usefulness of an API like this either... if the
intention is to allow the user to collect statistics about duration of
various phases, then I think exposing the durations via MapReduceTasks
would be better.


>
> >> while MapReduceTask would have an additional method:
> >>
> >> public void execute(Cache<KOut, VOut> resultsCache);
> > you could overload it with cache name only method.
> Yeah, good idea. Same for usingIntermediateCache? I actually asked you
> this here https://issues.jboss.org/browse/ISPN-4021
>

+1 to allow a cache name only. For the intermediate cache I don't think it
makes sense to allow a Cache version at all.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/infinispan-dev/attachments/20140225/7048c1d6/attachment.html 


More information about the infinispan-dev mailing list