[infinispan-dev] Further dist.exec and M/R API improvements

Mircea Markus mmarkus at redhat.com
Tue Feb 25 11:30:25 EST 2014


On Feb 25, 2014, at 3:09 PM, Vladimir Blagojevic <vblagoje at redhat.com> wrote:

> On 2/25/2014, 7:33 AM, Dan Berindei wrote:
>> 
>> 
>> Do we really need special support for distributed tasks to write results to another cache? We already allow a task to do
>> 
>> cache.getCacheManager().getCache("outputCache").put(k, v)
> Yeah, very good point, Dan. Thanks for being the sanity check. Mircea?
+1
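
To illustrate Dan's point, here is a minimal sketch of a task that writes its own result to a named output cache. SimpleCacheManager and WordLengthTask are hypothetical stand-ins built on plain Java maps, not Infinispan classes; only the cache name "outputCache" comes from the example above.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a cache manager: maps cache names to in-memory maps.
final class SimpleCacheManager {
    private final Map<String, Map<String, Integer>> caches = new ConcurrentHashMap<>();

    // Returns the named cache, creating it on first use.
    Map<String, Integer> getCache(String name) {
        return caches.computeIfAbsent(name, n -> new ConcurrentHashMap<>());
    }
}

// Hypothetical task that stores its result in the output cache itself,
// so the task API needs no special "write results to a cache" support.
final class WordLengthTask implements Callable<Integer> {
    private final SimpleCacheManager manager;
    private final String word;

    WordLengthTask(SimpleCacheManager manager, String word) {
        this.manager = manager;
        this.word = word;
    }

    @Override
    public Integer call() {
        int length = word.length();
        manager.getCache("outputCache").put(word, length);
        return length;
    }
}
```

The task still returns its result to the caller; the write to the output cache is just an ordinary put done inside call().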


>> 
>>  
>> >
>> >> I was thinking we add a concept of
>> >> DistributedTaskExecutionListener which can be specified in
>> >> DistributedTaskBuilder:
>> >>
>> >> DistributedTaskBuilder<T>
>> >> executionListener(DistributedTaskExecutionListener<K, T> listener);
>> >>
>> >>
>> >> We needed DistributedTaskExecutionListener anyway. All distributed tasks
>> >> might use some feedback about task progress, completion/failure and so on.
>> >> My proposal is roughly:
>> >>
>> >>
>> >> public interface DistributedTaskExecutionListener<K, T> {
>> >>
>> >>     void subtaskSent(Address node, Set<K> inputKeys);
>> >>     void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
>> >>     void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
>> >>     void allSubtasksCompleted();
>> >>
>> >> }
>> >>
>> >> So much for that.
>> > I think it would make sense to add this logic for monitoring, plus additional info such as average execution time, etc. I'm not sure this is a generally useful API though, unless people have already been asking for it?
>> Ok, noted. If you remember any references about this let me know and
>> I'll incorporate what people actually asked for rather than guess.
>> 
>> Ok, let's wait until we get some actual requests from users then. TBH I don't think distributed tasks with subtasks are something that users care about. E.g. with Map/Reduce the reduce tasks are not subtasks of the map/combine tasks, so this API wouldn't help.
>> 
>> Hadoop has a Reporter interface that allows you to report "ticks" and increment counters, maybe we should add something like that instead?
> 
> The subtask I am referring to here is just to denote part of the distributed task initiated using dist.executors. This interface (maybe extended a bit with ideas from Reporter) could be used for both monitoring and more application specific logic about task re-execution and so on.
> 
> 
>>  
>> 
>> I think we should allow each distributed task to deal with output in its own way, the existing API should be enough.
> 
> Yes, I can see your point. Mircea?

+1 user driven features
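
For reference, if users do ask for it, a listener along the lines Vladimir proposed could be consumed roughly as sketched here. This is a simplified variant, not the proposed API itself: TaskListener and CountingListener are hypothetical names, and Address is replaced by a plain String node name to keep the example self-contained.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical variant of the proposed listener:
// the node is identified by a String instead of an Address.
interface TaskListener<K, T> {
    void subtaskSent(String node, Set<K> inputKeys);
    void subtaskFailed(String node, Set<K> inputKeys, Exception e);
    void subtaskSucceeded(String node, Set<K> inputKeys, T result);
    void allSubtasksCompleted();
}

// Hypothetical listener that only counts events, e.g. for monitoring.
final class CountingListener<K, T> implements TaskListener<K, T> {
    final AtomicInteger sent = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();
    final AtomicInteger succeeded = new AtomicInteger();
    volatile boolean completed;

    @Override
    public void subtaskSent(String node, Set<K> inputKeys) {
        sent.incrementAndGet();
    }

    @Override
    public void subtaskFailed(String node, Set<K> inputKeys, Exception e) {
        failed.incrementAndGet();
    }

    @Override
    public void subtaskSucceeded(String node, Set<K> inputKeys, T result) {
        succeeded.incrementAndGet();
    }

    @Override
    public void allSubtasksCompleted() {
        completed = true;
    }
}
```

Application-specific logic such as re-executing a failed subtask would go in subtaskFailed; the counters cover the plain monitoring case.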

>>  
>> 
>> >> public interface MapReduceTaskExecutionListener {
>> >>
>> >>     void mapTaskInitialized(Address executionAddress);
>> >>     void mapTaskSucceeded(Address executionAddress);
>> >>     void mapTaskFailed(Address executionTarget, Exception cause);
>> >>     void mapPhaseCompleted();
>> >>
>> >>     void reduceTaskInitialized(Address executionAddress);
>> >>     void reduceTaskSucceeded(Address executionAddress);
>> >>     void reduceTaskFailed(Address address, Exception cause);
>> >>     void reducePhaseCompleted();
>> >>
>> >> }
>> > IMO - in the first stage at least - I would rather use a simpler (Notifying)Future, on which the user can wait until the computation completes: it's simpler and more aligned with the rest of our async API.
>> >
>> What do you mean? We already have futures in MapReduceTask API. This API
>> is more fine grained and allows monitoring/reporting of task progress.
>> Please clarify.

ah right, wasn't aware of MapReduceTask.executeAsynchronously() :-) That's what I was after.
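
The usage pattern I was after looks roughly like the sketch below. It uses a plain ExecutorService as a stand-in for the real task, so AsyncWordCount and its executeAsynchronously method are hypothetical and only mimic the shape of a Future-returning call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-in for an asynchronously executed task.
final class AsyncWordCount {
    // Counts words on a background thread and returns a Future for the result.
    static Future<Map<String, Integer>> executeAsynchronously(ExecutorService executor, String text) {
        return executor.submit(() -> {
            Map<String, Integer> counts = new HashMap<>();
            for (String word : text.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
            return counts;
        });
    }
}
```

The caller keeps working after the call returns and blocks on Future.get() only when it needs the result.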

>> 
>> I'm not sure about the usefulness of an API like this either... if the intention is to allow the user to collect statistics about duration of various phases, then I think exposing the durations via MapReduceTasks would be better.
> How would you design that API Dan? Something other than listener/callback interface?

Functionally, what I had in mind was JMX stats for the MapReduce tasks in general, such as average execution time, count, etc. The ability to cancel a running task through JMX/JON would also be nice. I don't think we need to expose this to the user through the MapReduceTaskExecutionListener above, though.
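
As a rough sketch of the kind of numbers I mean, assuming we simply record the duration of each finished task: TaskStats is a hypothetical holder, not an existing Infinispan class. Exposing it through JMX would additionally need a public management interface and registration with an MBean server, which is left out here.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical holder for task statistics: execution count and average time.
final class TaskStats {
    private final AtomicLong count = new AtomicLong();
    private final AtomicLong totalMillis = new AtomicLong();

    // Records one finished task and how long it took.
    void record(long durationMillis) {
        totalMillis.addAndGet(durationMillis);
        count.incrementAndGet();
    }

    // Number of tasks recorded so far.
    long getCount() {
        return count.get();
    }

    // Average duration in milliseconds, or 0.0 if nothing was recorded yet.
    double getAverageExecutionTimeMillis() {
        long n = count.get();
        return n == 0 ? 0.0 : (double) totalMillis.get() / n;
    }
}
```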

> 
>>  
>> 
>> >> while MapReduceTask would have an additional method:
>> >>
>> >> public void execute(Cache<KOut, VOut> resultsCache);
>> > you could overload it with cache name only method.
>> Yeah, good idea. Same for usingIntermediateCache? I actually asked you
>> this here https://issues.jboss.org/browse/ISPN-4021
>> 
>> +1 to allow a cache name only. For the intermediate cache I don't think it makes sense to allow a Cache version at all.
> Ok good. Deal.
> 
> 
> Thanks,
> Vladimir

Cheers,
-- 
Mircea Markus
Infinispan lead (www.infinispan.org)






