As a data-oriented API, why should I deal with details such as Address? Could we avoid that?
Also I didn't see a Collator in other examples; I've never used M/R seriusly so I might have forgotten the more complex examples, but my impression was that problems should be entirely expressed in Mapper and Reducer only.

Sanne

On 4 Jan 2011 18:47, "Manik Surtani" <manik@jboss.org> wrote:

Also, I think we need to be clear about these 2 (map and reduce) functions.  Map doesn't mean "pick node to run task on" in map/reduce speak.  Map means select /transform data for inclusion into a result set.  Perhaps it also makes sense to use smaller/simpler interfaces.  I know this breaks away from the F/J API, but I'm beginning to wonder if there is a proper alignment of purpose here in the first place - going back on my original plans here.  How's this for an alternate API:

Mapper<K, V, T> {
       // just "maps" entries on a remote node.  Map = filter and transform.  Invoked once for each entry on a remote cache.
       // null responses are ignored/filtered out.
       T map(K, V);
}

Reducer<T, R> {
       // incrementally reduces a transformed entry.  Called once for each T produced by the mapper.
       // previously reduced value passed in each time.
       R reduce(T, R);
}

Collator<R> {
       // Adds reduced results from remote nodes.  Called once for each R returned by a RemoteReducer.
       add(Address origin, R remote);

       // collates all results added so far.
       R collate();
}


And the API could do something like

MapReduceContext c = new MapReduceContext(cache);

// 1) distributes 'mapper' cluster wide.  Calls mapper.map() for each K/V pair.  Stores result T for each invocation if T != null.
// 2) For each T, reducer.reduce() is called.  Each time, the previous value of R is passed back in to reduce().
// 3) Final value of R is sent back as a RPC result.  For each result, address and R is passed collator.add()
// 4) Once all remote RPCs have responded, collator.collate() is called, pass result back to caller.
R r = c.invoke(mapper, reducer, collator);

Variants may include:

Filtering nodes:
// restricts the set of nodes where RPCs are sent, based on the subset of the cluster that contain one or more of K.
// question: does this mean only K/V pairs that are in K... are passed in to the mapper?
R r = c.invoke(mapper, reducer, collator, K...);

Using futures:
NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator)

Example:  implementing a word count. but only for keys that start with "text" :

Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer> () {
   Integer map(String k, String v) {
               return k.startsWith("text") ? v.length() : null;
       }
}

Reducer<Integer, Integer> reducer = Reducer<Integer, Integer>() {
       Integer reduce(Integer transformed, Integer prevReduced) {return transformed + prevReduced;}
}

Collator<Integer> collator = Collator<Integer>() {
       int collated = 0;
       void add(Address origin, Integer result) {collated += result;}
       Integer collate() {return collated;}
}


WDYT?  :-)

Cheers
Manik




On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:

> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>> Maybe I'm reading this wrong but are you saying th...

--
Manik Surtani
manik@jboss.org
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.or...

infinispan-dev mailing list
infinispan-dev@lists.jboss.org
https://lists.jboss.org/mailman/listinfo/...