Manik,
Of course we could go in this direction as well. This is very similar to
the Hadoop approach and close to the original map/reduce paradigm. I
intentionally changed this paradigm into a simpler one because I read a
lot of criticism about how hard it is for "regular" developers to adapt
to the original map/reduce paradigm. Hence the simpler approach of mapping
runnables to execution nodes and collating results - unfortunately I
named them map and reduce as well.
Does anyone else have an opinion while I think about this a bit more?
Regards,
Vladimir
On 11-01-04 3:47 PM, Manik Surtani wrote:
Also, I think we need to be clear about these 2 (map and reduce)
functions. Map doesn't mean "pick node to run task on" in map/reduce speak.
Map means select/transform data for inclusion into a result set. Perhaps it also
makes sense to use smaller/simpler interfaces. I know this breaks away from the
F/J API, but I'm beginning to wonder if there is a proper alignment of purpose
here in the first place - going back on my original plans here. How's this for
an alternate API:
interface Mapper<K, V, T> {
   // Just "maps" entries on a remote node. Map = filter and transform.
   // Invoked once for each entry on a remote cache.
   // Null responses are ignored/filtered out.
   T map(K key, V value);
}
interface Reducer<T, R> {
   // Incrementally reduces a transformed entry. Called once for each T
   // produced by the mapper. The previously reduced value is passed in
   // each time.
   R reduce(T transformed, R previouslyReduced);
}
interface Collator<R> {
   // Adds reduced results from remote nodes. Called once for each R
   // returned by a RemoteReducer.
   void add(Address origin, R remote);
   // Collates all results added so far.
   R collate();
}
And the API could do something like this:
MapReduceContext c = new MapReduceContext(cache);
// 1) Distributes 'mapper' cluster-wide. Calls mapper.map() for each K/V pair.
//    Stores result T for each invocation if T != null.
// 2) For each T, reducer.reduce() is called. Each time, the previous value of R
//    is passed back in to reduce().
// 3) The final value of R is sent back as an RPC result. For each result, the
//    address and R are passed to collator.add().
// 4) Once all remote RPCs have responded, collator.collate() is called and the
//    result is passed back to the caller.
R r = c.invoke(mapper, reducer, collator);
Variants may include:
Filtering nodes:
// Restricts the set of nodes where RPCs are sent, based on the subset of the
// cluster that contains one or more of K.
// Question: does this mean only K/V pairs that are in K... are passed in to the mapper?
R r = c.invoke(mapper, reducer, collator, K...);
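To make the node-filtering variant concrete, here is a minimal single-JVM sketch of how the target set could be narrowed. It is only an illustration under assumptions: the class name NodeFilterSketch, the method targetNodes and the keysByNode ownership map are hypothetical, nodes are identified by plain strings rather than Address, and a real implementation would ask the cache where keys live instead of receiving a ready-made map. It does not answer the open question of whether the mapper should then see only the listed keys.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class NodeFilterSketch {
    // Hypothetical helper: returns the nodes that hold at least one of the given keys.
    // keysByNode is an assumed, precomputed view of which node owns which keys.
    @SafeVarargs
    static <K> Set<String> targetNodes(Map<String, Set<K>> keysByNode, K... keys) {
        Set<K> wanted = new HashSet<>(Arrays.asList(keys));
        Set<String> targets = new TreeSet<>(); // sorted only to make the output stable
        for (Map.Entry<String, Set<K>> e : keysByNode.entrySet()) {
            // A node is a target if its key set and the wanted keys overlap.
            if (!Collections.disjoint(e.getValue(), wanted)) {
                targets.add(e.getKey());
            }
        }
        return targets;
    }

    static String demo() {
        Map<String, Set<String>> keysByNode = new HashMap<>();
        keysByNode.put("nodeA", new HashSet<>(Arrays.asList("text1", "img1")));
        keysByNode.put("nodeB", new HashSet<>(Arrays.asList("img2")));
        keysByNode.put("nodeC", new HashSet<>(Arrays.asList("text2")));
        // Only nodeA and nodeC hold one of the requested keys.
        return targetNodes(keysByNode, "text1", "text2").toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a set like this in hand, RPCs would go only to the returned nodes; nodeB is skipped entirely in the demo.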
Using futures:
NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
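The futures variant could behave roughly as sketched below. This is an assumption-laden illustration, not the proposed implementation: java.util.concurrent.CompletableFuture stands in for NotifyingFuture, the class InvokeFutureSketch is hypothetical, and the whole blocking invoke(mapper, reducer, collator) call is modelled as a plain Supplier.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class InvokeFutureSketch {
    // Hypothetical stand-in for c.invokeFuture(mapper, reducer, collator):
    // the blocking invoke is modelled as a Supplier and run on another thread.
    static <R> CompletableFuture<R> invokeFuture(Supplier<R> blockingInvoke) {
        return CompletableFuture.supplyAsync(blockingInvoke);
    }

    static int demo() {
        // Pretend the cluster-wide map/reduce/collate produced 3 + 5.
        CompletableFuture<Integer> f = invokeFuture(() -> 3 + 5);
        // The caller can register a callback instead of blocking...
        f.thenAccept(r -> System.out.println("collated result: " + r));
        // ...or block until the collated result is available.
        return f.join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the variant is that the caller is not tied up while remote nodes map and reduce; it can register a callback or block later.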
Example: implementing a word count, but only for keys that start with "text":
Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
   public Integer map(String k, String v) {
      // v.length() is the value's character count; split on whitespace to count words
      return k.startsWith("text") ? v.length() : null;
   }
};
Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
   public Integer reduce(Integer transformed, Integer prevReduced) {
      // prevReduced is assumed to be null on the first call on a node
      return prevReduced == null ? transformed : transformed + prevReduced;
   }
};
Collator<Integer> collator = new Collator<Integer>() {
   int collated = 0;
   public void add(Address origin, Integer result) {collated += result;}
   public Integer collate() {return collated;}
};
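To check that the four invoke steps and the example fit together, here is a minimal single-JVM sketch. Everything outside the three interface shapes is an assumption made for illustration: the class WordCountSketch and its invoke method are hypothetical, each "remote node" is just an in-memory map, a String replaces Address as the origin, the reducer receives null as the previous value on its first call, and nodes that produced no T contribute nothing to the collator. The mapper splits on whitespace so that it really counts words.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class WordCountSketch {
    interface Mapper<K, V, T> { T map(K key, V value); }
    interface Reducer<T, R> { R reduce(T transformed, R previouslyReduced); }
    interface Collator<R> { void add(String origin, R remote); R collate(); }

    // Hypothetical local stand-in for c.invoke(...): each entry of 'nodes' plays one remote node.
    static <K, V, T, R> R invoke(Map<String, Map<K, V>> nodes, Mapper<K, V, T> mapper,
                                 Reducer<T, R> reducer, Collator<R> collator) {
        for (Map.Entry<String, Map<K, V>> node : nodes.entrySet()) {
            R reduced = null; // assumption: first reduce() call sees null
            for (Map.Entry<K, V> e : node.getValue().entrySet()) {
                T t = mapper.map(e.getKey(), e.getValue());   // step 1: map each K/V pair
                if (t != null) {
                    reduced = reducer.reduce(t, reduced);     // step 2: fold T into R
                }
            }
            if (reduced != null) {
                collator.add(node.getKey(), reduced);         // step 3: per-node result
            }
        }
        return collator.collate();                            // step 4: final result
    }

    static int demo() {
        Map<String, String> a = new LinkedHashMap<>();
        a.put("text1", "to be or not to be");   // 6 words
        a.put("img1", "ignored entry");         // key does not start with "text"
        Map<String, String> b = new LinkedHashMap<>();
        b.put("text2", "that is the question"); // 4 words
        Map<String, Map<String, String>> nodes = new LinkedHashMap<>();
        nodes.put("nodeA", a);
        nodes.put("nodeB", b);

        Mapper<String, String, Integer> mapper =
            (k, v) -> k.startsWith("text") ? v.trim().split("\\s+").length : null;
        Reducer<Integer, Integer> reducer =
            (t, prev) -> prev == null ? t : prev + t;
        Collator<Integer> collator = new Collator<Integer>() {
            private int collated = 0;
            public void add(String origin, Integer remote) { collated += remote; }
            public Integer collate() { return collated; }
        };
        return invoke(nodes, mapper, reducer, collator);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 6 + 4 = 10
    }
}
```

Note that the collator holds state, so a fresh instance is needed for each invocation.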
WDYT? :-)
Cheers
Manik
On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>> Maybe I'm reading this wrong but are you saying that multiple caches cause
>> problem with mapping of task units to nodes in cluster?
>>
>> Or are you just doing it not to clutter the API?
> Clutter of API. If you did not like K,V,T,R imagine dealing with
> multiple cache confusion! It would be horrible.
>
>> I think DistributedTaskContext extending CacheContainer is rather confusing,
>> particularly when DistributedTaskContext has K,V parameters that generally are
>> associated with Cache rather than CacheContainer.
> Yes, true but DistributedTaskContext is primarily geared towards one
> cache while providing opportunity to read data from other caches as
> well. Hence K,V for the primary cache. Any suggestions how to deal with
> this in a more elegant way? Maybe pass DistributedTaskContext and
> CacheContainer as separate parameters?
>
>
>> Also, why is a context iterable? Iterates the contents of a CacheContainer?
>> extends generally means that "is something". AFAIK, you'd be able to iterate
>> a Map or Cache, but not a CacheContainer.
> True.
>
>> Personally, I think DistributedTask has too many generics (K, V, T, R) and
>> it's hard to read. IMO, only T and R should exist. I would also try to stick
>> to Callable conventions that takes a V.
>>
>> I don't like to see things like this, reminds me of EJB 2.1 where you were
>> forced to implement a method to simply get hold of a ctx. There're much nicer
>> ways to do things like this, if completely necessary (see EJB3) :
> You mean injection? There is a proposal 2 that essentially does this.
>
>> @Override
>> public void mapped(DistributedTaskContext<String, String> ctx) {
>> this.ctx = ctx;
>> }
>>
>> Looking at the example provided, it seems to me that all DistributedTaskContext
>> is used for is to navigate the Cache contents from a user defined callable, in
>> which case I would limit its scope.
> What do you mean - "limit its scope"?
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev(a)lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
--
Manik Surtani
manik(a)jboss.org
twitter.com/maniksurtani
Lead, Infinispan
http://www.infinispan.org