[infinispan-dev] Further dist.exec and M/R API improvements

Vladimir Blagojevic vblagoje at redhat.com
Mon Feb 24 15:55:43 EST 2014


See inline
On 2/24/2014, 12:57 PM, Mircea Markus wrote:
> On Feb 19, 2014, at 8:45 PM, Vladimir Blagojevic <vblagoje at redhat.com> wrote:
>
>> Hey guys,
>>
>> As some of you might know we have received additional requirements from
>> community and internally to add a few things to dist.executors and
>> map/reduce API. On distributed executors front we need to enable
>> distributed executors to store results into cache directly rather than
>> returning them to invoker [1]. As soon as we introduce this API we also
>> need an async mechanism to allow notifications of subtask
>> completion/failure.
> I think we need both in at the same time :-)
Yes, that is what I actually meant. Poor wording.
>
>> I was thinking we add a concept of
>> DistributedTaskExecutionListener which can be specified in
>> DistributedTaskBuilder:
>>
>> DistributedTaskBuilder<T>
>> executionListener(DistributedTaskExecutionListener<K, T> listener);
>>
>>
>> We needed DistributedTaskExecutionListener anyway. All distributed tasks
>> might use some feedback about task progress, completion/failure and so on.
>> My proposal is roughly:
>>
>>
>> public interface DistributedTaskExecutionListener<K, T> {
>>
>>     void subtaskSent(Address node, Set<K> inputKeys);
>>     void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
>>     void subtaskSucceded(Address node, Set<K> inputKeys, T result);
>>     void allSubtasksCompleted();
>>
>> }
>>
>> So much for that.
> I think it would make sense to add this logic for monitoring, plus additional info such as average execution time etc. I'm not sure if this is a generally useful API though, unless there were people asking for it already?
Ok, noted. If you remember any references about this let me know and 
I'll incorporate what people actually asked for rather than guess.
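
To make the monitoring use case concrete, here is a rough sketch of a listener that counts subtask outcomes and tracks average execution time. It only assumes the interface proposed above. Address is replaced by a plain String so the snippet compiles without Infinispan, and TimingListener and averageMillis are made-up names for illustration, not existing API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class ListenerSketch {

    // The interface as proposed in this thread. Address is simplified to a
    // String here (assumption) so the sketch compiles without Infinispan.
    interface DistributedTaskExecutionListener<K, T> {
        void subtaskSent(String node, Set<K> inputKeys);
        void subtaskFailed(String node, Set<K> inputKeys, Exception e);
        void subtaskSucceded(String node, Set<K> inputKeys, T result);
        void allSubtasksCompleted();
    }

    // Hypothetical listener: counts outcomes and averages per-subtask latency.
    static final class TimingListener<K, T> implements DistributedTaskExecutionListener<K, T> {
        private final Map<String, Long> sentAtNanos = new HashMap<>();
        private long totalNanos;
        int succeeded;
        int failed;
        boolean completed;

        @Override
        public synchronized void subtaskSent(String node, Set<K> inputKeys) {
            sentAtNanos.put(node, System.nanoTime());
        }

        @Override
        public synchronized void subtaskFailed(String node, Set<K> inputKeys, Exception e) {
            failed++;
            recordElapsed(node);
        }

        @Override
        public synchronized void subtaskSucceded(String node, Set<K> inputKeys, T result) {
            succeeded++;
            recordElapsed(node);
        }

        @Override
        public synchronized void allSubtasksCompleted() {
            completed = true;
        }

        // Assumes at most one in-flight subtask per node.
        private void recordElapsed(String node) {
            Long start = sentAtNanos.remove(node);
            if (start != null) {
                totalNanos += System.nanoTime() - start;
            }
        }

        // Average time between "sent" and "succeeded/failed", in milliseconds.
        synchronized double averageMillis() {
            int finished = succeeded + failed;
            return finished == 0 ? 0.0 : totalNanos / 1_000_000.0 / finished;
        }
    }

    public static void main(String[] args) {
        TimingListener<String, Integer> listener = new TimingListener<>();
        listener.subtaskSent("nodeA", Collections.singleton("k1"));
        listener.subtaskSucceded("nodeA", Collections.singleton("k1"), 42);
        listener.allSubtasksCompleted();
        System.out.println("succeeded=" + listener.succeeded + " failed=" + listener.failed);
    }
}
```

Whether the framework itself should collect such statistics, or leave it to user listeners like this one, is exactly the open question.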

>
>> If tasks do not use input keys these parameters would
>> be empty sets. Now for [1] we need to add additional methods to
>> DistributedExecutorService. We cannot specify result cache in
>> DistributedTaskBuilder as we are still bound to only submit methods in
>> DistributedExecutorService that return futures and we don't want that.
>> We need two new void methods:
>>
>> <T, K> void submitEverywhere(DistributedTask<T> task,
>> Cache<DistExecResultKey<K>, T> result);
>> <T, K> void submitEverywhere(DistributedTask<T> task,
>> Cache<DistExecResultKey<K>, T> result, K... input);
>>
>>
>> Now, why bother with DistExecResultKey? Well we have tasks that use
>> input keys and tasks that don't. So results cache could only be keyed by
>> either keys or execution address, or combination of those two.
>> Therefore, DistExecResultKey could be something like:
>>
>> public interface DistExecResultKey<K> {
>>
>>     Address getExecutionAddress();
>>     K getKey();
>>
>> }
>>
>> If you have a better idea how to address this aspect let us know. So
>> much for distributed executors.
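
One detail worth spelling out: since DistExecResultKey would be used as a cache key, any implementation needs value-based equals/hashCode so that a result can be looked up again by address and input key. A minimal sketch follows, assuming a String in place of Address and a plain Map in place of the results Cache; SimpleResultKey is a hypothetical name, and a real key would also have to be marshallable.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

final class ResultKeySketch {

    // Key interface as proposed above; Address simplified to String (assumption).
    interface DistExecResultKey<K> {
        String getExecutionAddress();
        K getKey();
    }

    // Hypothetical immutable implementation with value-based equality.
    static final class SimpleResultKey<K> implements DistExecResultKey<K> {
        private final String executionAddress;
        private final K key; // null for tasks that take no input keys

        SimpleResultKey(String executionAddress, K key) {
            this.executionAddress = Objects.requireNonNull(executionAddress);
            this.key = key;
        }

        @Override
        public String getExecutionAddress() {
            return executionAddress;
        }

        @Override
        public K getKey() {
            return key;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof SimpleResultKey)) return false;
            SimpleResultKey<?> other = (SimpleResultKey<?>) o;
            return executionAddress.equals(other.executionAddress)
                    && Objects.equals(key, other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(executionAddress, key);
        }
    }

    public static void main(String[] args) {
        // A plain map stands in for Cache<DistExecResultKey<K>, T>.
        Map<DistExecResultKey<String>, Integer> results = new ConcurrentHashMap<>();
        results.put(new SimpleResultKey<>("nodeA", "k1"), 10);   // task with an input key
        results.put(new SimpleResultKey<>("nodeB", null), 99);   // task without input keys
        System.out.println(results.get(new SimpleResultKey<>("nodeA", "k1")));
    }
}
```
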
>>
>>
>> For map/reduce we also have to enable storing of map reduce task results
>> into cache [2] and allow users to specify custom cache for intermediate
>> results [3]. Part of task [2] is to allow notification about map/reduce
>> task progress and completion. Just as in dist.executor I would add
>> MapReduceTaskExecutionListener interface:
>>
>>
>> public interface MapReduceTaskExecutionListener {
>>
>>     void mapTaskInitialized(Address executionAddress);
>>     void mapTaskSucceeded(Address executionAddress);
>>     void mapTaskFailed(Address executionTarget, Exception cause);
>>     void mapPhaseCompleted();
>>
>>     void reduceTaskInitialized(Address executionAddress);
>>     void reduceTaskSucceeded(Address executionAddress);
>>     void reduceTaskFailed(Address address, Exception cause);
>>     void reducePhaseCompleted();
>>
>> }
> IMO - in the first stage at least - I would rather use a simpler (Notifying)Future, on which the user can wait till the computation happens: it's simpler and more aligned with the rest of our async API.
>
What do you mean? We already have futures in MapReduceTask API. This API 
is more fine grained and allows monitoring/reporting of task progress. 
Please clarify.
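
For what it's worth, the two approaches do not exclude each other: a future can be layered on top of the fine-grained listener. Below is a rough sketch, assuming the listener interface quoted above with Address simplified to String. ProgressListener and its fields are hypothetical names, and java.util.concurrent.CompletableFuture stands in for whatever notifying future we would actually expose.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class MapReduceListenerSketch {

    // Listener interface as proposed above; Address simplified to String (assumption).
    interface MapReduceTaskExecutionListener {
        void mapTaskInitialized(String executionAddress);
        void mapTaskSucceeded(String executionAddress);
        void mapTaskFailed(String executionTarget, Exception cause);
        void mapPhaseCompleted();

        void reduceTaskInitialized(String executionAddress);
        void reduceTaskSucceeded(String executionAddress);
        void reduceTaskFailed(String address, Exception cause);
        void reducePhaseCompleted();
    }

    // Hypothetical listener: exposes progress counters plus a future that
    // completes when the reduce phase ends or fails on the first task failure.
    static final class ProgressListener implements MapReduceTaskExecutionListener {
        final AtomicInteger mapsStarted = new AtomicInteger();
        final AtomicInteger mapsDone = new AtomicInteger();
        final AtomicInteger reducesStarted = new AtomicInteger();
        final AtomicInteger reducesDone = new AtomicInteger();
        final CompletableFuture<Void> done = new CompletableFuture<>();

        @Override public void mapTaskInitialized(String a) { mapsStarted.incrementAndGet(); }
        @Override public void mapTaskSucceeded(String a) { mapsDone.incrementAndGet(); }
        @Override public void mapTaskFailed(String a, Exception cause) { done.completeExceptionally(cause); }
        @Override public void mapPhaseCompleted() { /* progress only, nothing to complete yet */ }

        @Override public void reduceTaskInitialized(String a) { reducesStarted.incrementAndGet(); }
        @Override public void reduceTaskSucceeded(String a) { reducesDone.incrementAndGet(); }
        @Override public void reduceTaskFailed(String a, Exception cause) { done.completeExceptionally(cause); }
        @Override public void reducePhaseCompleted() { done.complete(null); }
    }

    public static void main(String[] args) {
        ProgressListener listener = new ProgressListener();
        // Simulate the callbacks a single-node run would produce.
        listener.mapTaskInitialized("nodeA");
        listener.mapTaskSucceeded("nodeA");
        listener.mapPhaseCompleted();
        listener.reduceTaskInitialized("nodeA");
        listener.reduceTaskSucceeded("nodeA");
        listener.reducePhaseCompleted();
        listener.done.join(); // returns immediately, the future is already complete
        System.out.println("maps done: " + listener.mapsDone.get());
    }
}
```

Users who only want to wait would use the future; users who want progress reporting would read the counters or implement the listener themselves.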

>> while MapReduceTask would have an additional method:
>>
>> public void execute(Cache<KOut, VOut> resultsCache);
> you could overload it with cache name only method.
Yeah, good idea. Same for usingIntermediateCache? I actually asked you 
this here https://issues.jboss.org/browse/ISPN-4021

Thanks Mircea!
Vladimir

