As a data-oriented API, why should I deal with details such as Address? Could we avoid that?
Also, I didn't see a Collator in other examples. I've never used M/R seriously, so I might have forgotten the more complex examples, but my impression was that problems should be expressed entirely in terms of Mapper and Reducer.
Sanne
On 4 Jan 2011 18:47, "Manik Surtani" <manik@jboss.org> wrote:
Also, I think we need to be clear about these two functions (map and reduce). Map doesn't mean "pick node to run task on" in map/reduce speak. Map means select/transform data for inclusion into a result set. Perhaps it also makes sense to use smaller/simpler interfaces. I know this breaks away from the F/J API, but I'm beginning to wonder if there is a proper alignment of purpose here in the first place - going back on my original plans here. How's this for an alternate API:
interface Mapper<K, V, T> {
   // just "maps" entries on a remote node. Map = filter and transform. Invoked once for each entry on a remote cache.
   // null responses are ignored/filtered out.
   T map(K key, V value);
}
interface Reducer<T, R> {
   // incrementally reduces a transformed entry. Called once for each T produced by the mapper.
   // previously reduced value passed in each time.
   R reduce(T transformed, R prevReduced);
}
interface Collator<R> {
   // Adds reduced results from remote nodes. Called once for each R returned by a RemoteReducer.
   void add(Address origin, R remote);
   // collates all results added so far.
   R collate();
}
And the API could do something like
MapReduceContext c = new MapReduceContext(cache);
// 1) distributes 'mapper' cluster wide. Calls mapper.map() for each K/V pair. Stores result T for each invocation if T != null.
// 2) For each T, reducer.reduce() is called. Each time, the previous value of R is passed back in to reduce().
// 3) Final value of R is sent back as an RPC result. For each result, the address and R are passed to collator.add().
// 4) Once all remote RPCs have responded, collator.collate() is called and its result is passed back to the caller.
R r = c.invoke(mapper, reducer, collator);
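
To make steps 1-4 above concrete, here is a minimal single-JVM sketch. It is not the Infinispan implementation: LocalMapReduceSketch is a hypothetical stand-in for MapReduceContext, each "node" is just a java.util.Map, a String stands in for Address, and it assumes reduce() receives null as the previous value on its first call for a node (the proposal does not say what the initial value is).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Interfaces as proposed in this mail; String stands in for Address (assumption).
interface Mapper<K, V, T> { T map(K key, V value); }
interface Reducer<T, R> { R reduce(T transformed, R prevReduced); }
interface Collator<R> { void add(String origin, R remote); R collate(); }

// Hypothetical in-JVM stand-in for MapReduceContext: each "node" is a plain Map.
class LocalMapReduceSketch {
    static <K, V, T, R> R invoke(Map<String, Map<K, V>> nodes,
                                 Mapper<K, V, T> mapper,
                                 Reducer<T, R> reducer,
                                 Collator<R> collator) {
        for (Map.Entry<String, Map<K, V>> node : nodes.entrySet()) {
            // 1) map every entry held by this node, dropping null results
            List<T> mapped = new ArrayList<>();
            for (Map.Entry<K, V> e : node.getValue().entrySet()) {
                T t = mapper.map(e.getKey(), e.getValue());
                if (t != null) mapped.add(t);
            }
            // 2) fold the mapped values; the first call sees prevReduced == null (assumption)
            R reduced = null;
            for (T t : mapped) reduced = reducer.reduce(t, reduced);
            // 3) pass the node's result to the collator, as the RPC response would be
            if (reduced != null) collator.add(node.getKey(), reduced);
        }
        // 4) every "node" has answered, so collate
        return collator.collate();
    }

    // Demo: sum the value lengths for keys starting with "text" across two nodes.
    static int demo() {
        Map<String, String> a = new LinkedHashMap<>();
        a.put("text1", "hello");       // length 5
        a.put("other", "ignored");     // filtered out by the mapper
        Map<String, String> b = new LinkedHashMap<>();
        b.put("text2", "map reduce");  // length 10
        Map<String, Map<String, String>> nodes = new LinkedHashMap<>();
        nodes.put("nodeA", a);
        nodes.put("nodeB", b);

        Mapper<String, String, Integer> mapper =
            (k, v) -> k.startsWith("text") ? v.length() : null;
        Reducer<Integer, Integer> reducer =
            (t, prev) -> prev == null ? t : prev + t;
        Collator<Integer> collator = new Collator<Integer>() {
            private int total = 0;
            public void add(String origin, Integer r) { total += r; }
            public Integer collate() { return total; }
        };
        return invoke(nodes, mapper, reducer, collator);
    }
}
```

Calling LocalMapReduceSketch.demo() runs the whole flow. Note that the collator in this demo never looks at the origin argument.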
Variants may include:
Filtering nodes:
// restricts the set of nodes where RPCs are sent, based on the subset of the cluster that contains one or more of K.
// question: does this mean only K/V pairs that are in K... are passed in to the mapper?
R r = c.invoke(mapper, reducer, collator, K...);
Using futures:
NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
Example: implementing a word count, but only for keys that start with "text":
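Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
   // only entries whose key starts with "text" contribute; null results are filtered out
   public Integer map(String k, String v) {
      return k.startsWith("text") ? v.length() : null;
   }
};
Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
   // assumes prevReduced is null on the first call, since there is no previous value yet
   public Integer reduce(Integer transformed, Integer prevReduced) {
      return prevReduced == null ? transformed : transformed + prevReduced;
   }
};
Collator<Integer> collator = new Collator<Integer>() {
   int collated = 0;
   public void add(Address origin, Integer result) {collated += result;}
   public Integer collate() {return collated;}
};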
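
One caveat on the mapper: v.length() is the number of characters in the value, not the number of words. If a count of words is what is wanted, the mapper could split on whitespace instead. A rough sketch, where WordCountMapperSketch is a hypothetical name and the Mapper interface is repeated only so that the snippet compiles on its own:

```java
// Same shape as the Mapper proposed in this mail, repeated so the snippet is self-contained.
interface Mapper<K, V, T> { T map(K key, V value); }

class WordCountMapperSketch {
    // Counts whitespace-separated words for keys starting with "text".
    // Returning null filters the entry out, as in the proposal.
    static final Mapper<String, String, Integer> WORDS = (k, v) -> {
        if (!k.startsWith("text")) return null;
        String trimmed = v.trim();
        // a blank value holds no words; split("\\s+") on "" would otherwise yield one empty token
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    };
}
```

The reducer and collator stay as they are, since they only sum whatever Integer the mapper produces.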
WDYT? :-)
Cheers
Manik
On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>> Maybe I'm reading this wrong but are you saying th...
--
Manik Surtani
manik@jboss.org
twitter.com/maniksurtani
Lead, Infinispan
http://www.infinispan.or...