Also, I think we need to be clear about these two functions (map and reduce). Map
doesn't mean "pick a node to run the task on" in map/reduce speak. Map means
select/transform data for inclusion in a result set. Perhaps it also makes sense to
use smaller/simpler interfaces. I know this breaks away from the F/J API, but I'm
beginning to wonder whether there is a proper alignment of purpose here in the first
place - going back on my original plans here. How's this for an alternate API:
Mapper<K, V, T> {
   // Just "maps" entries on a remote node. Map = filter and transform.
   // Invoked once for each entry on a remote cache.
   // Null responses are ignored/filtered out.
   T map(K key, V value);
}
Reducer<T, R> {
   // Incrementally reduces a transformed entry. Called once for each T
   // produced by the mapper.
   // The previously reduced value is passed in each time.
   R reduce(T transformed, R prevReduced);
}
Collator<R> {
   // Adds reduced results from remote nodes. Called once for each R
   // returned by a remote Reducer.
   void add(Address origin, R remote);
   // Collates all results added so far.
   R collate();
}
And the API could do something like:
MapReduceContext c = new MapReduceContext(cache);
// 1) Distributes 'mapper' cluster-wide. Calls mapper.map() for each K/V pair.
//    Stores the result T of each invocation if T != null.
// 2) For each T, reducer.reduce() is called. Each time, the previous value
//    of R is passed back in to reduce().
// 3) The final value of R is sent back as an RPC result. For each result,
//    the origin address and R are passed to collator.add().
// 4) Once all remote RPCs have responded, collator.collate() is called and
//    the result is passed back to the caller.
R r = c.invoke(mapper, reducer, collator);
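As a sanity check on steps 1) and 2), the per-node phase could be simulated along these lines. This is only a sketch: the nested Mapper/Reducer interfaces and the mapReduceLocally helper are hypothetical stand-ins mirroring the proposed API, with a plain Map playing the role of a node's local cache.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalMapReduceSketch {
   // Hypothetical local copies of the proposed interfaces.
   interface Mapper<K, V, T> { T map(K key, V value); }
   interface Reducer<T, R> { R reduce(T transformed, R prevReduced); }

   // Simulates steps 1) and 2) on a single node: map each local entry,
   // drop null results, and fold the surviving T values through the reducer.
   static <K, V, T, R> R mapReduceLocally(Map<K, V> localCache,
                                          Mapper<K, V, T> mapper,
                                          Reducer<T, R> reducer) {
      R reduced = null; // null on the first call to reduce()
      for (Map.Entry<K, V> e : localCache.entrySet()) {
         T t = mapper.map(e.getKey(), e.getValue());
         if (t == null) continue; // null responses are filtered out
         reduced = reducer.reduce(t, reduced);
      }
      return reduced; // would be sent back as the RPC result
   }

   public static void main(String[] args) {
      Map<String, String> cache = new LinkedHashMap<>();
      cache.put("text1", "hello");
      cache.put("other", "ignored");
      cache.put("text2", "hi");

      Mapper<String, String, Integer> mapper =
         (k, v) -> k.startsWith("text") ? v.length() : null;
      Reducer<Integer, Integer> reducer =
         (t, prev) -> prev == null ? t : t + prev;

      Integer result = mapReduceLocally(cache, mapper, reducer);
      System.out.println(result); // 5 + 2 = 7
   }
}
```

Note that the reducer has to tolerate a null prevReduced on its first invocation; that is one detail the API contract would need to spell out.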
Variants may include:
Filtering nodes:
// Restricts the set of nodes RPCs are sent to, based on the subset of the
// cluster that contains one or more of K.
// Question: does this mean only K/V pairs in K... are passed to the mapper?
R r = c.invoke(mapper, reducer, collator, K...);
Using futures:
NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
Example: implementing a word count, but only for keys that start with "text":
Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
   public Integer map(String k, String v) {
      return k.startsWith("text") ? v.length() : null;
   }
};
Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
   public Integer reduce(Integer transformed, Integer prevReduced) {
      // prevReduced is null on the first invocation
      return prevReduced == null ? transformed : transformed + prevReduced;
   }
};
Collator<Integer> collator = new Collator<Integer>() {
   int collated = 0;
   public void add(Address origin, Integer result) { collated += result; }
   public Integer collate() { return collated; }
};
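To check that the three pieces compose end to end, here is a self-contained simulation of that word count across two fake nodes. Everything here is hypothetical scaffolding: the interfaces are local copies of the proposed API, Address is reduced to a plain String, and the driver loop stands in for the RPC dispatch the real MapReduceContext would do.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSimulation {
   // Hypothetical local copies of the proposed interfaces.
   interface Mapper<K, V, T> { T map(K key, V value); }
   interface Reducer<T, R> { R reduce(T transformed, R prevReduced); }
   interface Collator<R> { void add(String origin, R remote); R collate(); }

   public static void main(String[] args) {
      // Two fake "nodes", each holding its own slice of the cache.
      Map<String, String> node1 = new LinkedHashMap<>();
      node1.put("text1", "four");      // length 4
      node1.put("junk", "xxxxxxxx");   // filtered out by the mapper
      Map<String, String> node2 = new LinkedHashMap<>();
      node2.put("text2", "seven!!");   // length 7

      Mapper<String, String, Integer> mapper =
         (k, v) -> k.startsWith("text") ? v.length() : null;
      Reducer<Integer, Integer> reducer =
         (t, prev) -> prev == null ? t : t + prev;
      Collator<Integer> collator = new Collator<Integer>() {
         int collated = 0;
         public void add(String origin, Integer result) { collated += result; }
         public Integer collate() { return collated; }
      };

      // Stand-in for the RPC dispatch: map/reduce on each node, then collate.
      List<Map<String, String>> nodes = Arrays.asList(node1, node2);
      for (int i = 0; i < nodes.size(); i++) {
         Integer reduced = null;
         for (Map.Entry<String, String> e : nodes.get(i).entrySet()) {
            Integer t = mapper.map(e.getKey(), e.getValue());
            if (t != null) reduced = reducer.reduce(t, reduced);
         }
         if (reduced != null) collator.add("node" + i, reduced);
      }
      System.out.println(collator.collate()); // 4 + 7 = 11
   }
}
```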
WDYT? :-)
Cheers
Manik
On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
On 11-01-03 6:16 AM, Galder Zamarreño wrote:
> Maybe I'm reading this wrong, but are you saying that multiple caches cause
> problems with mapping task units to nodes in the cluster?
>
> Or are you just doing it not to clutter the API?
Clutter of the API. If you did not like K, V, T, R, imagine dealing with
multiple-cache confusion! It would be horrible.
> I think DistributedTaskContext extending CacheContainer is rather confusing,
> particularly when DistributedTaskContext has K, V parameters that are
> generally associated with Cache rather than CacheContainer.
Yes, true, but DistributedTaskContext is primarily geared towards one
cache while providing the opportunity to read data from other caches as
well. Hence K, V for the primary cache. Any suggestions on how to deal
with this in a more elegant way? Maybe pass DistributedTaskContext and
CacheContainer as separate parameters?
> Also, why is a context iterable? Iterates the contents of a CacheContainer?
> "extends" generally means "is something". AFAIK, you'd be able to iterate a
> Map or Cache, but not a CacheContainer.
True.
> Personally, I think DistributedTask has too many generics (K, V, T, R) and
> it's hard to read. IMO, only T and R should exist. I would also try to stick
> to Callable conventions that take a V.
>
> I don't like to see things like this; it reminds me of EJB 2.1, where you
> were forced to implement a method simply to get hold of a ctx. There are
> much nicer ways to do things like this, if completely necessary (see EJB3):
You mean injection? There is a proposal 2 that essentially does this.
> @Override
> public void mapped(DistributedTaskContext<String, String> ctx) {
> this.ctx = ctx;
> }
>
> Looking at the example provided, it seems to me that all
> DistributedTaskContext is used for is to navigate the Cache contents from a
> user-defined callable, in which case I would limit its scope.
What do you mean - "limit its scope"?
_______________________________________________
infinispan-dev mailing list
infinispan-dev(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/infinispan-dev
--
Manik Surtani
manik(a)jboss.org
twitter.com/maniksurtani
Lead, Infinispan
http://www.infinispan.org