[infinispan-dev] Distributed execution framework - API proposal(s)
Sanne Grinovero
sanne.grinovero at gmail.com
Tue Jan 4 18:20:07 EST 2011
With a data-oriented API, why should I have to deal with details such as Address?
Could we avoid that?
Also, I didn't see a Collator in other examples. I've never used M/R seriously,
so I might have forgotten the more complex examples, but my impression was
that problems should be expressed entirely in the Mapper and Reducer.
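To make that point concrete: if the reduction is associative and produces the same type it consumes, the per-node results could simply be fed back through a Reducer, so the caller never touches an Address. The following is a minimal sketch under that assumption only; the class and method names (AddressFreeCollation, collateWithReducer) are hypothetical, and the Reducer interface is a local stand-in for the one proposed below.

```java
import java.util.List;

public class AddressFreeCollation {
    // Local stand-in for the proposed Reducer<T, R>.
    public interface Reducer<T, R> {
        R reduce(T mapped, R previouslyReduced);
    }

    // Feeds each node's partial result back through a reducer whose input and
    // output types match; no node Address is involved.
    public static <R> R collateWithReducer(List<R> partialResults, Reducer<R, R> combiner) {
        R acc = null; // nothing reduced yet
        for (R partial : partialResults) {
            acc = combiner.reduce(partial, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        Reducer<Integer, Integer> sum = (t, prev) -> prev == null ? t : t + prev;
        // Made-up partial sums, as if returned by three nodes.
        System.out.println(collateWithReducer(List.of(4, 7, 2), sum)); // prints 13
    }
}
```

This only works when combining partial results is the same operation as reducing mapped values; a reduction that needs to know where a result came from would still need something like the Collator.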
Sanne
On 4 Jan 2011 18:47, "Manik Surtani" <manik at jboss.org> wrote:
Also, I think we need to be clear about these two functions (map and reduce).
In map/reduce speak, map doesn't mean "pick a node to run the task on"; map
means select/transform data for inclusion in a result set. Perhaps it also
makes sense to use smaller, simpler interfaces. I know this breaks away from
the F/J API, but I'm beginning to wonder whether there is a proper alignment
of purpose here in the first place, which means going back on my original plans.
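As an illustration of "map = select/transform" (not part of the proposal itself), here is a plain Java streams sketch over an in-memory map: entries are selected by key, and each selected value is transformed. The class name and the sample "text" key prefix are illustrative only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class MapIsFilterAndTransform {
    // Select: keep only keys starting with "text". Transform: value -> its length.
    public static List<Integer> mapEntries(Map<String, String> entries) {
        return entries.entrySet().stream()
                .filter(e -> e.getKey().startsWith("text"))
                .map(e -> e.getValue().length())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> cache = new TreeMap<>(); // sorted, so output order is stable
        cache.put("text1", "abc");
        cache.put("other", "xyz");
        cache.put("text2", "hello");
        System.out.println(mapEntries(cache)); // prints [3, 5]
    }
}
```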
How's this for an alternate API:
interface Mapper<K, V, T> {
   // just "maps" entries on a remote node. Map = filter and transform.
   // Invoked once for each entry on a remote cache.
   // null responses are ignored/filtered out.
   T map(K key, V value);
}
interface Reducer<T, R> {
   // incrementally reduces a transformed entry. Called once for each T
   // produced by the mapper.
   // previously reduced value passed in each time.
   R reduce(T mapped, R previouslyReduced);
}
interface Collator<R> {
   // Adds reduced results from remote nodes. Called once for each R
   // returned by the Reducer on a remote node.
   void add(Address origin, R remote);
   // collates all results added so far.
   R collate();
}
And the API could do something like
MapReduceContext c = new MapReduceContext(cache);
// 1) Distributes 'mapper' cluster-wide. Calls mapper.map() for each K/V
//    pair and stores the result T of each invocation if T != null.
// 2) For each T, reducer.reduce() is called. Each time, the previous value
//    of R is passed back in to reduce().
// 3) The final value of R is sent back as an RPC result. For each result,
//    the origin address and R are passed to collator.add().
// 4) Once all remote RPCs have responded, collator.collate() is called and
//    its result is passed back to the caller.
R r = c.invoke(mapper, reducer, collator);
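The four steps can be simulated in a single JVM to check that the contract hangs together. In this sketch each "node" is just a local map, String node names stand in for Address, and a node that maps no entries sends nothing to the collator; all of that is an assumption for illustration, not how MapReduceContext would actually distribute work. The names LocalMapReduceSimulation, invoke and demo are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalMapReduceSimulation {
    // Local stand-ins for the proposed interfaces; a String node name replaces Address.
    public interface Mapper<K, V, T> { T map(K key, V value); }
    public interface Reducer<T, R> { R reduce(T mapped, R previouslyReduced); }
    public interface Collator<R> { void add(String origin, R remote); R collate(); }

    // Simulates invoke(): "nodes" maps a node name to the entries it holds.
    public static <K, V, T, R> R invoke(Map<String, Map<K, V>> nodes,
                                        Mapper<K, V, T> mapper,
                                        Reducer<T, R> reducer,
                                        Collator<R> collator) {
        for (Map.Entry<String, Map<K, V>> node : nodes.entrySet()) {
            R reduced = null; // no previous value before the first reduce() call
            for (Map.Entry<K, V> entry : node.getValue().entrySet()) {
                T mapped = mapper.map(entry.getKey(), entry.getValue()); // step 1
                if (mapped != null) {                                    // nulls are filtered out
                    reduced = reducer.reduce(mapped, reduced);           // step 2
                }
            }
            if (reduced != null) {                                       // assumption: empty nodes send nothing
                collator.add(node.getKey(), reduced);                    // step 3
            }
        }
        return collator.collate();                                       // step 4
    }

    public static int demo() {
        Map<String, Map<String, String>> nodes = new LinkedHashMap<>();
        nodes.put("nodeA", Map.of("k1", "ab", "k2", "cde"));
        nodes.put("nodeB", Map.of("k3", "fghi"));
        Mapper<String, String, Integer> mapper = (k, v) -> v.length();
        Reducer<Integer, Integer> reducer = (t, prev) -> prev == null ? t : t + prev;
        Collator<Integer> collator = new Collator<Integer>() {
            private int sum = 0;
            public void add(String origin, Integer remote) { sum += remote; }
            public Integer collate() { return sum; }
        };
        return invoke(nodes, mapper, reducer, collator);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 9
    }
}
```

Running the reduce loop per node before collation mirrors steps 2 and 3: only one R per node crosses the "network", which is the point of reducing remotely.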
Variants may include:
Filtering nodes:
// Restricts the set of nodes where RPCs are sent to the subset of the
// cluster that contains one or more of K.
// Question: does this mean only K/V pairs whose key is in K... are passed
// in to the mapper?
R r = c.invoke(mapper, reducer, collator, K...);
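One possible answer to the open question, sketched under stated assumptions: the target nodes are the union of the owners of the listed keys, and each target node hands only the listed keys it holds to the mapper. The ownership lookup is a hypothetical function passed in by the caller (in a real cluster it would come from the cache's data distribution), and nothing here is an existing Infinispan API.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

public class KeyFilteredTargets {
    // Union of the owners of every listed key. "owners" is a hypothetical
    // lookup supplied by the caller.
    public static <K> Set<String> targetNodes(Collection<K> keys, Function<K, List<String>> owners) {
        Set<String> targets = new TreeSet<>(); // sorted for stable output
        for (K key : keys) {
            targets.addAll(owners.apply(key));
        }
        return targets;
    }

    // Assumed reading of the open question: on each target node, only the
    // listed keys that node actually holds are passed to the mapper.
    public static <K, V> Map<K, V> entriesForMapper(Map<K, V> localEntries, Set<K> requested) {
        Map<K, V> selected = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : localEntries.entrySet()) {
            if (requested.contains(e.getKey())) {
                selected.put(e.getKey(), e.getValue());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, List<String>> ownership = Map.of(
                "a", List.of("n1", "n2"),
                "b", List.of("n3"),
                "c", List.of("n2", "n3"));
        System.out.println(targetNodes(List.of("a", "c"), ownership::get)); // prints [n1, n2, n3]
        System.out.println(entriesForMapper(Map.of("a", 1, "b", 2), Set.of("a"))); // prints {a=1}
    }
}
```

The alternative reading (target nodes restricted, but every local entry still mapped) would simply skip entriesForMapper; which one is intended is exactly the question left open above.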
Using futures:
NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
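A minimal sketch of how the future-returning variant might wrap the blocking one. java.util.concurrent.CompletableFuture (Java 8+) is used here as a stand-in for NotifyingFuture, with whenComplete() playing the role of a completion listener; invokeFuture in this sketch is a hypothetical helper that takes the blocking call as a Supplier.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class AsyncInvokeSketch {
    // Hypothetical helper: runs a blocking invoke(...) call on an executor and
    // hands back a future. CompletableFuture stands in for NotifyingFuture.
    public static <R> CompletableFuture<R> invokeFuture(Supplier<R> blockingInvoke,
                                                        ExecutorService executor) {
        return CompletableFuture.supplyAsync(blockingInvoke, executor);
    }

    public static int demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The supplier is a placeholder for something like c.invoke(mapper, reducer, collator).
            CompletableFuture<Integer> f = invokeFuture(() -> 2 + 3, executor);
            // whenComplete() acts like a completion listener; it runs once the result is ready.
            f.whenComplete((result, error) -> System.out.println("notified with " + result));
            return f.join(); // join() rethrows failures unchecked, unlike get()
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // demo() returns 5; the listener's line may print before or after it.
        System.out.println(demo());
    }
}
```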
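Example: implementing a word count, but only for keys that start with
"text":
Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
   // counts whitespace-separated words in values whose key starts with "text";
   // returning null filters the entry out
   public Integer map(String k, String v) {
      if (!k.startsWith("text")) return null;
      String trimmed = v.trim();
      return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
   }
};
Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
   // prevReduced is null on the first call, so start from the mapped value
   public Integer reduce(Integer transformed, Integer prevReduced) {
      return prevReduced == null ? transformed : transformed + prevReduced;
   }
};
Collator<Integer> collator = new Collator<Integer>() {
   int collated = 0;
   public void add(Address origin, Integer result) { if (result != null) collated += result; }
   public Integer collate() { return collated; }
};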
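Since Mapper and Reducer each declare a single method, with Java 8+ lambdas the example shrinks considerably. The sketch below keeps the word-count semantics (whitespace-separated words, null for keys not starting with "text") and exercises the mapper and reducer on two made-up node contents, summing the per-node results by hand in place of a Collator. The interfaces are local stand-ins and the helper names (reduceNode, collate, demo) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class WordCountWithLambdas {
    // Local stand-ins for the proposed single-method interfaces.
    public interface Mapper<K, V, T> { T map(K key, V value); }
    public interface Reducer<T, R> { R reduce(T mapped, R previouslyReduced); }

    // Word count for values whose key starts with "text"; other keys map to null.
    static final Mapper<String, String, Integer> MAPPER = (k, v) -> {
        if (!k.startsWith("text")) {
            return null;
        }
        String trimmed = v.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    };

    // prev is null on the first call, so start from the mapped value.
    static final Reducer<Integer, Integer> REDUCER = (t, prev) -> prev == null ? t : t + prev;

    // What a single node would compute and send back (null if nothing matched).
    public static Integer reduceNode(Map<String, String> entries) {
        Integer reduced = null;
        for (Map.Entry<String, String> e : entries.entrySet()) {
            Integer mapped = MAPPER.map(e.getKey(), e.getValue());
            if (mapped != null) {
                reduced = REDUCER.reduce(mapped, reduced);
            }
        }
        return reduced;
    }

    // Stand-in for the Collator: sums the per-node results, skipping nulls.
    public static int collate(List<Integer> perNode) {
        int total = 0;
        for (Integer r : perNode) {
            if (r != null) {
                total += r;
            }
        }
        return total;
    }

    public static int demo() {
        Integer nodeA = reduceNode(Map.of("text1", "hello world", "other", "x y z"));
        Integer nodeB = reduceNode(Map.of("text2", "one two three"));
        return collate(Arrays.asList(nodeA, nodeB));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 5
    }
}
```

The Collator keeps its anonymous-class form in the proposal because it has two methods and carries state, so it is not a functional interface.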
WDYT? :-)
Cheers
Manik
On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>> Maybe I'm reading this wrong but are you saying th...
--
Manik Surtani
manik at jboss.org
twitter.com/maniksurtani
Lead, Infinispan
http://www.infinispan.or...
infinispan-dev mailing list
infinispan-dev at lists.jboss.org
https://lists.jboss.org/mailman/listinfo/...