On 11-01-05 1:19 PM, Manik Surtani wrote:
Ok, so something like what I proposed, plus:
interface DistributedCallable<T> extends Callable<T> {
   void setEmbeddedCacheManager(EmbeddedCacheManager ecm);
   void setCache(Cache<?, ?> c);
}
DistributedRemoteTask dt = new DistributedRemoteTask(cache, new
DistributedCallable<T>() {... });
List<Future<T>> res = dt.invoke(); // executes on all nodes
List<Future<T>> subsetRes = dt.invoke(K...); // executes on all nodes owning one or more of the keys in K
and expect the user to iterate over whatever is needed, performing any mapping,
transforming, reducing, etc. in DistributedCallable.call()?
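To make the proposal above concrete, here is a minimal single-JVM sketch in plain Java; it does not use Infinispan. Apart from the names DistributedCallable, DistributedRemoteTask and invoke(), everything is a hypothetical stand-in: SimulatedNode models a cluster member, setLocalData(Map) takes the place of setCache(c), an ExecutorService stands in for remote execution, and a Supplier hands each node its own copy of the callable, roughly as deserialization on each node would. invoke() runs one copy per node; invoke(K...) runs only on nodes whose local data contains at least one of the given keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical: a Map of the node's local entries stands in for setCache(c).
interface DistributedCallable<K, V, T> extends Callable<T> {
    void setLocalData(Map<K, V> localData);
}

// Hypothetical stand-in for one cluster member and the entries it holds.
final class SimulatedNode<K, V> {
    final String address;
    final Map<K, V> localData;

    SimulatedNode(String address, Map<K, V> localData) {
        this.address = address;
        this.localData = localData;
    }
}

// Single-JVM sketch of the proposed DistributedRemoteTask.
class DistributedRemoteTask<K, V, T> {
    private final List<SimulatedNode<K, V>> nodes;
    private final Supplier<? extends DistributedCallable<K, V, T>> factory;
    private final ExecutorService executor;

    DistributedRemoteTask(List<SimulatedNode<K, V>> nodes,
                          Supplier<? extends DistributedCallable<K, V, T>> factory,
                          ExecutorService executor) {
        this.nodes = nodes;
        this.factory = factory;
        this.executor = executor;
    }

    // Executes one copy of the callable on every node.
    List<Future<T>> invoke() {
        return submitTo(nodes);
    }

    // Executes only on nodes holding at least one of the given keys.
    @SafeVarargs
    final List<Future<T>> invoke(K... keys) {
        List<SimulatedNode<K, V>> targets = new ArrayList<>();
        for (SimulatedNode<K, V> node : nodes) {
            for (K key : keys) {
                if (node.localData.containsKey(key)) {
                    targets.add(node);
                    break;
                }
            }
        }
        return submitTo(targets);
    }

    private List<Future<T>> submitTo(List<SimulatedNode<K, V>> targets) {
        List<Future<T>> futures = new ArrayList<>();
        for (SimulatedNode<K, V> node : targets) {
            // A fresh copy per node mimics each node deserializing its own callable.
            DistributedCallable<K, V, T> callable = factory.get();
            callable.setLocalData(node.localData);
            futures.add(executor.submit(callable));
        }
        return futures;
    }

    // Convenience helper: waits for every future and unwraps checked exceptions.
    static <R> List<R> getAll(List<Future<R>> futures) {
        List<R> results = new ArrayList<>();
        for (Future<R> f : futures) {
            try {
                results.add(f.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
        return results;
    }
}

// Example callable: reports how many entries the node it runs on holds.
class LocalEntryCount implements DistributedCallable<String, String, Integer> {
    private Map<String, String> localData;

    @Override
    public void setLocalData(Map<String, String> localData) {
        this.localData = localData;
    }

    @Override
    public Integer call() {
        return localData.size();
    }
}
```

Returning futures keeps any reduction in the caller's hands: it decides whether to sum, merge or discard the per-node results.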
Essentially yes. Let's merge it with
https://github.com/vblagoje/infinispan/commit/b6cd39fe56e1cced96926ed09e6...
Forget for a moment about the misnamed map/reduce. We let users choose
the execution nodes they want, or let the consistent hash (CH) choose
them by default. We could have invoke on the task return futures, as you
suggested, and/or keep reduce and have invoke return R. Those are just
variations in detail on the same concept. Furthermore, all distribution
and execution policies would be set at the DistributedTask level. There
would be one DistributedCallable per Infinispan node, and each
DistributedCallable would be prepared for execution with a
DistributedTaskContext set up to reflect all K,V pairs residing on that
node. Users would also have access to other caches if needed. Finally,
see the WordCount example to get a feel for it.
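The per-node context idea can be illustrated with a word count. This is a hedged sketch, not the WordCount example referenced above: DistributedTaskContext, its localEntries() method and ContextAwareCallable are assumed names, the context is passed as a call() argument rather than through a setter, and "nodes" are simply a list of maps visited sequentially in one JVM. Each node counts words only in its own values, and the caller merges the partial counts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical context handed to each per-node callable: only that node's entries.
interface DistributedTaskContext<K, V> {
    Map<K, V> localEntries();
}

// Hypothetical per-node unit of work that reads its input from the context.
interface ContextAwareCallable<K, V, R> {
    R call(DistributedTaskContext<K, V> context);
}

class WordCountSketch {
    // Per-node step: count words in the values stored on this node only.
    static final ContextAwareCallable<String, String, Map<String, Integer>> LOCAL_WORD_COUNT =
        ctx -> {
            Map<String, Integer> counts = new HashMap<>();
            for (String text : ctx.localEntries().values()) {
                for (String word : text.split("\\s+")) {
                    if (!word.isEmpty()) {
                        counts.merge(word, 1, Integer::sum);
                    }
                }
            }
            return counts;
        };

    // Caller side: one callable per node, each with a context over that node's
    // data; the partial counts are then merged into a single result.
    static Map<String, Integer> run(List<Map<String, String>> nodeData) {
        Map<String, Integer> total = new HashMap<>();
        for (Map<String, String> local : nodeData) {
            DistributedTaskContext<String, String> ctx = () -> local;
            LOCAL_WORD_COUNT.call(ctx).forEach((w, c) -> total.merge(w, c, Integer::sum));
        }
        return total;
    }
}
```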
If so, then we need to define the following APIs (along with an
example of using each, so we are clear on how each API works).
M/R API:
Mapper<K, V, T>
Reducer<T, R>
Collator<R>
MapReduceTask<T, R>
DistributedCallable API:
DistributedCallable<K, V, R>
DistributedRemoteTask<R>
right?
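One possible reading of the three M/R roles, sketched under assumptions the thread does not settle: a Mapper turns each (K, V) entry into one T, a Reducer folds the T values produced on one node into an R, and a Collator combines the per-node R values into the final result. LocalMapReduce.execute is a hypothetical in-memory driver in which each Map in the list plays the role of one node's data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Assumed shapes of the three M/R roles.
interface Mapper<K, V, T> { T map(K key, V value); }
interface Reducer<T, R> { R reduce(List<T> mapped); }
interface Collator<R> { R collate(List<R> perNodeResults); }

class LocalMapReduce {
    // Map every entry on a node, reduce per node, then collate across nodes.
    static <K, V, T, R> R execute(List<Map<K, V>> nodes, Mapper<K, V, T> mapper,
                                  Reducer<T, R> reducer, Collator<R> collator) {
        List<R> perNode = new ArrayList<>();
        for (Map<K, V> local : nodes) {
            List<T> mapped = new ArrayList<>();
            local.forEach((k, v) -> mapped.add(mapper.map(k, v)));
            perNode.add(reducer.reduce(mapped));
        }
        return collator.collate(perNode);
    }
}
```

With this split, the Reducer runs where the data lives and only one R per node travels back to the Collator.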
MapReduceTask could (maybe) extend DistributedRemoteTask, and would perhaps need the
following overloaded invocation methods:
R invoke(); // invoke on all nodes
R invoke(K... keys); // invoke on the nodes owning any of the given keys
R invoke(Address... nodes); // invoke on the given nodes
Future<R> invokeAsync(); // invoke on all nodes
Future<R> invokeAsync(K... keys); // invoke on the nodes owning any of the given keys
Future<R> invokeAsync(Address... nodes); // invoke on the given nodes
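The overload set above can be sketched without a cluster. In this hypothetical OverloadSketch, Address is a stand-in class, the key-to-owner mapping uses hashCode() modulo the cluster size (a deliberately simplistic substitute for the real consistent hash, with one owner per key), and "work" is a function that receives the selected nodes in place of the actual task. The async variants just wrap the synchronous ones in CompletableFuture.supplyAsync; they return CompletableFuture<R>, which implements Future<R>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical node identifier standing in for a cluster Address.
final class Address {
    private final String name;
    Address(String name) { this.name = name; }
    @Override public String toString() { return name; }
}

// Hypothetical task exposing the proposed overloads; "work" receives the target nodes.
class OverloadSketch<K, R> {
    private final List<Address> members;
    private final Function<List<Address>, R> work;

    OverloadSketch(List<Address> members, Function<List<Address>, R> work) {
        this.members = members;
        this.work = work;
    }

    R invoke() {                                   // all nodes
        return work.apply(members);
    }

    @SafeVarargs
    final R invoke(K... keys) {                    // nodes owning any of the keys
        return work.apply(ownersOf(keys));
    }

    R invoke(Address... targets) {                 // explicitly chosen nodes
        return work.apply(Arrays.asList(targets));
    }

    CompletableFuture<R> invokeAsync() {
        return CompletableFuture.supplyAsync(() -> invoke());
    }

    @SafeVarargs
    final CompletableFuture<R> invokeAsync(K... keys) {
        return CompletableFuture.supplyAsync(() -> invoke(keys));
    }

    CompletableFuture<R> invokeAsync(Address... targets) {
        return CompletableFuture.supplyAsync(() -> invoke(targets));
    }

    // Simplistic stand-in for consistent hashing: one owner per key, deduplicated.
    private List<Address> ownersOf(K[] keys) {
        Set<Address> owners = new LinkedHashSet<>();
        for (K key : keys) {
            owners.add(members.get(Math.floorMod(key.hashCode(), members.size())));
        }
        return new ArrayList<>(owners);
    }
}
```

One thing the sketch surfaces: if the key type K were itself Address, invoke(K...) and invoke(Address...) would have the same parameter types and calls would be ambiguous at compile time.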
Or, perhaps, making use of a more fluent API:
R result = new MapReduceTask(cache).on(K...)
      .mappedWith(mapper).reducedWith(reducer).collate(collator);
Future<R> result = new MapReduceTask(cache).on(K...)
      .mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);
List<R> result = new DistributedRemoteTask(cache).on(K...)
      .execute(distributedCallable);
List<Future<R>> result = new DistributedRemoteTask(cache).on(K...)
      .executeAsync(distributedCallable);
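The staged builder behind the fluent MapReduceTask chain can be sketched in plain Java. Assumptions: the cache is replaced by a list of maps (one per simulated node), on(K...) keeps only nodes holding at least one of the keys, reduction happens per node before collation, and MapReduceTaskSketch, Mapped and Reduced are hypothetical names. Each step returns a type that offers only the next step, so the compiler infers T and R as the chain grows, and calling collate(...) before reducedWith(...) does not compile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

interface Mapper<K, V, T> { T map(K key, V value); }
interface Reducer<T, R> { R reduce(List<T> mapped); }
interface Collator<R> { R collate(List<R> perNodeResults); }

class MapReduceTaskSketch<K, V> {
    private final List<Map<K, V>> nodes;
    private List<Map<K, V>> targets;

    MapReduceTaskSketch(List<Map<K, V>> nodes) {
        this.nodes = nodes;
        this.targets = nodes; // default: every node
    }

    // Restrict execution to nodes holding at least one of the keys.
    @SafeVarargs
    final MapReduceTaskSketch<K, V> on(K... keys) {
        List<Map<K, V>> selected = new ArrayList<>();
        for (Map<K, V> node : nodes) {
            for (K key : keys) {
                if (node.containsKey(key)) {
                    selected.add(node);
                    break;
                }
            }
        }
        targets = selected;
        return this;
    }

    <T> Mapped<T> mappedWith(Mapper<K, V, T> mapper) {
        return new Mapped<T>(mapper);
    }

    // Stage 2: a mapper is fixed; only reducedWith is available.
    final class Mapped<T> {
        private final Mapper<K, V, T> mapper;

        private Mapped(Mapper<K, V, T> mapper) { this.mapper = mapper; }

        <R> Reduced<T, R> reducedWith(Reducer<T, R> reducer) {
            return new Reduced<T, R>(mapper, reducer);
        }
    }

    // Stage 3: mapper and reducer are fixed; collate runs the whole pipeline.
    final class Reduced<T, R> {
        private final Mapper<K, V, T> mapper;
        private final Reducer<T, R> reducer;

        private Reduced(Mapper<K, V, T> mapper, Reducer<T, R> reducer) {
            this.mapper = mapper;
            this.reducer = reducer;
        }

        R collate(Collator<R> collator) {
            List<R> perNode = new ArrayList<>();
            for (Map<K, V> node : targets) {
                List<T> mapped = new ArrayList<>();
                node.forEach((k, v) -> mapped.add(mapper.map(k, v)));
                perNode.add(reducer.reduce(mapped));
            }
            return collator.collate(perNode);
        }

        CompletableFuture<R> collateAsynchronously(Collator<R> collator) {
            return CompletableFuture.supplyAsync(() -> collate(collator));
        }
    }
}
```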
Awesome! I love this fluent API proposal!
Excellent! Anyone else?
WDYT? I think the fluent API makes a lot of sense, and it is very
clear/user-friendly.
Cheers
Manik