The Address is purely additional information, in case the collator wants to filter out
results from certain nodes.
And the whole concept of the collator is just another reduce; I couldn't think of a
better name. Often it makes sense to reduce once remotely and once again locally.
Hope this makes sense. :)
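To make the "reduce remotely, then reduce again locally" idea a bit more concrete, here is a minimal sketch of a collator that sums per-node results and uses the origin address to skip certain nodes. The Address class, FilteringSumCollator and collateAll() are hypothetical names made up for illustration (Address here is just a node name, not the real Infinispan type); only the Collator shape comes from the proposal quoted below.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class CollatorSketch {

    // Hypothetical stand-in for a cluster node address (not the Infinispan type).
    static final class Address {
        final String name;
        Address(String name) { this.name = name; }
    }

    // Collator as proposed in this thread: a second, local reduce step.
    interface Collator<R> {
        void add(Address origin, R remote);
        R collate();
    }

    // Sums per-node results, skipping nodes whose name is on an ignore list.
    static final class FilteringSumCollator implements Collator<Integer> {
        private final Set<String> ignored;
        private int total = 0;

        FilteringSumCollator(Set<String> ignored) { this.ignored = ignored; }

        @Override public void add(Address origin, Integer remote) {
            if (remote == null || ignored.contains(origin.name)) return;
            total += remote;
        }

        @Override public Integer collate() { return total; }
    }

    // Feeds already-reduced per-node results into the collator (the local reduce).
    static int collateAll(Map<String, Integer> perNode, Set<String> ignored) {
        Collator<Integer> c = new FilteringSumCollator(ignored);
        for (Map.Entry<String, Integer> e : perNode.entrySet()) {
            c.add(new Address(e.getKey()), e.getValue());
        }
        return c.collate();
    }

    public static void main(String[] args) {
        Map<String, Integer> perNode = new LinkedHashMap<>();
        perNode.put("nodeA", 10);
        perNode.put("nodeB", 5);
        perNode.put("nodeC", 7);
        Set<String> ignored = new HashSet<String>(Arrays.asList("nodeB"));
        System.out.println(collateAll(perNode, ignored)); // prints 17
    }
}
```

A collator that doesn't care where a result came from would simply ignore the origin argument.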
On 4 Jan 2011, at 23:20, Sanne Grinovero <sanne.grinovero(a)gmail.com> wrote:
With a data-oriented API, why should I have to deal with details such as
Address? Could we avoid that?
Also, I didn't see a Collator in other examples. I've never used M/R seriously, so I
might have forgotten the more complex examples, but my impression was that problems should
be expressed entirely in terms of a Mapper and a Reducer.
Sanne
> On 4 Jan 2011 18:47, "Manik Surtani" <manik(a)jboss.org> wrote:
>
> Also, I think we need to be clear about these 2 (map and reduce) functions. Map
> doesn't mean "pick node to run task on" in map/reduce speak. Map means
> select/transform data for inclusion into a result set. Perhaps it also makes sense to
> use smaller/simpler interfaces. I know this breaks away from the F/J API, but I'm
> beginning to wonder if there is a proper alignment of purpose here in the first place -
> going back on my original plans here. How's this for an alternate API:
>
> interface Mapper<K, V, T> {
>    // Just "maps" entries on a remote node. Map = filter and transform.
>    // Invoked once for each entry on a remote cache.
>    // Null responses are ignored/filtered out.
>    T map(K key, V value);
> }
>
> interface Reducer<T, R> {
>    // Incrementally reduces a transformed entry. Called once for each T produced
>    // by the mapper.
>    // The previously reduced value is passed in each time.
>    R reduce(T transformed, R previouslyReduced);
> }
>
> interface Collator<R> {
>    // Adds reduced results from remote nodes. Called once for each R returned by
>    // a remote Reducer.
>    void add(Address origin, R remote);
>
>    // Collates all results added so far.
>    R collate();
> }
>
>
> And the API could do something like
>
> MapReduceContext c = new MapReduceContext(cache);
>
> // 1) Distributes 'mapper' cluster-wide. Calls mapper.map() for each K/V pair.
> //    Stores result T for each invocation if T != null.
> // 2) For each T, reducer.reduce() is called. Each time, the previous value of R
> //    is passed back in to reduce().
> // 3) The final value of R is sent back as an RPC result. For each result, the
> //    address and R are passed to collator.add().
> // 4) Once all remote RPCs have responded, collator.collate() is called and the
> //    result is passed back to the caller.
> R r = c.invoke(mapper, reducer, collator);
>
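The four steps above can be tried out in a single JVM with something like the sketch below. It is only a simulation under stated assumptions: each Map in the list plays the role of one node's local data, there are no RPCs, the origin is a plain list index rather than an Address, the first reduce() call on a node is assumed to receive null, and a node that produced nothing is assumed not to call collator.add(). The class name, invoke() signature and sumOfEvens() are made up for illustration and are not part of any existing API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LocalMapReduceSketch {

    interface Mapper<K, V, T> { T map(K key, V value); }
    interface Reducer<T, R> { R reduce(T mapped, R previous); }
    // The origin address is simplified to a list index in this sketch.
    interface Collator<R> { void add(int nodeIndex, R remote); R collate(); }

    // Simulates invoke(): every map in 'nodes' stands in for one node's local entries.
    static <K, V, T, R> R invoke(List<Map<K, V>> nodes, Mapper<K, V, T> mapper,
                                 Reducer<T, R> reducer, Collator<R> collator) {
        for (int i = 0; i < nodes.size(); i++) {
            R reduced = null; // assumption: the first reduce() call sees null
            for (Map.Entry<K, V> e : nodes.get(i).entrySet()) {
                T mapped = mapper.map(e.getKey(), e.getValue()); // step 1
                if (mapped == null) continue;                    // nulls are filtered out
                reduced = reducer.reduce(mapped, reduced);       // step 2
            }
            if (reduced != null) collator.add(i, reduced);       // step 3 (no RPC here)
        }
        return collator.collate();                               // step 4
    }

    // Example task: sum all even values across all "nodes".
    static int sumOfEvens(List<Map<String, Integer>> nodes) {
        Mapper<String, Integer, Integer> evens = (k, v) -> v % 2 == 0 ? v : null;
        Reducer<Integer, Integer> sum = (t, prev) -> prev == null ? t : prev + t;
        Collator<Integer> total = new Collator<Integer>() {
            private int collated = 0;
            @Override public void add(int nodeIndex, Integer remote) { collated += remote; }
            @Override public Integer collate() { return collated; }
        };
        return invoke(nodes, evens, sum, total);
    }

    public static void main(String[] args) {
        Map<String, Integer> node0 = new HashMap<>();
        node0.put("a", 1); node0.put("b", 2); node0.put("c", 4);
        Map<String, Integer> node1 = new HashMap<>();
        node1.put("d", 6); node1.put("e", 3);
        System.out.println(sumOfEvens(Arrays.asList(node0, node1))); // prints 12
    }
}
```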
> Variants may include:
>
> Filtering nodes:
> // Restricts the set of nodes where RPCs are sent, based on the subset of the cluster
> // that contains one or more of K.
> // Question: does this mean only K/V pairs that are in K... are passed in to the mapper?
> R r = c.invoke(mapper, reducer, collator, K...);
>
> Using futures:
> NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
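As a rough illustration of the futures variant, the sketch below wraps a blocking invocation in a future. Note that it uses the JDK's CompletableFuture purely as a stand-in for NotifyingFuture, and that InvokeFutureSketch and its invokeFuture(Supplier) signature are assumptions made for this example; the Supplier represents whatever a blocking c.invoke(...) call would do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class InvokeFutureSketch {

    // Runs a blocking invocation on another thread and hands back a future for its result.
    static <R> CompletableFuture<R> invokeFuture(Supplier<R> blockingInvoke) {
        return CompletableFuture.supplyAsync(blockingInvoke);
    }

    public static void main(String[] args) {
        // The lambda stands in for a blocking c.invoke(mapper, reducer, collator) call.
        CompletableFuture<Integer> f = InvokeFutureSketch.<Integer>invokeFuture(() -> 40 + 2);
        System.out.println(f.join()); // blocks until the result is available
    }
}
```

With CompletableFuture, a callback could be attached via thenAccept() instead of blocking on join(), which is roughly the role a listener would play on a notifying future.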
>
> Example: implementing a word count, but only for keys that start with "text":
>
> Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
>    public Integer map(String k, String v) {
>       return k.startsWith("text") ? v.length() : null;
>    }
> };
>
> Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
>    // prevReduced is assumed to be null on the first call for a node.
>    public Integer reduce(Integer transformed, Integer prevReduced) {
>       return prevReduced == null ? transformed : transformed + prevReduced;
>    }
> };
>
> Collator<Integer> collator = new Collator<Integer>() {
>    int collated = 0;
>    public void add(Address origin, Integer result) {collated += result;}
>    public Integer collate() {return collated;}
> };
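One detail worth noting about the example above: v.length() is the number of characters in the value, not the number of words. If an actual word count is wanted, a mapper along the following lines might do. This is only a sketch: splitting on whitespace is an assumed definition of "word", and WordCountMapperSketch and WORDS are names invented for the example.

```java
class WordCountMapperSketch {

    // Same shape as the Mapper proposed in this thread.
    interface Mapper<K, V, T> { T map(K key, V value); }

    // Counts whitespace-separated words; returns null (filtered out) for other keys.
    static final Mapper<String, String, Integer> WORDS = new Mapper<String, String, Integer>() {
        @Override public Integer map(String k, String v) {
            if (!k.startsWith("text") || v == null) return null;
            String trimmed = v.trim();
            return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
        }
    };

    public static void main(String[] args) {
        System.out.println(WORDS.map("text1", "hello map reduce")); // prints 3
        System.out.println(WORDS.map("other", "ignored entry"));    // prints null
    }
}
```

The reducer and collator from the example would work unchanged with this mapper, since both only add up Integers.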
>
>
> WDYT? :-)
>
> Cheers
> Manik
>
>
>
> On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
>
> > On 11-01-03 6:16 AM, Galder Zamarreño wrote:
> >> Maybe I'm reading this wrong but are you saying th...
>
> --
> Manik Surtani
> manik(a)jboss.org
> twitter.com/maniksurtani
>
> Lead, Infinispan
> http://www.infinispan.or...
>
_______________________________________________
infinispan-dev mailing list
infinispan-dev(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/infinispan-dev