See inline
On 2/24/2014, 12:57 PM, Mircea Markus wrote:
> On Feb 19, 2014, at 8:45 PM, Vladimir Blagojevic <vblagoje@redhat.com> wrote:
>
>> Hey guys,
>>
>> As some of you might know, we have received additional requirements from
>> the community and internally to add a few things to the dist.executors and
>> map/reduce APIs. On the distributed executors front, we need to enable
>> distributed executors to store results directly into a cache rather than
>> returning them to the invoker [1]. As soon as we introduce this API, we also
>> need an async mechanism to allow notifications of subtask
>> completion/failure.
> I think we need both in at the same time :-)
Yes, that is what I actually meant. Poor wording.
>
>> I was thinking we add a concept of
>> DistributedTaskExecutionListener, which can be specified in
>> DistributedTaskBuilder:
>>
>> DistributedTaskBuilder<T>
>> executionListener(DistributedTaskExecutionListener<K, T> listener);
>>
>>
>> We need DistributedTaskExecutionListener anyway. All distributed tasks
>> could use some feedback about task progress, completion/failure and so on.
>> My proposal is roughly:
>>
>>
>> public interface DistributedTaskExecutionListener<K, T> {
>>
>> void subtaskSent(Address node, Set<K> inputKeys);
>> void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
>> void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
>> void allSubtasksCompleted();
>>
>> }
>>
>> So much for that.
> I think it would make sense to add this logic for monitoring, + additional info such as average execution time etc. I'm not sure if this is a generally useful API though, unless there were people asking for it already?
Ok, noted. If you remember any references about this, let me know and
I'll incorporate what people actually asked for rather than guess.
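A minimal sketch of how the proposed listener could feed the monitoring Mircea mentions (success/failure counts, average execution time). Assumptions: `Address` is an empty stand-in for Infinispan's Address type so the code compiles on its own, the interface is copied from the proposal rather than from any released API, and `MonitoringListener` is a hypothetical name.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for Infinispan's Address so the sketch compiles alone.
interface Address {}

// Listener interface as proposed in this thread (not an existing Infinispan API).
interface DistributedTaskExecutionListener<K, T> {
   void subtaskSent(Address node, Set<K> inputKeys);
   void subtaskFailed(Address node, Set<K> inputKeys, Exception e);
   void subtaskSucceeded(Address node, Set<K> inputKeys, T result);
   void allSubtasksCompleted();
}

// Hypothetical monitoring listener: counts outcomes and averages the time between
// subtaskSent and the matching success/failure callback.
// Assumes at most one in-flight subtask per node, since timings are keyed by Address.
class MonitoringListener<K, T> implements DistributedTaskExecutionListener<K, T> {
   private final Map<Address, Long> startNanos = new ConcurrentHashMap<>();
   private final AtomicInteger successCount = new AtomicInteger();
   private final AtomicInteger failureCount = new AtomicInteger();
   private final AtomicInteger timedCount = new AtomicInteger();
   private final AtomicLong totalNanos = new AtomicLong();
   private volatile boolean completed;

   @Override
   public void subtaskSent(Address node, Set<K> inputKeys) {
      startNanos.put(node, System.nanoTime());
   }

   @Override
   public void subtaskFailed(Address node, Set<K> inputKeys, Exception e) {
      failureCount.incrementAndGet();
      recordElapsed(node);
   }

   @Override
   public void subtaskSucceeded(Address node, Set<K> inputKeys, T result) {
      successCount.incrementAndGet();
      recordElapsed(node);
   }

   @Override
   public void allSubtasksCompleted() {
      completed = true;
   }

   // Adds the elapsed time for this node if a matching subtaskSent was seen.
   private void recordElapsed(Address node) {
      Long start = startNanos.remove(node);
      if (start != null) {
         totalNanos.addAndGet(System.nanoTime() - start);
         timedCount.incrementAndGet();
      }
   }

   int succeeded() { return successCount.get(); }
   int failed() { return failureCount.get(); }
   boolean isCompleted() { return completed; }

   // Average execution time in milliseconds over timed subtasks; 0 if none.
   double averageMillis() {
      int n = timedCount.get();
      return n == 0 ? 0.0 : totalNanos.get() / 1_000_000.0 / n;
   }
}

class MonitoringListenerDemo {
   public static void main(String[] args) {
      Address nodeA = new Address() {};
      Address nodeB = new Address() {};
      MonitoringListener<String, Integer> listener = new MonitoringListener<>();
      listener.subtaskSent(nodeA, Set.of("k1"));
      listener.subtaskSent(nodeB, Set.of("k2"));
      listener.subtaskSucceeded(nodeA, Set.of("k1"), 42);
      listener.subtaskFailed(nodeB, Set.of("k2"), new IllegalStateException("node left"));
      listener.allSubtasksCompleted();
      // prints "1 succeeded, 1 failed, completed=true"
      System.out.println(listener.succeeded() + " succeeded, " + listener.failed()
            + " failed, completed=" + listener.isCompleted());
   }
}
```

Keying start times by `Address` only works if each node runs at most one subtask of a given task at a time; keying by the input-key set as well would lift that restriction.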
>
>> If tasks do not use input keys, these parameters would
>> be empty sets. Now, for [1] we need to add additional methods to
>> DistributedExecutorService. We cannot specify the result cache in
>> DistributedTaskBuilder, because DistributedExecutorService only has
>> submit methods that return futures, and we don't want that here.
>> We need two new void methods:
>>
>> <T, K> void submitEverywhere(DistributedTask<T> task,
>> Cache<DistExecResultKey<K>, T> result);
>> <T, K> void submitEverywhere(DistributedTask<T> task,
>> Cache<DistExecResultKey<K>, T> result, K... input);
>>
>>
>> Now, why bother with DistExecResultKey? Well, we have tasks that use
>> input keys and tasks that don't. So the results cache can only be keyed by
>> the input keys, by the execution address, or by a combination of the two.
>> Therefore, DistExecResultKey could be something like:
>>
>> public interface DistExecResultKey<K> {
>>
>> Address getExecutionAddress();
>> K getKey();
>>
>> }
>>
>> If you have a better idea of how to address this aspect, let us know. So
>> much for distributed executors.
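Since DistExecResultKey instances would be used as cache keys, an implementation has to define equality over both fields. A minimal sketch, assuming a hypothetical `SimpleDistExecResultKey` class, an empty `Address` stand-in, and a plain `Map` in place of the results `Cache`; a `null` key marks a task submitted without input keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Infinispan's Address so the sketch compiles alone.
interface Address {}

// Key interface as proposed in this thread.
interface DistExecResultKey<K> {
   Address getExecutionAddress();
   K getKey();
}

// Hypothetical immutable implementation. Instances are used as cache keys, so
// equals()/hashCode() cover both fields. key is null for tasks submitted without
// input keys, leaving the execution address as the only discriminator.
// (A real clustered cache would also need the key to be marshallable.)
final class SimpleDistExecResultKey<K> implements DistExecResultKey<K> {
   private final Address executionAddress;
   private final K key;

   SimpleDistExecResultKey(Address executionAddress, K key) {
      this.executionAddress = Objects.requireNonNull(executionAddress);
      this.key = key;
   }

   // Result key for a task that did not use input keys.
   static <K> SimpleDistExecResultKey<K> forAddress(Address executionAddress) {
      return new SimpleDistExecResultKey<>(executionAddress, null);
   }

   @Override public Address getExecutionAddress() { return executionAddress; }
   @Override public K getKey() { return key; }

   @Override
   public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof SimpleDistExecResultKey)) return false;
      SimpleDistExecResultKey<?> other = (SimpleDistExecResultKey<?>) o;
      return executionAddress.equals(other.executionAddress)
            && Objects.equals(key, other.key);
   }

   @Override
   public int hashCode() {
      return Objects.hash(executionAddress, key);
   }
}

class ResultKeyDemo {
   public static void main(String[] args) {
      Address node = new Address() {};
      // A plain Map stands in for Cache<DistExecResultKey<K>, T>.
      Map<DistExecResultKey<String>, Integer> results = new HashMap<>();
      results.put(new SimpleDistExecResultKey<>(node, "k1"), 10);          // keyed task
      results.put(SimpleDistExecResultKey.<String>forAddress(node), 7);    // keyless task
      // An equal key built later finds the stored result.
      System.out.println(results.get(new SimpleDistExecResultKey<>(node, "k1"))); // prints 10
      System.out.println(results.size()); // prints 2
   }
}
```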
>>
>> For map/reduce, we also have to enable storing of map/reduce task results
>> into a cache [2] and allow users to specify a custom cache for intermediate
>> results [3]. Part of task [2] is to allow notification about map/reduce
>> task progress and completion. Just as with dist.executors, I would add a
>> MapReduceTaskExecutionListener interface:
>>
>>
>> public interface MapReduceTaskExecutionListener {
>>
>> void mapTaskInitialized(Address executionAddress);
>> void mapTaskSucceeded(Address executionAddress);
>> void mapTaskFailed(Address executionAddress, Exception cause);
>> void mapPhaseCompleted();
>>
>> void reduceTaskInitialized(Address executionAddress);
>> void reduceTaskSucceeded(Address executionAddress);
>> void reduceTaskFailed(Address executionAddress, Exception cause);
>> void reducePhaseCompleted();
>>
>> }
> IMO - in the first stage at least - I would rather use a simpler (Notifying)Future, on which the user can wait until the computation completes: it's simpler and more aligned with the rest of our async API.
>
What do you mean? We already have futures in the MapReduceTask API. This API
is more fine-grained and allows monitoring/reporting of task progress.
Please clarify.
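To illustrate what the listener adds over a plain Future, here is a minimal sketch of a listener that reports per-phase progress. `ProgressListener`, its `report()` method and the empty `Address` stand-in are hypothetical; the interface is copied from the proposal.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Infinispan's Address so the sketch compiles alone.
interface Address {}

// Listener interface as proposed in this thread.
interface MapReduceTaskExecutionListener {
   void mapTaskInitialized(Address executionAddress);
   void mapTaskSucceeded(Address executionAddress);
   void mapTaskFailed(Address executionAddress, Exception cause);
   void mapPhaseCompleted();

   void reduceTaskInitialized(Address executionAddress);
   void reduceTaskSucceeded(Address executionAddress);
   void reduceTaskFailed(Address executionAddress, Exception cause);
   void reducePhaseCompleted();
}

// Hypothetical progress reporter: per-phase started/finished counts, which a
// Future alone (done or not done) cannot express.
class ProgressListener implements MapReduceTaskExecutionListener {
   enum Phase { MAP, REDUCE, DONE }

   private final AtomicInteger mapStarted = new AtomicInteger();
   private final AtomicInteger mapFinished = new AtomicInteger();
   private final AtomicInteger reduceStarted = new AtomicInteger();
   private final AtomicInteger reduceFinished = new AtomicInteger();
   private final AtomicInteger failures = new AtomicInteger();
   private volatile Phase phase = Phase.MAP;

   @Override public void mapTaskInitialized(Address a) { mapStarted.incrementAndGet(); }
   @Override public void mapTaskSucceeded(Address a) { mapFinished.incrementAndGet(); }
   @Override public void mapTaskFailed(Address a, Exception cause) { fail(mapFinished); }
   @Override public void mapPhaseCompleted() { phase = Phase.REDUCE; }

   @Override public void reduceTaskInitialized(Address a) { reduceStarted.incrementAndGet(); }
   @Override public void reduceTaskSucceeded(Address a) { reduceFinished.incrementAndGet(); }
   @Override public void reduceTaskFailed(Address a, Exception cause) { fail(reduceFinished); }
   @Override public void reducePhaseCompleted() { phase = Phase.DONE; }

   // A failed task still counts as finished; failures are tallied separately.
   private void fail(AtomicInteger finished) {
      finished.incrementAndGet();
      failures.incrementAndGet();
   }

   String report() {
      return phase + ": map " + mapFinished.get() + "/" + mapStarted.get()
            + ", reduce " + reduceFinished.get() + "/" + reduceStarted.get()
            + ", failures " + failures.get();
   }
}

class ProgressDemo {
   public static void main(String[] args) {
      Address nodeA = new Address() {};
      Address nodeB = new Address() {};
      ProgressListener progress = new ProgressListener();
      progress.mapTaskInitialized(nodeA);
      progress.mapTaskInitialized(nodeB);
      progress.mapTaskSucceeded(nodeA);
      System.out.println(progress.report()); // prints "MAP: map 1/2, reduce 0/0, failures 0"
   }
}
```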
>> while MapReduceTask would have an additional method:
>>
>> public void execute(Cache<KOut, VOut> resultsCache);
> you could overload it with a method that takes only the cache name.
Yeah, good idea. Same for usingIntermediateCache? I actually asked you
this here https://issues.jboss.org/browse/ISPN-4021
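A minimal sketch of the suggested overload pair, where the name-based variant resolves the cache and delegates to the cache-based one. Everything here is a simplified, hypothetical stand-in (`CacheRegistry` for a cache manager, `Map` for `Cache`, a precomputed map for the actual map/reduce output), not the real MapReduceTask or Infinispan classes. The same delegation would presumably carry over to an intermediate-cache overload.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a cache manager: a registry of named maps.
class CacheRegistry {
   private final Map<String, Map<?, ?>> caches = new ConcurrentHashMap<>();

   // Returns the named "cache", creating it on first use.
   @SuppressWarnings("unchecked")
   <K, V> Map<K, V> getCache(String name) {
      return (Map<K, V>) caches.computeIfAbsent(name, n -> new ConcurrentHashMap<K, V>());
   }
}

// Hypothetical task sketch showing only the overload pair; the map/reduce
// computation itself is replaced by a precomputed result map.
class MapReduceTaskSketch<KOut, VOut> {
   private final CacheRegistry registry;
   private final Map<KOut, VOut> computedResults;

   MapReduceTaskSketch(CacheRegistry registry, Map<KOut, VOut> computedResults) {
      this.registry = registry;
      this.computedResults = computedResults;
   }

   // Variant taking the results cache itself.
   public void execute(Map<KOut, VOut> resultsCache) {
      resultsCache.putAll(computedResults);
   }

   // Convenience overload: resolve the cache by name, then delegate,
   // so the storing logic lives in one place.
   public void execute(String resultsCacheName) {
      execute(registry.<KOut, VOut>getCache(resultsCacheName));
   }
}

class OverloadDemo {
   public static void main(String[] args) {
      CacheRegistry registry = new CacheRegistry();
      Map<String, Integer> wordCounts = new HashMap<>();
      wordCounts.put("infinispan", 3);
      new MapReduceTaskSketch<>(registry, wordCounts).execute("wordCountResults");
      Map<String, Integer> stored = registry.getCache("wordCountResults");
      System.out.println(stored); // prints {infinispan=3}
   }
}
```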