These two approaches are not mutually exclusive. Here is what we can do:
we can use a very simple "execute this runnable on remote nodes and
collect results" model as the infrastructure for the higher-layer
map/reduce you proposed, Manik. That way we cover both ends - a simple
execution model run on remote nodes over a given input data set, and a
more sophisticated map/reduce model, true to the original paradigm,
built on top of it. Users can choose whichever fits their needs best. I
can definitely see a need arising for both of these approaches.
Ok, so something like what I proposed, plus:
DistributedCallable<T> extends Callable<T> {
   void setEmbeddedCacheManager(EmbeddedCacheManager ecm);
   void setCache(Cache<?, ?> c);
}
DistributedRemoteTask<T> dt = new DistributedRemoteTask<T>(cache,
      new DistributedCallable<T>() { ... });
List<Future<T>> res = dt.invoke(); // executes on all nodes
List<Future<T>> res = dt.invoke(K...); // executes on all nodes containing one or more of K
and expect the user to iterate over whatever is needed, performing any mapping,
transforming, reducing, etc., in DistributedCallable.call()?
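For instance, a callable that counts matching values on each node might look
like this (a hypothetical sketch - the error-counting logic, the cast, and the
assumption that values() exposes node-local data are illustrative, not settled
API):

DistributedCallable<Integer> counter = new DistributedCallable<Integer>() {
   private Cache<String, String> cache;

   public void setEmbeddedCacheManager(EmbeddedCacheManager ecm) { /* unused here */ }

   @SuppressWarnings("unchecked")
   public void setCache(Cache<?, ?> c) { this.cache = (Cache<String, String>) c; }

   // runs on each target node; counts locally stored values containing "error"
   public Integer call() {
      int hits = 0;
      for (String v : cache.values())
         if (v.contains("error")) hits++;
      return hits;
   }
};

The caller would then iterate the returned futures and sum the per-node counts.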
If so, then we need to define the following APIs (along with an example of using each, so
we are clear on how each API works).
M/R API:
Mapper<K, V, T>
Reducer<T, R>
Collator<R>
MapReduceTask<T, R>
DistributedCallable API:
DistributedCallable<K, V, R>
DistributedRemoteTask<R>
right?
MapReduceTask could extend DistributedRemoteTask... (maybe) ... and would need the
following overloaded invocation methods, perhaps:
R invoke(); // invoke on all nodes
R invoke(K...); // invoke on the subset of nodes owning one or more of K
R invoke(Address...); // invoke on the given subset of nodes
Future<R> invokeAsync(); // invoke on all nodes
Future<R> invokeAsync(K...); // invoke on the subset of nodes owning one or more of K
Future<R> invokeAsync(Address...); // invoke on the given subset of nodes
Or, perhaps, making use of a more fluent API:
R result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collate(collator);

Future<R> result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);

List<R> result = new DistributedRemoteTask(cache).on(K...).execute(distributedCallable);

List<Future<R>> result = new DistributedRemoteTask(cache).on(K...).executeAsync(distributedCallable);
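To pin down the chaining, here is a minimal sketch of what the builder
signatures might look like (purely an assumption drawn from the calls above;
I have pulled the mapper's K and V into the task's generics so it type-checks):

MapReduceTask<K, V, T, R> {
   MapReduceTask<K, V, T, R> on(K... keys); // restrict to nodes owning one or more of K
   MapReduceTask<K, V, T, R> mappedWith(Mapper<K, V, T> mapper);
   MapReduceTask<K, V, T, R> reducedWith(Reducer<T, R> reducer);
   R collate(Collator<R> collator); // blocks for the collated result
   Future<R> collateAsynchronously(Collator<R> collator);
}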
WDYT? I think the fluent API makes a lot of sense, and it is very clear/user-friendly.
Cheers
Manik
On 11-01-05 12:46 AM, Vladimir Blagojevic wrote:
> Manik,
>
> Of course we could go in this direction as well. This is very similar to
> the Hadoop approach and close to the original map/reduce paradigm. I
> intentionally changed that paradigm into a simpler one because I have
> read a lot of criticism about how hard it is for "regular" developers to
> adapt to the original map/reduce paradigm. Hence the simpler approach of
> mapping a runnable to execution nodes and collating results -
> unfortunately, I named those operations map and reduce as well.
>
> Does anyone else have an opinion while I think about this a bit more?
>
> Regards,
> Vladimir
>
>
>
>
> On 11-01-04 3:47 PM, Manik Surtani wrote:
>> Also, I think we need to be clear about these two functions (map and
>> reduce). Map doesn't mean "pick a node to run the task on" in map/reduce
>> speak. Map means select/transform data for inclusion into a result set.
>> Perhaps it also makes sense to use smaller/simpler interfaces. I know this
>> breaks away from the F/J API, but I'm beginning to wonder if there is a
>> proper alignment of purpose here in the first place - going back on my
>> original plans here. How's this for an alternate API:
>>
>> interface Mapper<K, V, T> {
>>    // Just "maps" entries on a remote node. Map = filter and transform.
>>    // Invoked once for each entry on a remote cache.
>>    // Null responses are ignored/filtered out.
>>    T map(K key, V value);
>> }
>>
>> interface Reducer<T, R> {
>>    // Incrementally reduces a transformed entry. Called once for each T
>>    // produced by the mapper. The previously reduced value is passed in
>>    // each time.
>>    R reduce(T transformed, R previouslyReduced);
>> }
>>
>> interface Collator<R> {
>>    // Adds reduced results from remote nodes. Called once for each R
>>    // returned by a remote Reducer.
>>    void add(Address origin, R remote);
>>
>>    // Collates all results added so far.
>>    R collate();
>> }
>>
>>
>> And the API could do something like
>>
>> MapReduceContext c = new MapReduceContext(cache);
>>
>> // 1) Distributes 'mapper' cluster-wide. Calls mapper.map() for each K/V
>> //    pair. Stores the result T of each invocation if T != null.
>> // 2) For each T, reducer.reduce() is called. Each time, the previous
>> //    value of R is passed back in to reduce().
>> // 3) The final value of R is sent back as the RPC result. For each
>> //    result, the origin address and R are passed to collator.add().
>> // 4) Once all remote RPCs have responded, collator.collate() is called
>> //    and its result is passed back to the caller.
>> R r = c.invoke(mapper, reducer, collator);
>>
>> Variants may include:
>>
>> Filtering nodes:
>> // Restricts the set of nodes RPCs are sent to, based on the subset of
>> // the cluster that contains one or more of K.
>> // Question: does this mean only K/V pairs that are in K... are passed
>> // in to the mapper?
>> R r = c.invoke(mapper, reducer, collator, K...);
>>
>> Using futures:
>> NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
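>>
>> The caller can then simply block on it like any Future (whether
>> NotifyingFuture also offers a completion callback is left open here):
>>
>> R r = f.get(); // returns once all remote RPCs and the final collate() are done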
>>
>> Example: implementing a word count, but only for keys that start with
>> "text":
>>
>> Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
>>    public Integer map(String k, String v) {
>>       return k.startsWith("text") ? v.length() : null;
>>    }
>> };
>>
>> Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
>>    public Integer reduce(Integer transformed, Integer prevReduced) {
>>       // guard for the first call, where there is no previous value yet
>>       return prevReduced == null ? transformed : transformed + prevReduced;
>>    }
>> };
>>
>> Collator<Integer> collator = new Collator<Integer>() {
>>    int collated = 0;
>>    public void add(Address origin, Integer result) { collated += result; }
>>    public Integer collate() { return collated; }
>> };
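>>
>> Running the example then collapses to a single call (reusing the context
>> created earlier; the result is the summed lengths of all values whose keys
>> start with "text"):
>>
>> MapReduceContext c = new MapReduceContext(cache);
>> Integer totalLength = c.invoke(mapper, reducer, collator);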
>>
>>
>> WDYT? :-)
>>
>> Cheers
>> Manik
>>
>>
>>
>> On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
>>
>>> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>>>> Maybe I'm reading this wrong, but are you saying that multiple caches
>>>> cause problems with mapping task units to nodes in the cluster?
>>>>
>>>> Or are you just doing it to avoid cluttering the API?
>>> To avoid cluttering the API. If you did not like K, V, T, R, imagine
>>> dealing with multiple-cache confusion! It would be horrible.
>>>
>>>> I think DistributedTaskContext extending CacheContainer is rather
>>>> confusing, particularly when DistributedTaskContext has K,V parameters
>>>> that are generally associated with Cache rather than CacheContainer.
>>> Yes, true, but DistributedTaskContext is primarily geared towards one
>>> cache while providing an opportunity to read data from other caches as
>>> well. Hence K,V for the primary cache. Any suggestions on how to deal
>>> with this more elegantly? Maybe pass DistributedTaskContext and
>>> CacheContainer as separate parameters?
>>>
>>>
>>>> Also, why is a context iterable? Does it iterate the contents of a
>>>> CacheContainer? "extends" generally means "is a". AFAIK, you'd be able
>>>> to iterate a Map or Cache, but not a CacheContainer.
>>> True.
>>>
>>>> Personally, I think DistributedTask has too many generics (K, V, T, R)
>>>> and it's hard to read. IMO, only T and R should exist. I would also try
>>>> to stick to the Callable convention of taking a V.
>>>>
>>>> I don't like to see things like this; it reminds me of EJB 2.1, where
>>>> you were forced to implement a method simply to get hold of a ctx.
>>>> There are much nicer ways to do things like this, if really necessary
>>>> (see EJB3):
>>> You mean injection? Proposal 2 essentially does this.
>>>
>>>> @Override
>>>> public void mapped(DistributedTaskContext<String, String> ctx) {
>>>>    this.ctx = ctx;
>>>> }
>>>>
>>>> Looking at the example provided, it seems to me that all
>>>> DistributedTaskContext is used for is navigating the Cache contents
>>>> from a user-defined callable, in which case I would limit its scope.
>>> What do you mean - "limit its scope"?
>>>
>> --
>> Manik Surtani
>> manik(a)jboss.org
>> twitter.com/maniksurtani
>>
>> Lead, Infinispan
>> http://www.infinispan.org
_______________________________________________
infinispan-dev mailing list
infinispan-dev(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/infinispan-dev