[infinispan-dev] Distributed execution framework - API proposal(s)

Vladimir Blagojevic vblagoje at redhat.com
Tue Jan 4 22:46:40 EST 2011


Manik,

Of course we could go in this direction as well. This is very similar to 
the Hadoop approach and close to the original map/reduce paradigm. I 
intentionally changed this paradigm into a simpler one because I have read a 
lot of criticism that it is very hard for "regular" developers to adapt 
to the original map/reduce paradigm. Hence the simpler approach of mapping 
runnables to execution nodes and collating the results; unfortunately, I 
named these steps map and reduce as well.
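To make the simpler model concrete, here is a minimal single-JVM sketch. It is not the proposed API: it assumes a thread pool standing in for the execution nodes, and SimpleTaskSketch and sumLengths are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: each pool thread stands in for one cluster node.
class SimpleTaskSketch {

    // "Map" in the simplified sense: hand one task to each node-local slice of data.
    // "Reduce" in the simplified sense: collate the per-node results on the caller.
    static int sumLengths(List<List<String>> slicesPerNode) {
        ExecutorService nodes = Executors.newFixedThreadPool(Math.max(1, slicesPerNode.size()));
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (final List<String> slice : slicesPerNode) {
                Callable<Integer> task = () -> {
                    int total = 0;
                    for (String value : slice) {
                        total += value.length();
                    }
                    return total;
                };
                pending.add(nodes.submit(task));
            }
            // Collate: wait for every "node" and add up what it returned.
            int collated = 0;
            for (Future<Integer> result : pending) {
                collated += result.get();
            }
            return collated;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            nodes.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<String>> slices = new ArrayList<>();
        slices.add(Arrays.asList("ab", "cde"));
        slices.add(Arrays.asList("f"));
        System.out.println(sumLengths(slices)); // prints 6
    }
}
```

Note that nothing here filters or transforms individual entries; the task simply runs wherever its data lives, which is the difference from classic map/reduce.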

Does anyone else have an opinion while I think about this a bit more?

Regards,
Vladimir




On 11-01-04 3:47 PM, Manik Surtani wrote:
> Also, I think we need to be clear about these 2 (map and reduce) functions.  Map doesn't mean "pick a node to run the task on" in map/reduce speak.  Map means select/transform data for inclusion in a result set.  Perhaps it also makes sense to use smaller/simpler interfaces.  I know this breaks away from the F/J API, but I'm beginning to wonder whether there is a proper alignment of purpose here in the first place - going back on my original plans here.  How's this for an alternate API:
>
> interface Mapper<K, V, T> {
> 	// Just "maps" entries on a remote node.  Map = filter and transform.  Invoked once for each entry on a remote cache.
> 	// Null responses are ignored/filtered out.
> 	T map(K key, V value);
> }
>
> interface Reducer<T, R> {
> 	// Incrementally reduces a transformed entry.  Called once for each T produced by the mapper.
> 	// The previously reduced value is passed in each time.
> 	R reduce(T transformed, R previous);
> }
>
> interface Collator<R> {
> 	// Adds reduced results from remote nodes.  Called once for each R returned by a remote Reducer.
> 	void add(Address origin, R remote);
>
> 	// Collates all results added so far.
> 	R collate();
> }
>
>
> And the API could do something like
>
> MapReduceContext c = new MapReduceContext(cache);
>
> // 1) Distributes 'mapper' cluster-wide.  Calls mapper.map() for each K/V pair.  Stores the result T of each invocation if T != null.
> // 2) For each T, reducer.reduce() is called.  Each time, the previous value of R is passed back in to reduce().
> // 3) The final value of R is sent back as an RPC result.  For each result, the address and R are passed to collator.add().
> // 4) Once all remote RPCs have responded, collator.collate() is called and its result is passed back to the caller.
> R r = c.invoke(mapper, reducer, collator);
>
> Variants may include:
>
> Filtering nodes:
> // Restricts the set of nodes where RPCs are sent to the subset of the cluster that contains one or more of K.
> // Question: does this mean only K/V pairs that are in K... are passed in to the mapper?
> R r = c.invoke(mapper, reducer, collator, K...);
>
> Using futures:
> NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
>
> Example: implementing a word count, but only for keys that start with "text":
>
> Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
> 	public Integer map(String k, String v) {
> 		return k.startsWith("text") ? v.length() : null;
> 	}
> };
>
> Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
> 	public Integer reduce(Integer transformed, Integer prevReduced) {return transformed + prevReduced;}
> };
>
> Collator<Integer> collator = new Collator<Integer>() {
> 	int collated = 0;
> 	public void add(Address origin, Integer result) {collated += result;}
> 	public Integer collate() {return collated;}
> };
>
>
> WDYT?  :-)
>
> Cheers
> Manik
>
>
>
> On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
>
>> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>>> Maybe I'm reading this wrong, but are you saying that multiple caches cause a problem with mapping of task units to nodes in the cluster?
>>>
>>> Or are you just doing it not to clutter the API?
>> Clutter of API. If you did not like K,V,T,R imagine dealing with
>> multiple cache confusion! It would be horrible.
>>
>>> I think DistributedTaskContext extending CacheContainer is rather confusing, particularly when DistributedTaskContext has K,V parameters that generally are associated with Cache rather than CacheContainer.
>> Yes, true, but DistributedTaskContext is primarily geared towards one
>> cache while providing the opportunity to read data from other caches as
>> well. Hence K,V for the primary cache. Any suggestions on how to deal with
>> this in a more elegant way? Maybe pass DistributedTaskContext and
>> CacheContainer as separate parameters?
>>
>>
>>> Also, why is a context iterable? Does it iterate the contents of a CacheContainer? "extends" generally means "is a". AFAIK, you'd be able to iterate a Map or Cache, but not a CacheContainer.
>> True.
>>
>>> Personally, I think DistributedTask has too many generics (K, V, T, R) and it's hard to read. IMO, only T and R should exist. I would also try to stick to the Callable convention, which takes a V.
>>>
>>> I don't like to see things like this; it reminds me of EJB 2.1, where you were forced to implement a method simply to get hold of a ctx. There are much nicer ways to do things like this, if completely necessary (see EJB3):
>> You mean injection? There is a proposal 2 that essentially does this.
>>
>>>        @Override
>>>        public void mapped(DistributedTaskContext<String, String>   ctx) {
>>>           this.ctx = ctx;
>>>        }
>>>
>>> Looking at the example provided, it seems to me that all DistributedTaskContext is used for is to navigate the Cache contents from a user defined callable, in which case I would limit its scope.
>> What do you mean - "limit its scope"?
>>
>> _______________________________________________
>> infinispan-dev mailing list
>> infinispan-dev at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> --
> Manik Surtani
> manik at jboss.org
> twitter.com/maniksurtani
>
> Lead, Infinispan
> http://www.infinispan.org
>
>
>
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
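For comparison, the four steps in the quoted proposal can be simulated in a single JVM. This is only a sketch under stated assumptions: node addresses are plain Strings instead of Address, the reducer receives null as the previous value on its first call, nodes that produce no mapped values are skipped, and LocalMapReduceSketch with its invoke and demo methods are hypothetical names. Like the quoted example, it sums value lengths for keys that start with "text".

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-ins for the quoted interfaces; String replaces Address (assumption).
interface Mapper<K, V, T> { T map(K key, V value); }
interface Reducer<T, R> { R reduce(T transformed, R previous); }
interface Collator<R> { void add(String origin, R remote); R collate(); }

class LocalMapReduceSketch {

    // Outer map: node name -> entries owned by that node (simulated cluster).
    static <K, V, T, R> R invoke(Map<String, Map<K, V>> entriesByNode,
                                 Mapper<K, V, T> mapper,
                                 Reducer<T, R> reducer,
                                 Collator<R> collator) {
        for (Map.Entry<String, Map<K, V>> node : entriesByNode.entrySet()) {
            R reduced = null;          // assumption: first reduce() call sees null
            boolean producedAny = false;
            for (Map.Entry<K, V> entry : node.getValue().entrySet()) {
                T transformed = mapper.map(entry.getKey(), entry.getValue()); // step 1
                if (transformed == null) {
                    continue;          // null responses are filtered out
                }
                reduced = reducer.reduce(transformed, reduced);               // step 2
                producedAny = true;
            }
            if (producedAny) {
                collator.add(node.getKey(), reduced);                         // step 3
            }
        }
        return collator.collate();                                            // step 4
    }

    static int demo() {
        Map<String, String> nodeA = new LinkedHashMap<>();
        nodeA.put("text1", "hello");   // length 5
        nodeA.put("other", "ignored"); // filtered out by the mapper
        Map<String, String> nodeB = new LinkedHashMap<>();
        nodeB.put("text2", "abc");     // length 3
        nodeB.put("text3", "de");      // length 2
        Map<String, Map<String, String>> cluster = new LinkedHashMap<>();
        cluster.put("nodeA", nodeA);
        cluster.put("nodeB", nodeB);

        Mapper<String, String, Integer> mapper =
                (k, v) -> k.startsWith("text") ? v.length() : null;
        Reducer<Integer, Integer> reducer =
                (t, prev) -> prev == null ? t : prev + t;
        Collator<Integer> collator = new Collator<Integer>() {
            private int collated = 0;
            public void add(String origin, Integer remote) { collated += remote; }
            public Integer collate() { return collated; }
        };
        return invoke(cluster, mapper, reducer, collator);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 10
    }
}
```

The sketch makes one open point visible: the proposal does not say what R is on the first reduce() call, so a reducer written as transformed + prevReduced would need either an initial value or a null check.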


