[infinispan-dev] Distributed execution framework - API proposal(s)
Vladimir Blagojevic
vblagoje at redhat.com
Wed Jan 5 12:05:49 EST 2011
On 11-01-05 1:19 PM, Manik Surtani wrote:
> Ok, so something like what I proposed, plus:
>
> interface DistributedCallable<T> extends Callable<T> {
>    void setEmbeddedCacheManager(EmbeddedCacheManager ecm);
>    void setCache(Cache c);
> }
>
> DistributedRemoteTask dt = new DistributedRemoteTask(cache, new DistributedCallable<T>() {... });
> List<Future<T>> res = dt.invoke(); // executes on all nodes
> List<Future<T>> res = dt.invoke(K...); // executes on all nodes containing one or more of K
>
> and expect the user to iterate over whatever is needed, performing any mapping, transforming, reducing, etc. in DistributedCallable.call()?
Essentially yes. Let's merge it with
https://github.com/vblagoje/infinispan/commit/b6cd39fe56e1cced96926ed09e69f9d6a823d64e
Forget for a moment about the misnamed map/reduce. We let users choose
the execution nodes they want, or let the consistent hash (CH) handle it
for them by default. We could have invoke on the task return futures, as
you suggested, and/or keep reduce and have invoke return R. Those are
just detail variations of the same concept. Furthermore, all
distribution- and execution-related policies would be set at the
DistributedTask level. There would be one DistributedCallable per
Infinispan node, and each DistributedCallable would be prepared for
execution with a DistributedTaskContext set up to reflect all K,V pairs
residing on that node. Users also have access to other caches if
needed. Finally, see the WordCount example to get a feel.
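
To make the "one DistributedCallable per node, each with its own
context" idea concrete, here is a minimal sketch. It is not the code in
the commit above. The method names (setContext, localEntries) are
assumptions made up for illustration, and plain in-memory maps plus a
thread pool stand in for the cluster nodes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WordCountSketch {

    // Hypothetical: a view of the K,V pairs residing on one node.
    interface DistributedTaskContext<K, V> {
        Map<K, V> localEntries();
    }

    // Hypothetical: a Callable that is handed its node-local context before call().
    interface DistributedCallable<K, V, R> extends Callable<R> {
        void setContext(DistributedTaskContext<K, V> ctx);
    }

    // Counts words across the values stored on a single node.
    static class WordCountCallable
            implements DistributedCallable<String, String, Map<String, Integer>> {
        private DistributedTaskContext<String, String> ctx;

        @Override
        public void setContext(DistributedTaskContext<String, String> ctx) {
            this.ctx = ctx;
        }

        @Override
        public Map<String, Integer> call() {
            Map<String, Integer> counts = new HashMap<>();
            for (String text : ctx.localEntries().values()) {
                for (String word : text.split("\\s+")) {
                    if (!word.isEmpty()) {
                        counts.merge(word, 1, Integer::sum);
                    }
                }
            }
            return counts;
        }
    }

    // Stand-in for the cluster: each map plays the role of one node's local data.
    public static Map<String, Integer> run(List<Map<String, String>> nodes) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, nodes.size()));
        try {
            List<Future<Map<String, Integer>>> futures = new ArrayList<>();
            for (Map<String, String> nodeData : nodes) {
                WordCountCallable task = new WordCountCallable(); // one callable per node
                task.setContext(() -> nodeData);                  // context reflects local pairs
                futures.add(pool.submit(task));
            }
            // The caller combines the per-node results (the "reduce" part).
            Map<String, Integer> total = new HashMap<>();
            for (Future<Map<String, Integer>> f : futures) {
                f.get().forEach((word, count) -> total.merge(word, count, Integer::sum));
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Map<String, String>> nodes = List.of(
                Map.of("k1", "hello world"),
                Map.of("k2", "hello infinispan"));
        System.out.println(run(nodes));
    }
}
```

Here the caller merges the per-node futures itself, which corresponds to
the "invoke returns futures" variation; the "invoke returns R" variation
would move that merge loop behind the task.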
> If so, then we need to define the following APIs (along with an example of using each, so we are clear on how each API works).
>
> M/R API:
> Mapper<K, V, T>
> Reducer<T, R>
> Collator<R>
> MapReduceTask<T, R>
>
> DistributedCallable API:
> DistributedCallable<K, V, R>
> DistributedRemoteTask<R>
>
> right?
>
> MapReduceTask could extend DistributedRemoteTask... (maybe) ... and would need the following overloaded invocation methods, perhaps:
>
> R invoke(); //invoke on all nodes
> R invoke(K...); // invoke on a subset of nodes
> R invoke(Address...); // invoke on a subset of nodes
> Future<R> invokeAsync(); //invoke on all nodes
> Future<R> invokeAsync(K...); // invoke on a subset of nodes
> Future<R> invokeAsync(Address...); // invoke on a subset of nodes
>
> Or, perhaps, making use of a more fluent API:
>
> R result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collate(collator);
>
> Future<R> result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);
>
> List<R> result = new DistributedRemoteTask(cache).on(K...).execute(distributedCallable);
>
> List<Future<R>> result = new DistributedRemoteTask(cache).on(K...).executeAsync(distributedCallable);
>
Awesome! I love this fluent API proposal!
Excellent! Anyone else?
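
A rough sketch of how the fluent variant could hang together, purely to
show the call chain. Everything here is an assumption for illustration:
the single-method shapes of Mapper, Reducer and Collator, the four type
parameters on MapReduceTask (the proposal lists two), and a plain Map
standing in for the cache, so there is only one "node".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FluentTaskSketch {

    // Hypothetical single-method shapes for the interfaces named in the thread.
    interface Mapper<K, V, T> { T map(K key, V value); }
    interface Reducer<T, R> { R reduce(List<T> mapped); }
    interface Collator<R> { R collate(List<R> reduced); }

    static class MapReduceTask<K, V, T, R> {
        private final Map<K, V> cache;   // stand-in for a cache; everything is local
        private Set<K> keys;             // null until on(...) is called: use all keys
        private Mapper<K, V, T> mapper;
        private Reducer<T, R> reducer;

        MapReduceTask(Map<K, V> cache) { this.cache = cache; }

        @SafeVarargs
        final MapReduceTask<K, V, T, R> on(K... ks) {
            this.keys = new LinkedHashSet<>(Arrays.asList(ks));
            return this;
        }

        MapReduceTask<K, V, T, R> mappedWith(Mapper<K, V, T> m) { this.mapper = m; return this; }

        MapReduceTask<K, V, T, R> reducedWith(Reducer<T, R> r) { this.reducer = r; return this; }

        // Terminal step: map the selected entries, reduce, then collate.
        R collate(Collator<R> collator) {
            List<T> mapped = new ArrayList<>();
            for (Map.Entry<K, V> e : cache.entrySet()) {
                if (keys == null || keys.contains(e.getKey())) {
                    mapped.add(mapper.map(e.getKey(), e.getValue()));
                }
            }
            // Only one "node" in this sketch, so the collator sees a single reduced value.
            List<R> reduced = new ArrayList<>();
            reduced.add(reducer.reduce(mapped));
            return collator.collate(reduced);
        }
    }

    // Sums the lengths of the values stored under the given keys.
    public static int totalLength(Map<String, String> cache, String... keys) {
        return new MapReduceTask<String, String, Integer, Integer>(cache)
                .on(keys)
                .mappedWith((k, v) -> v.length())
                .reducedWith(ts -> ts.stream().mapToInt(Integer::intValue).sum())
                .collate(rs -> rs.get(0));
    }

    public static void main(String[] args) {
        Map<String, String> cache = Map.of("a", "xx", "b", "yyy", "c", "z");
        System.out.println(totalLength(cache, "a", "b")); // prints 5
    }
}
```

Making collate(...) the terminal call keeps the chain readable, but it
also means the setters can be called in any order or skipped; a stricter
design might use step interfaces so the compiler enforces the order.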
> WDYT? I think the fluent API makes a lot of sense, and it is very clear/user friendly.
>
> Cheers
> Manik
>