[infinispan-dev] Distributed execution framework - API proposal(s)

Manik Surtani manik at jboss.org
Wed Jan 5 11:19:13 EST 2011


On 5 Jan 2011, at 15:21, Vladimir Blagojevic wrote:

> These two approaches are not mutually exclusive. Here is what we can do. 
> We can have a very simple "execute this runnable on remote nodes and 
> collect results" model as infrastructure for the higher-layer 
> map/reduce you proposed, Manik. That way we cover both ends - a simple 
> execution model on remote nodes for a given input data set, and a more 
> sophisticated, original map/reduce model built on top of it. Users can 
> choose whichever fits their needs best. I can definitely see a need arising 
> for both of these approaches.

Ok, so something like what I proposed, plus:

DistributedCallable<T> extends Callable<T> {
	setEmbeddedCacheManager(ecm);
	setCache(c);
}

DistributedRemoteTask dt = new DistributedRemoteTask(cache, new DistributedCallable<T>() {... });
List<Future<T>> res = dt.invoke(); // executes on all nodes
List<Future<T>> res = dt.invoke(K...); // executes on all nodes containing one or more of K

and expect the user to iterate over whatever is needed, performing any mapping, transforming, reducing, etc. in DistributedCallable.call()?
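
For illustration, a user-supplied callable under that model might look roughly like this (just a sketch to make the discussion concrete - the parameter types on the setters and the way partial results get combined are assumptions, not settled API):

DistributedCallable<Integer> callable = new DistributedCallable<Integer>() {
	Cache<String, String> cache;

	public void setEmbeddedCacheManager(EmbeddedCacheManager ecm) { /* unused here */ }
	public void setCache(Cache<String, String> c) { this.cache = c; }

	public Integer call() {
		// mapping, transforming and reducing all happen locally, inside call()
		int total = 0;
		for (Map.Entry<String, String> e : cache.entrySet()) {
			if (e.getKey().startsWith("text")) total += e.getValue().length();
		}
		return total;
	}
};

DistributedRemoteTask<Integer> dt = new DistributedRemoteTask<Integer>(cache, callable);
List<Future<Integer>> res = dt.invoke(); // one partial result per node; the caller combines them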

If so, then we need to define the following APIs (along with an example of using each, so we are clear on how each API works).

M/R API:
	Mapper<K, V, T>
	Reducer<T, R>
	Collator<R>
	MapReduceTask<T, R>

DistributedCallable API:
	DistributedCallable<K, V, R>
	DistributedRemoteTask<R>

right?
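
To make the second family a little more concrete, one possible shape (again just a sketch to anchor the discussion - the setters mirror the earlier snippet, and the exact signatures are guesses):

interface DistributedCallable<K, V, R> extends Callable<R> {
	// injected on the remote node before call() runs
	void setEmbeddedCacheManager(EmbeddedCacheManager ecm);
	void setCache(Cache<K, V> cache);
}

class DistributedRemoteTask<R> {
	DistributedRemoteTask(Cache<?, ?> cache, DistributedCallable<?, ?, R> callable) { ... }
	List<Future<R>> invoke();               // all nodes
	List<Future<R>> invoke(Object... keys); // nodes holding one or more of the keys
}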

MapReduceTask could extend DistributedRemoteTask... (maybe) ... and would need the following overloaded invocation methods, perhaps:

R invoke(); //invoke on all nodes
R invoke(K...); // invoke on a subset of nodes
R invoke(Address...); // invoke on a subset of nodes
Future<R> invokeAsync(); //invoke on all nodes
Future<R> invokeAsync(K...); // invoke on a subset of nodes
Future<R> invokeAsync(Address...); // invoke on a subset of nodes
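
In skeleton form, something like the following (sketch only - note that these invoke() return types differ from DistributedRemoteTask's List<Future<R>>, which is perhaps why the "extends" is a maybe):

class MapReduceTask<T, R> /* extends DistributedRemoteTask<R>? */ {
	MapReduceTask(Cache<?, ?> cache, Mapper<?, ?, T> mapper, Reducer<T, R> reducer, Collator<R> collator) { ... }

	R invoke();                       // all nodes
	R invoke(Object... keys);         // nodes holding one or more of the keys
	R invoke(Address... targets);     // an explicit subset of nodes
	Future<R> invokeAsync();
	Future<R> invokeAsync(Object... keys);
	Future<R> invokeAsync(Address... targets);
}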

Or, perhaps, making use of a more fluent API:

R result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collate(collator);

Future<R> result = new MapReduceTask(cache).on(K...).mappedWith(mapper).reducedWith(reducer).collateAsynchronously(collator);

List<R> result = new DistributedRemoteTask(cache).on(K...).execute(distributedCallable);

List<Future<R>> result = new DistributedRemoteTask(cache).on(K...).executeAsync(distributedCallable);
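
Under the hood, the fluent version implies a builder along these lines (sketch only - the intermediate return types are made up, and it does bring all four generic parameters back):

class MapReduceTask<K, V, T, R> {
	MapReduceTask(Cache<K, V> cache) { ... }

	MapReduceTask<K, V, T, R> on(K... keys);                 // optional; defaults to all nodes
	MapReduceTask<K, V, T, R> mappedWith(Mapper<K, V, T> mapper);
	MapReduceTask<K, V, T, R> reducedWith(Reducer<T, R> reducer);
	R collate(Collator<R> collator);                         // triggers execution and blocks
	Future<R> collateAsynchronously(Collator<R> collator);   // triggers execution, returns immediately
}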


WDYT?  I think the fluent API makes a lot of sense, and it is very clear and user-friendly.

Cheers
Manik


> 
> WDYT?
> 
> On 11-01-05 12:46 AM, Vladimir Blagojevic wrote:
>> Manik,
>> 
>> Of course we could go in this direction as well. This is very similar to
>> the Hadoop approach and close to the original map/reduce paradigm. I
>> intentionally changed this paradigm into a simpler one because I read a
>> lot of criticism about how hard it is for "regular" developers to adapt
>> to the original map/reduce paradigm. Hence the simpler approach of mapping
>> runnables to execution nodes and collating results - unfortunately I
>> named them map and reduce as well.
>> 
>> Does anyone else have an opinion while I think about this a bit more?
>> 
>> Regards,
>> Vladimir
>> 
>> 
>> 
>> 
>> On 11-01-04 3:47 PM, Manik Surtani wrote:
>>> Also, I think we need to be clear about these two (map and reduce) functions.  Map doesn't mean "pick a node to run the task on" in map/reduce speak.  Map means select/transform data for inclusion in a result set.  Perhaps it also makes sense to use smaller/simpler interfaces.  I know this breaks away from the F/J API, but I'm beginning to wonder whether there is a proper alignment of purpose there in the first place - so I'm going back on my original plans here.  How's this for an alternate API:
>>> 
>>> Mapper<K, V, T> {
>>> 	// just "maps" entries on a remote node.  Map = filter and transform.  Invoked once for each entry on a remote cache.
>>> 	// null responses are ignored/filtered out.
>>> 	T map(K, V);
>>> }
>>> 
>>> Reducer<T, R> {
>>> 	// incrementally reduces a transformed entry.  Called once for each T produced by the mapper.
>>> 	// previously reduced value passed in each time.
>>> 	R reduce(T, R);
>>> }
>>> 
>>> Collator<R> {
>>> 	// Adds reduced results from remote nodes.  Called once for each R returned by a remote Reducer.
>>> 	void add(Address origin, R remote);
>>> 
>>> 	// collates all results added so far.
>>> 	R collate();
>>> }
>>> 
>>> 
>>> And the API could do something like
>>> 
>>> MapReduceContext c = new MapReduceContext(cache);
>>> 
>>> // 1) distributes 'mapper' cluster wide.  Calls mapper.map() for each K/V pair.  Stores result T for each invocation if T != null.
>>> // 2) For each T, reducer.reduce() is called.  Each time, the previous value of R is passed back in to reduce().
>>> // 3) Final value of R is sent back as an RPC result.  For each result, the origin address and R are passed to collator.add().
>>> // 4) Once all remote RPCs have responded, collator.collate() is called and the result is passed back to the caller.
>>> R r = c.invoke(mapper, reducer, collator);
>>> 
>>> Variants may include:
>>> 
>>> Filtering nodes:
>>> // restricts the set of nodes where RPCs are sent, based on the subset of the cluster that contains one or more of K.
>>> // question: does this mean only K/V pairs whose keys are in K... are passed to the mapper?
>>> R r = c.invoke(mapper, reducer, collator, K...);
>>> 
>>> Using futures:
>>> NotifyingFuture<R> f = c.invokeFuture(mapper, reducer, collator);
>>> 
>>> Example: implementing a word count, but only for keys that start with "text":
>>> 
>>> Mapper<String, String, Integer> mapper = new Mapper<String, String, Integer>() {
>>> 	Integer map(String k, String v) {
>>> 		// count words, but only for keys that start with "text"; null results are filtered out
>>> 		return k.startsWith("text") ? v.split("\\s+").length : null;
>>> 	}
>>> };
>>> 
>>> Reducer<Integer, Integer> reducer = new Reducer<Integer, Integer>() {
>>> 	Integer reduce(Integer transformed, Integer prevReduced) {
>>> 		// no previously reduced value exists on the first invocation
>>> 		return prevReduced == null ? transformed : transformed + prevReduced;
>>> 	}
>>> };
>>> 
>>> Collator<Integer> collator = new Collator<Integer>() {
>>> 	int collated = 0;
>>> 	void add(Address origin, Integer result) {collated += result;}
>>> 	Integer collate() {return collated;}
>>> };
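>>> 
>>> // (to tie the example together - reusing the context 'c' from above, it would be kicked off with something like:)
>>> Integer totalWords = c.invoke(mapper, reducer, collator);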
>>> 
>>> 
>>> WDYT?  :-)
>>> 
>>> Cheers
>>> Manik
>>> 
>>> 
>>> 
>>> On 3 Jan 2011, at 11:37, Vladimir Blagojevic wrote:
>>> 
>>>> On 11-01-03 6:16 AM, Galder Zamarreño wrote:
>>>>> Maybe I'm reading this wrong, but are you saying that multiple caches cause a problem with mapping task units to nodes in the cluster?
>>>>> 
>>>>> Or are you just doing it not to clutter the API?
>>>> Clutter of the API. If you did not like K, V, T, R, imagine dealing with
>>>> the confusion of multiple caches! It would be horrible.
>>>> 
>>>>> I think DistributedTaskContext extending CacheContainer is rather confusing, particularly when DistributedTaskContext has K, V parameters that are generally associated with Cache rather than CacheContainer.
>>>> Yes, true, but DistributedTaskContext is primarily geared towards one
>>>> cache while providing the opportunity to read data from other caches as
>>>> well. Hence K, V for the primary cache. Any suggestions on how to deal with
>>>> this in a more elegant way? Maybe pass DistributedTaskContext and
>>>> CacheContainer as separate parameters?
>>>> 
>>>> 
>>>>> Also, why is a context iterable? Does it iterate the contents of a CacheContainer? extends generally means "is a". AFAIK, you'd be able to iterate a Map or Cache, but not a CacheContainer.
>>>> True.
>>>> 
>>>>> Personally, I think DistributedTask has too many generics (K, V, T, R) and it's hard to read. IMO, only T and R should exist. I would also try to stick to the Callable convention, which takes a V.
>>>>> 
>>>>> I don't like to see things like this; it reminds me of EJB 2.1, where you were forced to implement a method simply to get hold of a ctx. There are much nicer ways to do things like this, if completely necessary (see EJB3):
>>>> You mean injection? There is a proposal 2 that essentially does this.
>>>> 
>>>>>        @Override
>>>>>        public void mapped(DistributedTaskContext<String, String> ctx) {
>>>>>           this.ctx = ctx;
>>>>>        }
>>>>> 
>>>>> Looking at the example provided, it seems to me that all DistributedTaskContext is used for is to navigate the Cache contents from a user-defined callable, in which case I would limit its scope.
>>>> What do you mean - "limit its scope"?
>>>> 
> 

--
Manik Surtani
manik at jboss.org
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org





