[infinispan-dev] Further dist.exec and M/R API improvements

Mircea Markus mmarkus at redhat.com
Mon Feb 24 12:57:17 EST 2014


On Feb 19, 2014, at 8:45 PM, Vladimir Blagojevic <vblagoje at redhat.com> wrote:

> Hey guys,
> 
> As some of you might know, we have received additional requirements from
> the community and internally to add a few things to the dist.executors and
> map/reduce API. On the distributed executors front, we need to enable
> distributed executors to store results into a cache directly rather than
> returning them to the invoker [1]. As soon as we introduce this API, we also
> need an async mechanism to allow notifications of subtask
> completion/failure.

I think we need both in at the same time :-)

> I was thinking we add a concept of 
> DistributedTaskExecutionListener which can be specified in 
> DistributedTaskBuilder:
> 
> DistributedTaskBuilder<T> 
> executionListener(DistributedTaskExecutionListener<K, T> listener);
> 
> 
> We need DistributedTaskExecutionListener anyway: all distributed tasks
> could use some feedback about task progress, completion/failure and so on.
> My proposal is roughly:
> 
> 
> public interface DistributedTaskExecutionListener<K, T> {
> 
>    void subtaskSent(Address node, Set<K> inputKeys);
>    void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
>    void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
>    void allSubtasksCompleted();
> 
> }
> 
> So much for that.

I think it would make sense to add this logic for monitoring, plus additional info such as average execution time. I'm not sure this is a generally useful API, though, unless people have already asked for it.
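To make the monitoring use case concrete, here is a minimal sketch of a listener that counts subtask outcomes and averages per-subtask execution time, built against the interface as proposed (spelling the success callback `subtaskSucceeded`). `Address` is a local stand-in for `org.infinispan.remoting.transport.Address`, `MonitoringListener` and its accessors are hypothetical, and the timing assumes at most one in-flight subtask per node.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for org.infinispan.remoting.transport.Address so the sketch compiles on its own.
interface Address {}

// The listener interface as proposed in the thread.
interface DistributedTaskExecutionListener<K, T> {
    void subtaskSent(Address node, Set<K> inputKeys);
    void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
    void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
    void allSubtasksCompleted();
}

// Hypothetical monitoring listener: counts outcomes and averages the time between
// subtaskSent and the matching success/failure callback.
// Assumption: at most one in-flight subtask per node (timestamps are keyed by node).
class MonitoringListener<K, T> implements DistributedTaskExecutionListener<K, T> {
    private final Map<Address, Long> sentAt = new ConcurrentHashMap<>();
    private final AtomicInteger succeeded = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicInteger timed = new AtomicInteger();
    private final AtomicLong totalNanos = new AtomicLong();
    private volatile boolean completed;

    @Override
    public void subtaskSent(Address node, Set<K> inputKeys) {
        sentAt.put(node, System.nanoTime());
    }

    @Override
    public void subtaskFailed(Address node, Set<K> inputKeys, Exception e) {
        failed.incrementAndGet();
        recordElapsed(node);
    }

    @Override
    public void subtaskSucceeded(Address node, Set<K> inputKeys, T result) {
        succeeded.incrementAndGet();
        recordElapsed(node);
    }

    @Override
    public void allSubtasksCompleted() {
        completed = true;
    }

    // Adds the elapsed time for this node's subtask, if we saw it being sent.
    private void recordElapsed(Address node) {
        Long start = sentAt.remove(node);
        if (start != null) {
            totalNanos.addAndGet(System.nanoTime() - start);
            timed.incrementAndGet();
        }
    }

    int succeeded() { return succeeded.get(); }
    int failed() { return failed.get(); }
    boolean isCompleted() { return completed; }

    // Average over subtasks whose send time was recorded; 0.0 if none finished yet.
    double averageNanos() {
        int n = timed.get();
        return n == 0 ? 0.0 : (double) totalNanos.get() / n;
    }
}
```

A listener like this would be passed to the proposed `executionListener(...)` builder method; for keyless tasks the `inputKeys` arguments would simply be empty sets.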

> If tasks do not use input keys, these parameters would
> be empty sets. Now for [1] we need to add additional methods to
> DistributedExecutorService. We cannot specify the result cache in
> DistributedTaskBuilder, because tasks built there can still only be
> submitted through DistributedExecutorService methods that return futures,
> and we don't want that here. We need two new void methods:
> 
> <T, K> void submitEverywhere(DistributedTask<T> task, 
> Cache<DistExecResultKey<K>, T> result);
> <T, K> void submitEverywhere(DistributedTask<T> task,
> Cache<DistExecResultKey<K>, T> result, K... input);
> 
> 
> Now, why bother with DistExecResultKey? Well, we have tasks that use
> input keys and tasks that don't. So the results cache could only be keyed
> by input key, by execution address, or by a combination of the two.
> Therefore, DistExecResultKey could be something like:
> 
> public interface DistExecResultKey<K> {
> 
>    Address getExecutionAddress();
>    K getKey();
> 
> }
> 
> If you have a better idea of how to address this aspect, let us know. So
> much for distributed executors.
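Because results stored through the proposed `submitEverywhere` overloads would be looked up by `DistExecResultKey`, any concrete key needs value-based `equals`/`hashCode`. A minimal sketch, assuming the execution address can be modelled as a `String` (a stand-in for Infinispan's `Address`) and that `getKey()` returns `null` for keyless tasks; `SimpleDistExecResultKey` is a hypothetical name, not part of the proposal.

```java
import java.util.Objects;

// The proposed key type. The execution address is modelled as a String here,
// standing in for org.infinispan.remoting.transport.Address.
interface DistExecResultKey<K> {
    String getExecutionAddress();
    K getKey();
}

// Hypothetical value implementation. getKey() returns null for tasks submitted
// without input keys, so such results are keyed by execution address alone.
final class SimpleDistExecResultKey<K> implements DistExecResultKey<K> {
    private final String executionAddress;
    private final K key;

    SimpleDistExecResultKey(String executionAddress, K key) {
        this.executionAddress = Objects.requireNonNull(executionAddress, "executionAddress");
        this.key = key;
    }

    @Override
    public String getExecutionAddress() { return executionAddress; }

    @Override
    public K getKey() { return key; }

    // Cache lookups rely on value semantics, so both parts take part in equality.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleDistExecResultKey)) return false;
        SimpleDistExecResultKey<?> other = (SimpleDistExecResultKey<?>) o;
        return executionAddress.equals(other.executionAddress) && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(executionAddress, key);
    }

    @Override
    public String toString() {
        return "DistExecResultKey[" + executionAddress + ", " + key + "]";
    }
}
```

In a clustered result cache the key would also have to be marshallable, which this sketch does not address.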
> 
> 
> For map/reduce we also have to enable storing map/reduce task results
> into a cache [2] and allow users to specify a custom cache for intermediate
> results [3]. Part of task [2] is to allow notification about map/reduce
> task progress and completion. Just as in dist.executors, I would add a
> MapReduceTaskExecutionListener interface:
> 
> 
> public interface MapReduceTaskExecutionListener {
> 
>    void mapTaskInitialized(Address executionAddress);
>    void mapTaskSucceeded(Address executionAddress);
>    void mapTaskFailed(Address executionAddress, Exception cause);
>    void mapPhaseCompleted();
>
>    void reduceTaskInitialized(Address executionAddress);
>    void reduceTaskSucceeded(Address executionAddress);
>    void reduceTaskFailed(Address executionAddress, Exception cause);
>    void reducePhaseCompleted();
> 
> }

IMO, at least in the first stage, I would rather use a simpler (Notifying)Future on which the user can wait until the computation completes: it's simpler and more aligned with the rest of our async API.
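The two options need not be exclusive: a single future can be completed from the fine-grained callbacks. A minimal sketch, assuming the listener interface as proposed (parameter names normalized to `executionAddress`), with Java's `CompletableFuture` standing in for Infinispan's `NotifyingFuture` and a local `Address` stand-in; `FutureCompletingListener` is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in for org.infinispan.remoting.transport.Address.
interface Address {}

// The listener interface as proposed in the thread.
interface MapReduceTaskExecutionListener {
    void mapTaskInitialized(Address executionAddress);
    void mapTaskSucceeded(Address executionAddress);
    void mapTaskFailed(Address executionAddress, Exception cause);
    void mapPhaseCompleted();

    void reduceTaskInitialized(Address executionAddress);
    void reduceTaskSucceeded(Address executionAddress);
    void reduceTaskFailed(Address executionAddress, Exception cause);
    void reducePhaseCompleted();
}

// Hypothetical adapter: collapses the callbacks into one future that completes
// normally when the reduce phase ends, or exceptionally on the first task failure.
class FutureCompletingListener implements MapReduceTaskExecutionListener {
    private final CompletableFuture<Void> future = new CompletableFuture<>();

    CompletableFuture<Void> future() { return future; }

    @Override public void mapTaskInitialized(Address executionAddress) {}
    @Override public void mapTaskSucceeded(Address executionAddress) {}
    @Override public void mapTaskFailed(Address executionAddress, Exception cause) {
        future.completeExceptionally(cause); // first failure wins; later calls are no-ops
    }
    @Override public void mapPhaseCompleted() {}

    @Override public void reduceTaskInitialized(Address executionAddress) {}
    @Override public void reduceTaskSucceeded(Address executionAddress) {}
    @Override public void reduceTaskFailed(Address executionAddress, Exception cause) {
        future.completeExceptionally(cause);
    }
    @Override public void reducePhaseCompleted() {
        future.complete(null); // ignored if the future already failed
    }
}
```

Starting with only the future, as suggested, would still leave room to add a listener like this later without changing the user-facing completion contract.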

> 
> while MapReduceTask would have an additional method:
> 
> public void execute(Cache<KOut, VOut> resultsCache);

You could also overload it with a method that takes only the cache name.
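One way such an overload could simply resolve the cache and delegate is sketched below. It assumes a `ConcurrentMap` in place of `Cache` (Infinispan's `Cache` is itself a `ConcurrentMap`) and a hypothetical `CacheRegistry` playing the role of `EmbeddedCacheManager.getCache(String)`; `ResultStoringTask` is likewise an illustrative name, not the real `MapReduceTask`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a cache manager that resolves caches by name.
class CacheRegistry {
    private final Map<String, ConcurrentMap<?, ?>> caches = new ConcurrentHashMap<>();

    // Creates the named cache on first access; the cast is unchecked, as with any untyped registry.
    @SuppressWarnings("unchecked")
    <K, V> ConcurrentMap<K, V> getCache(String name) {
        return (ConcurrentMap<K, V>) caches.computeIfAbsent(name, n -> new ConcurrentHashMap<Object, Object>());
    }
}

// Hypothetical task base class showing only the two execute variants.
abstract class ResultStoringTask<KOut, VOut> {
    private final CacheRegistry registry;

    ResultStoringTask(CacheRegistry registry) {
        this.registry = registry;
    }

    // Placeholder for the actual map/reduce computation.
    protected abstract Map<KOut, VOut> compute();

    // Variant proposed in the thread: store results into the given cache, return nothing.
    void execute(ConcurrentMap<KOut, VOut> resultsCache) {
        resultsCache.putAll(compute());
    }

    // Suggested overload: resolve the cache by name, then delegate.
    void execute(String resultsCacheName) {
        execute(registry.<KOut, VOut>getCache(resultsCacheName));
    }
}
```

Keeping the name-based variant as a thin wrapper means there is only one code path that actually writes results.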

> 
> MapReduceTaskExecutionListener could be specified using the fluent
> MapReduceTask API, just as the intermediate cache would be:
>
> public MapReduceTask<KIn, VIn, KOut, VOut>
> usingIntermediateCache(Cache<KOut, List<VOut>> tmpCache);
>
> thus addressing issue [3].

+1
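To illustrate how `usingIntermediateCache` and `execute(resultsCache)` would fit together in the fluent style, here is a toy, single-JVM model. It is not Infinispan's `MapReduceTask`: `ConcurrentMap`s stand in for caches, the mapper emits exactly one pair per input entry, and the class name `ToyMapReduceTask` and its constructor are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Toy single-JVM model of the proposed fluent API; ConcurrentMaps stand in for caches.
// For brevity the mapper emits exactly one (KOut, VOut) pair per input entry.
class ToyMapReduceTask<KIn, VIn, KOut, VOut> {
    private final Map<KIn, VIn> input;
    private final Function<Map.Entry<KIn, VIn>, Map.Entry<KOut, VOut>> mapper;
    private final BinaryOperator<VOut> reducer;
    // Default intermediate storage, used unless usingIntermediateCache is called.
    private ConcurrentMap<KOut, List<VOut>> intermediate = new ConcurrentHashMap<>();

    ToyMapReduceTask(Map<KIn, VIn> input,
                     Function<Map.Entry<KIn, VIn>, Map.Entry<KOut, VOut>> mapper,
                     BinaryOperator<VOut> reducer) {
        this.input = input;
        this.mapper = mapper;
        this.reducer = reducer;
    }

    // Mirrors the proposed usingIntermediateCache: swap in a user-supplied cache, return this.
    ToyMapReduceTask<KIn, VIn, KOut, VOut> usingIntermediateCache(ConcurrentMap<KOut, List<VOut>> tmpCache) {
        this.intermediate = tmpCache;
        return this;
    }

    // Mirrors the proposed execute(resultsCache): results go into the cache, nothing is returned.
    void execute(ConcurrentMap<KOut, VOut> resultsCache) {
        // Map phase: group emitted values by output key in the intermediate cache.
        // Plain ArrayList values are fine only because this toy runs single-threaded.
        for (Map.Entry<KIn, VIn> entry : input.entrySet()) {
            Map.Entry<KOut, VOut> emitted = mapper.apply(entry);
            intermediate.computeIfAbsent(emitted.getKey(), k -> new ArrayList<>()).add(emitted.getValue());
        }
        // Reduce phase: fold each value list and store the result under its key.
        for (Map.Entry<KOut, List<VOut>> group : intermediate.entrySet()) {
            group.getValue().stream()
                 .reduce(reducer)
                 .ifPresent(v -> resultsCache.put(group.getKey(), v));
        }
    }
}
```

For example, mapping order id to customer into `(customer, 1)` and reducing with `Integer::sum` leaves per-customer order counts in the results map, while the user-supplied intermediate map keeps the grouped lists for inspection.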

> 
> Let me know what you think,
> Vladimir
> 
> 
> [1] https://issues.jboss.org/browse/ISPN-4030
> [2] https://issues.jboss.org/browse/ISPN-4002
> [3] https://issues.jboss.org/browse/ISPN-4021
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev

Cheers,
-- 
Mircea Markus
Infinispan lead (www.infinispan.org)
