[infinispan-dev] Further dist.exec and M/R API improvements

Vladimir Blagojevic vblagoje at redhat.com
Wed Feb 19 15:45:45 EST 2014


Hey guys,

As some of you might know we have received additional requirements from 
community and internally to add a few things to dist.executors and 
map/reduce API. On distributed executors front we need to enable 
distributed executors to store results into cache directly rather than 
returning them to invoker [1]. As soon as we introduce this API we also 
need a asyc. mechanism to allow notifications of subtask 
completion/failure. I was thinking we add a concept of 
DistributedTaskExecutionListener which can be specified in 
DistributedTaskBuilder:

DistributedTaskBuilder<T> 
executionListener(DistributedTaskExecutionListener<K, T> listener);


We needed DistributedTaskExecutionListener anyway. All distributed tasks 
might use some feedback about task progress, completion/failure and on. 
My proposal is roughly:


public interface DistributedTaskExecutionListener<K, T> {

    void subtaskSent(Address node, Set<K> inputKeys);
    void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
    void subtaskSucceded(Address node, Set<K> inputKeys, T result);
    void allSubtasksCompleted();

}

So much for that. If tasks do not use input keys these parameters would 
be emply sets. Now for [1] we need to add additional methods to 
DistributedExecutorService. We can not specify result cache in 
DistributedTaskBuilder as we are still bound to only submit methods in 
DistributedExecutorService that return futures and we don't want that. 
We need two new void methods:

<T, K> void submitEverywhere(DistributedTask<T> task, 
Cache<DistExecResultKey<K>, T> result);
<T, K > void submitEverywhere(DistributedTask<T> task, 
Cache<DistExecResultKey<K>, T> result, K... input);


Now, why bother with DistExecResultKey? Well we have tasks that use 
input keys and tasks that don't. So results cache could only be keyed by 
either keys or execution address, or combination of those two. 
Therefore, DistExecResultKey could be something like:

public interface DistExecResultKey<K> {

    Address getExecutionAddress();
    K getKey();

}

If you have a better idea how to address this aspect let us know. So 
much for distributed executors.


For map/reduce we also have to enable storing of map reduce task results 
into cache [2] and allow users to specify custom cache for intermediate 
results[3]. Part of task [2] is to allow notification about map/reduce 
task progress and completion. Just as in dist.executor I would add 
MapReduceTaskExecutionListener interface:


public interface MapReduceTaskExecutionListener {

    void mapTaskInitialized(Address executionAddress);
    void mapTaskSucceeded(Address executionAddress);
    void mapTaskFailed(Address executionTarget, Exception cause);
    void mapPhaseCompleted();

    void reduceTaskInitialized(Address executionAddress);
    void reduceTaskSucceeded(Address executionAddress);
    void reduceTaskFailed(Address address, Exception cause);
    void reducePhaseCompleted();

}

while MapReduceTask would have an additional method:

public void execute(Cache<KOut, VOut> resultsCache);

MapReduceTaskExecutionListener could be specified using fluent 
MapReduceTask API just as intermediate cache would be:

public MapReduceTask<KIn, VIn, KOut, VOut> 
usingIntermediateCache(Cache<KOut, List<VOut>> tmpCache);

thus addressing issue [3].

Let me know what you think,
Vladimir


[1] https://issues.jboss.org/browse/ISPN-4030
[2] https://issues.jboss.org/browse/ISPN-4002
[3] https://issues.jboss.org/browse/ISPN-4021


More information about the infinispan-dev mailing list