[infinispan-dev] Distributed execution framework - API proposal(s)
Vladimir Blagojevic
vblagoje at redhat.com
Wed Jan 5 10:21:09 EST 2011
These two approaches are not mutually exclusive. Here is what we can do.
We can have a very simple "execute this runnable on remote nodes and
collect results" model as the infrastructure for the higher-layer
map/reduce you proposed, Manik. That way we cover both ends: a simple
execution model on remote nodes for a given input data set, and the more
sophisticated, original map/reduce model built on top of it. Users can
choose whichever fits their needs best. I can definitely see a need
arising for both of these approaches.
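
Something along these lines, purely as a sketch - DistributedExecutorService and submitEverywhere() are placeholder names, not a settled API:

interface DistributedExecutorService extends java.util.concurrent.ExecutorService {
// runs the task on every node in the cluster and returns one future per target node
// (a Callable rather than a Runnable, since we want to collect results)
<T> List<Future<T>> submitEverywhere(Callable<T> task);

// runs the task only on the nodes that own some portion of the given input keys
<T, K> List<Future<T>> submitEverywhere(Callable<T> task, K... input);
}

The map/reduce layer you describe would then simply be one consumer of these futures.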
WDYT?
On 11-01-05 12:46 AM, Vladimir Blagojevic wrote:
> Manik,
>
> Of course we could go in this direction as well. This is very similar to
> the Hadoop approach and close to the original map/reduce paradigm. I
> intentionally changed this paradigm into a simpler one because I have read
> a lot of criticism about how hard it is for "regular" developers to adapt
> to the original map/reduce paradigm. Hence the simpler approach of mapping
> runnables to execution nodes and collating results - unfortunately I
> named those operations map and reduce as well.
>
> Does anyone else have an opinion while I think about this a bit more?
>
> Regards,
> Vladimir
>
>
>
>
> On 11-01-04 3:47 PM, Manik Surtani wrote:
>> Also, I think we need to be clear about these two (map and reduce) functions. Map doesn't mean "pick a node to run the task on" in map/reduce speak. Map means select/transform data for inclusion in a result set. Perhaps it also makes sense to use smaller/simpler interfaces. I know this breaks away from the F/J API, but I'm beginning to wonder whether there was a proper alignment of purpose there in the first place, so I'm going back on my original plans. How's this for an alternate API:
>>
>> interface Mapper<K, V, T> {
>> // just "maps" entries on a remote node. Map = filter and transform. Invoked once for each entry on a remote cache.
>> // null return values are ignored/filtered out.
>> T map(K key, V value);
>> }
>>
>> interface Reducer<T, R> {
>> // incrementally reduces a transformed entry. Called once for each T produced by the mapper.
>> // the previously reduced value is passed in each time.
>> R reduce(T transformed, R previouslyReduced);
>> }
>>
>> interface Collator<R> {
>> // adds a reduced result received from a remote node. Called once for each R returned by a remote Reducer.
>> void add(Address origin, R remote);
>>
>> // collates all results added so far.
>> R collate();
>> }
>>
>>
>> And the API could do something like
>>
>> MapReduceContext c = new MapReduceContext(cache);
>>
>> // 1) distributes 'mapper' cluster-wide. Calls mapper.map() for each K/V pair. Stores the result T of each invocation if T != null.
>> // 2) for each T, reducer.reduce() is called. Each time, the previous value of R is passed back in to reduce().
>> // 3) the final value of R is sent back as an RPC result. For each result, the origin address and R are passed to collator.add().
>> // 4) once all remote RPCs have responded, collator.collate() is called and its result is passed back to the caller.
>> R r = c.invoke(mapper, reducer, collator);
>>
>> Variants may include:
>>
>> Filtering nodes:
>> // restricts the set of nodes where RPCs are sent, based on the subset of the cluster that contains one or more of the given keys.
>> // question: does this mean only K/V pairs that are in K... are passed in to the mapper?
>> R r = c.invoke(mapper, reducer, collator, K...);
>>
>> Using futures:
>> NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
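>>
>> Assuming NotifyingFuture keeps its existing attachListener() contract, the caller could then do something like:
>>
>> f.attachListener(new FutureListener<R>() {
>> public void futureDone(Future<R> future) {
>> // fires once the collated result is available; future.get() won't block here
>> }
>> });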
>>
>> Example: implementing a word count, but only for keys that start with "text":
>>
>> Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
>> public Integer map(String k, String v) {
>> // count words rather than characters; ignore keys that don't match
>> return k.startsWith("text") ? v.split("\\s+").length : null;
>> }
>> };
>>
>> Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
>> public Integer reduce(Integer transformed, Integer prevReduced) {
>> // assume there is no previous value (null) on the first invocation
>> return prevReduced == null ? transformed : transformed + prevReduced;
>> }
>> };
>>
>> Collator<Integer> collator = new Collator<Integer>() {
>> int collated = 0;
>> public void add(Address origin, Integer result) { collated += result; }
>> public Integer collate() { return collated; }
>> };
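>>
>> Putting it all together with the invoke() call from above:
>>
>> Integer totalWords = c.invoke(mapper, reducer, collator);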
>>
>>
>> WDYT? :-)
>>
>> Cheers
>> Manik
>>
>>
>>
>> On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
>>
>>> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>>>> Maybe I'm reading this wrong, but are you saying that multiple caches cause problems with the mapping of task units to nodes in the cluster?
>>>>
>>>> Or are you just doing it to avoid cluttering the API?
>>> It was to avoid cluttering the API. If you did not like K, V, T, R,
>>> imagine dealing with the added confusion of multiple caches! It would be horrible.
>>>
>>>> I think DistributedTaskContext extending CacheContainer is rather confusing, particularly when DistributedTaskContext has K,V parameters that are generally associated with Cache rather than CacheContainer.
>>> Yes, true, but DistributedTaskContext is primarily geared towards one
>>> cache while also providing the opportunity to read data from other
>>> caches - hence K, V for the primary cache. Any suggestions on how to
>>> deal with this more elegantly? Maybe pass DistributedTaskContext and
>>> CacheContainer as separate parameters?
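>>>
>>> As a purely hypothetical sketch (DistributedCallable is a made-up name), the user-facing callable could then look like:
>>>
>>> interface DistributedCallable<K, V, T> {
>>> // ctx is scoped to the primary cache; container gives read access to the other caches
>>> T call(DistributedTaskContext<K, V> ctx, CacheContainer container);
>>> }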
>>>
>>>
>>>> Also, why is a context iterable? Does it iterate the contents of a CacheContainer? "extends" generally means "is a". AFAIK, you'd be able to iterate a Map or Cache, but not a CacheContainer.
>>> True.
>>>
>>>> Personally, I think DistributedTask has too many generics (K, V, T, R), and it's hard to read. IMO, only T and R should exist. I would also try to stick to the Callable convention, which takes a V.
>>>>
>>>> I don't like to see things like this; it reminds me of EJB 2.1, where you were forced to implement a method simply to get hold of a ctx. There are much nicer ways to do things like this, if it's needed at all (see EJB3):
>>> You mean injection? Proposal 2 essentially does this.
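>>>
>>> Roughly, instead of the callback shown below, proposal 2 lets you declare the dependency and have it injected, along these lines (@TaskContext is a hypothetical annotation name):
>>>
>>> @TaskContext
>>> private DistributedTaskContext<String, String> ctx;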
>>>
>>>> @Override
>>>> public void mapped(DistributedTaskContext<String, String> ctx) {
>>>> this.ctx = ctx;
>>>> }
>>>>
>>>> Looking at the example provided, it seems to me that all DistributedTaskContext is used for is navigating the Cache contents from a user-defined callable, in which case I would limit its scope.
>>> What do you mean - "limit its scope"?
>>>
>> --
>> Manik Surtani
>> manik at jboss.org
>> twitter.com/maniksurtani
>>
>> Lead, Infinispan
>> http://www.infinispan.org