[infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic vblagoje at redhat.com
Wed Jan 5 12:05:49 EST 2011


On 11-01-05 1:19 PM, Manik Surtani wrote:
> Ok, so something like what I proposed, plus:
>
> interface DistributedCallable<T> extends Callable<T> {
> 	void setEmbeddedCacheManager(EmbeddedCacheManager ecm);
> 	void setCache(Cache c);
> }
>
> DistributedRemoteTask dt = new DistributedRemoteTask(cache, new DistributedCallable<T>() {... });
> List<Future<T>> res = dt.invoke(); // executes on all nodes
> List<Future<T>> res = dt.invoke(K...); // executes on all nodes containing one or more of K
>
> and expect the user to iterate over whatever is needed, performing any mapping, transforming, reducing, etc in DistributedCallable.call()?
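
To make the quoted invoke() idea concrete, here is a minimal single-JVM sketch. It is not Infinispan code: each "node" is just a Map, a thread pool stands in for remote execution, and RemoteTaskSketch, invoke and entryCounts are hypothetical names chosen for illustration. An empty key set is assumed to mean "run on all nodes".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical stand-in for the proposed DistributedRemoteTask; each Map plays one node.
final class RemoteTaskSketch {

    // Submits `work` once per selected node and returns one Future per node.
    // Assumption: an empty key set selects every node; otherwise only nodes
    // holding at least one of the keys are selected.
    static <K, V, T> List<Future<T>> invoke(ExecutorService pool,
                                            List<Map<K, V>> nodes,
                                            Set<K> keys,
                                            Function<Map<K, V>, T> work) {
        List<Future<T>> futures = new ArrayList<>();
        for (Map<K, V> local : nodes) {
            boolean selected = keys.isEmpty() || !Collections.disjoint(local.keySet(), keys);
            if (selected) {
                Callable<T> job = () -> work.apply(local);
                futures.add(pool.submit(job));
            }
        }
        return futures;
    }

    // Example caller: counts the entries on each selected node and waits for all results.
    static List<Integer> entryCounts(List<Map<String, String>> nodes, Set<String> keys) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Function<Map<String, String>, Integer> size = Map::size;
            List<Integer> counts = new ArrayList<>();
            for (Future<Integer> f : invoke(pool, nodes, keys, size)) {
                counts.add(f.get());
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller iterates over the returned futures itself, which matches the "expect the user to iterate" reading of the quoted proposal.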

Essentially yes. Let's merge it with
https://github.com/vblagoje/infinispan/commit/b6cd39fe56e1cced96926ed09e69f9d6a823d64e

Forget for a moment about the misnamed map/reduce. We let users choose 
the execution nodes they want, or let CH (the consistent hash) pick them 
by default. We could have invoke on the task return futures, as you 
suggested, and/or keep reduce and have invoke return R. Those are just 
variations on the same concept. Furthermore, all distribution- and 
execution-related policies would be set at the DistributedTask level. 
There would be one DistributedCallable per Infinispan node. Before 
execution, each DistributedCallable would be given a 
DistributedTaskContext set up to reflect all K,V pairs residing on that 
node. Users also have access to other caches if needed. Finally, see 
the WordCount example to get a feel.
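
As a rough illustration of the "one callable per node, prepared with a context" idea, here is a minimal sketch. It is not the code from the commit linked above: TaskContextSketch, DistributedCallableSketch, WordCountCallable and WordCountSketch are hypothetical names, each node is simulated by a plain Map, and the callables run sequentially in one JVM.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical stand-in for DistributedTaskContext: exposes the K,V pairs local to one node.
interface TaskContextSketch<K, V> {
    Map<K, V> localEntries();
}

// Hypothetical shape of a per-node callable that is prepared with a context before it runs.
interface DistributedCallableSketch<K, V, R> extends Callable<R> {
    void prepare(TaskContextSketch<K, V> ctx);
}

// Counts case-insensitive occurrences of one word in the values held by a single node.
final class WordCountCallable implements DistributedCallableSketch<String, String, Integer> {
    private final String word;
    private TaskContextSketch<String, String> ctx;

    WordCountCallable(String word) {
        this.word = word;
    }

    @Override
    public void prepare(TaskContextSketch<String, String> ctx) {
        this.ctx = ctx;
    }

    @Override
    public Integer call() {
        if (ctx == null) {
            throw new IllegalStateException("prepare() must be called before call()");
        }
        int count = 0;
        for (String text : ctx.localEntries().values()) {
            for (String token : text.split("\\s+")) {
                if (token.equalsIgnoreCase(word)) {
                    count++;
                }
            }
        }
        return count;
    }
}

final class WordCountSketch {
    // Creates one callable per simulated node, prepares it with that node's
    // entries, runs it, and sums the per-node counts (the "reduce" step).
    static int countAcrossNodes(String word, List<Map<String, String>> nodes) {
        int total = 0;
        for (Map<String, String> local : nodes) {
            WordCountCallable task = new WordCountCallable(word);
            task.prepare(() -> local);
            total += task.call();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Map<String, String>> nodes = Arrays.asList(
                Collections.singletonMap("k1", "hello world hello"),
                Collections.singletonMap("k2", "Hello there"));
        System.out.println(countAcrossNodes("hello", nodes));
    }
}
```

In a real cluster the summing loop would be replaced by whatever the task does with the per-node results, whether that is a list of futures or a reduced R.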



> If so, then we need to define the following APIs (along with an example of using each, so we are clear on how each API works).
>
> M/R API:
> 	Mapper<K, V, T>
> 	Reducer<T, R>
> 	Collator<R>
> 	MapReduceTask<T, R>
>
> DistributedCallable API:
> 	DistributedCallable<K, V, R>
> 	DistributedRemoteTask<R>
>
> right?
>
> MapReduceTask could extend DistributedRemoteTask... (maybe) ... and would need the following overloaded invocation methods, perhaps:
>
> R invoke(); //invoke on all nodes
> R invoke(K...); // invoke on a subset of nodes
> R invoke(Address...); // invoke on a subset of nodes
> Future<R>  invokeAsync(); //invoke on all nodes
> Future<R>  invokeAsync(K...); // invoke on a subset of nodes
> Future<R>  invokeAsync(Address...); // invoke on a subset of nodes
>
> Or, perhaps, making use of a more fluent API:
>
> R result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collate(collator);
>
> Future<R>  result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);
>
> List<R>  result = new DistributedRemoteTask(cache).on(K...).execute(distributedCallable);
>
> List<Future<R>>  result = new DistributedRemoteTask(cache).on(K...).executeAsync(distributedCallable);
>
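
For readers who want to see how the quoted fluent chain could hang together, here is a minimal in-memory sketch. Everything in it is an assumption made for illustration: the method signatures on Mapper, Reducer and Collator are guesses (the thread only names the types), MapReduceTaskSketch takes a list of Maps instead of a cache, and no remote execution is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Assumed signatures; the proposal names these types but does not define their methods.
interface Mapper<K, V, T> { T map(K key, V value); }
interface Reducer<T, R> { R reduce(List<T> mapped); }
interface Collator<R> { R collate(List<R> perNodeResults); }

// Hypothetical fluent task; each Map in `nodes` simulates the entries held by one node.
final class MapReduceTaskSketch<K, V, T, R> {
    private final List<Map<K, V>> nodes;
    private Set<K> keys; // null means "all keys on all nodes"
    private Mapper<K, V, T> mapper;
    private Reducer<T, R> reducer;

    MapReduceTaskSketch(List<Map<K, V>> nodes) {
        this.nodes = nodes;
    }

    @SafeVarargs
    final MapReduceTaskSketch<K, V, T, R> on(K... ks) {
        this.keys = new HashSet<>(Arrays.asList(ks));
        return this;
    }

    MapReduceTaskSketch<K, V, T, R> mappedWith(Mapper<K, V, T> m) {
        this.mapper = m;
        return this;
    }

    MapReduceTaskSketch<K, V, T, R> reducedWith(Reducer<T, R> r) {
        this.reducer = r;
        return this;
    }

    // Maps the selected entries on each node, reduces per node, then collates.
    R collate(Collator<R> collator) {
        if (mapper == null || reducer == null) {
            throw new IllegalStateException("mappedWith() and reducedWith() are required");
        }
        List<R> perNode = new ArrayList<>();
        for (Map<K, V> local : nodes) {
            List<T> mapped = new ArrayList<>();
            for (Map.Entry<K, V> e : local.entrySet()) {
                if (keys == null || keys.contains(e.getKey())) {
                    mapped.add(mapper.map(e.getKey(), e.getValue()));
                }
            }
            if (!mapped.isEmpty()) {
                perNode.add(reducer.reduce(mapped)); // nodes without selected keys are skipped
            }
        }
        return collator.collate(perNode);
    }
}

final class FluentDemo {
    static Integer sum(List<Integer> xs) {
        int s = 0;
        for (int x : xs) {
            s += x;
        }
        return s;
    }

    // Total length of the values stored under the given keys, across all nodes.
    static int totalLength(List<Map<String, String>> nodes, String... keys) {
        return new MapReduceTaskSketch<String, String, Integer, Integer>(nodes)
                .on(keys)
                .mappedWith((k, v) -> v.length())
                .reducedWith(FluentDemo::sum)
                .collate(FluentDemo::sum);
    }
}
```

The terminal call (collate here) is what triggers execution, so the builder steps before it can be given in any order.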



Awesome! I love this fluent API proposal!

Excellent! Anyone else?



> WDYT?  I think the fluent API makes a lot of sense, and it is very clear/user friendly.
>
> Cheers
> Manik
>


