[infinispan-dev] Distributed execution framework - API proposal(s)

Manik Surtani manik at jboss.org
Tue Jan 11 09:37:16 EST 2011


On 11 Jan 2011, at 14:18, Vladimir Blagojevic wrote:

> On 11-01-11 10:28 AM, Manik Surtani wrote:
>> Hi there.
>> 
>> Let's take the two API sets (M/R and distributed exec) separately:
>> 
>> M/R
>> * Why does MapReduceTask extend Set?  It probably shouldn't even
>> extend DistributedRemoteTask, since some methods, such as
>> execute(Callable), don't make sense for MapReduceTask.  Perhaps a
>> common super-interface?  That alone should simplify the generics...
> 
> A leftover that I did not catch. I meant Map<K1,K2> instead of
> Set<Map.Entry<K2, V2>>; it does not extend Set, it just constrains one
> of the type parameters :-)
> 
> Completely agree on a common parent that has the common setters for
> various task aspects.
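A minimal Java sketch of such a common parent, under stated assumptions: every name here (AbstractTask, on(), timeout(), DistExecTask, MapReduceTaskSketch) is hypothetical and not part of the proposal. The self-type parameter SELF is one common way to let shared fluent setters return the concrete subtype, so only the distributed-exec task exposes execute(Callable) while the M/R task still inherits the shared settings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// Hypothetical common parent: holds only the settings shared by every kind of task.
// SELF lets the fluent setters return the concrete subtype, so chaining keeps its type.
abstract class AbstractTask<K, SELF extends AbstractTask<K, SELF>> {
    private final List<K> keys = new ArrayList<>();
    private long timeoutMillis; // 0 means "no timeout" in this sketch

    @SafeVarargs
    public final SELF on(K... keys) {
        this.keys.addAll(Arrays.asList(keys));
        return self();
    }

    public final SELF timeout(long value, TimeUnit unit) {
        this.timeoutMillis = unit.toMillis(value);
        return self();
    }

    public final List<K> keys() { return Collections.unmodifiableList(keys); }

    public final long timeoutMillis() { return timeoutMillis; }

    protected abstract SELF self();
}

// Only the distributed-execution task exposes execute(Callable)...
class DistExecTask<K> extends AbstractTask<K, DistExecTask<K>> {
    @Override
    protected DistExecTask<K> self() { return this; }

    public <T> T execute(Callable<T> callable) throws Exception {
        return callable.call(); // local stand-in for remote execution
    }
}

// ...while the M/R task inherits the shared setters but never sees execute(Callable).
class MapReduceTaskSketch<K> extends AbstractTask<K, MapReduceTaskSketch<K>> {
    @Override
    protected MapReduceTaskSketch<K> self() { return this; }
}
```

For example, new DistExecTask<String>().on("k1", "k2").timeout(2, TimeUnit.SECONDS) stays a DistExecTask<String> throughout the chain.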
> 
>> * Why does Mapper.map() return a Map? [1] Shouldn't it return a single 
>> transformation for a given K and V?
> 
> Yes, I thought so too, but how would we then match intermediate keys for
> the reduce phase? Our plumbing has to find all matching
> transformed/intermediate keys returned from the map phase and then invoke
> the reduce phase.

What's an intermediate key?  :-)  For each K, V pair, we call map(K, V), which transforms V into a T.  So all we expect from map() is a T.
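A minimal sketch of that single-value mapper, assuming a hypothetical Mapper<K, V, T> interface with one map(K, V) method. The MapPhase helper is also hypothetical and stands in for the framework plumbing: it applies map() to each entry and keeps the association with K itself, so the mapper never has to return a Map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical mapper shape: one (K, V) pair in, one T out.
interface Mapper<K, V, T> {
    T map(K key, V value);
}

// Local stand-in for the framework's map phase. The plumbing, not the user,
// remembers which key produced which T.
final class MapPhase {
    private MapPhase() {}

    static <K, V, T> Map<K, T> run(Map<K, V> entries, Mapper<K, V, T> mapper) {
        Map<K, T> out = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : entries.entrySet()) {
            out.put(e.getKey(), mapper.map(e.getKey(), e.getValue()));
        }
        return out;
    }
}
```

A word-count mapper then reduces to a lambda such as (k, v) -> v.split(" ").length, returning one Integer per entry.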

>> * Reducer.reduce()'s return type should not necessarily be the same 
>> type as the mapped transformation.  Also, does the reducer really need 
>> the key? Surely just the transformed version of each K/V pair.
> 
> Yeah, when we solve the above this should fall into place.

Yep.  So we then just call reduce(T, R) for each value of T that we get from the mapping phase, where R is the reduced result from the previous invocation.  We may also need to pass in the key for which the mapping took place, in which case reduce(T, R) would become reduce(K, T, R).  But TBH I don't really see the use of passing in K.
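Read this way, the reduce step is a left fold. A minimal sketch, with hypothetical names (Reducer, ReducePhase): each T from the map phase is combined with the R produced by the previous call. The initial R handed to the first call is an assumption, since the thread does not say where it comes from.

```java
import java.util.List;

// Hypothetical reducer shape: fold one mapped value T into the running result R.
interface Reducer<T, R> {
    R reduce(T mapped, R previous);
}

final class ReducePhase {
    private ReducePhase() {}

    // Local stand-in: start from an assumed initial R and fold every T into it, in order.
    static <T, R> R run(List<T> mapped, R initial, Reducer<T, R> reducer) {
        R acc = initial;
        for (T t : mapped) {
            acc = reducer.reduce(t, acc);
        }
        return acc;
    }
}
```

Because R is its own type parameter, a reducer can fold Integer values into a String or a List, which is why reduce() need not return the mapped type.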

> 
>> * Similarly, Collator.add() should just need the address and the 
>> reduced result from each node (each node would only produce 1 result!)
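A sketch of that Collator contract, assuming a hypothetical add(address, reducedResult) plus a collate() method that produces the final answer; a plain String stands in for a cluster node address. Since each node yields exactly one reduced result, the example rejects a second result from the same address.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical collator: receives exactly one reduced result per node, keyed by address.
interface Collator<A, R, F> {
    void add(A address, R reducedResult);
    F collate();
}

// Example collator summing per-node partial counts (String stands in for an Address type).
class SumCollator implements Collator<String, Integer, Integer> {
    private final Map<String, Integer> perNode = new LinkedHashMap<>();

    @Override
    public void add(String address, Integer reducedResult) {
        // Each node contributes once, so a second result from the same address is an error.
        if (perNode.putIfAbsent(address, reducedResult) != null) {
            throw new IllegalStateException("duplicate result from " + address);
        }
    }

    @Override
    public Integer collate() {
        int sum = 0;
        for (int v : perNode.values()) {
            sum += v;
        }
        return sum;
    }
}
```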
>> 
>> DistExec
>> * Why do you have execute() and executeAsync() with no params?  What 
>> do these methods do?
> 
> For the case where the user provides a Factory for DistributedCallable. We
> have to handle cases where a simple no-parameter constructor for callables
> is not sufficient, hence the factory.

Why are we constructing DistributedCallables?  Surely the user passes in an instance?

> Now that I think about it, maybe adding K... input as a parameter to the
> call function of DistributedCallable makes more sense, or no? WDYT?

Not sure this is needed either.  

	List<ResultType> res = new DistributedRemoteTask(cache).on(key1, key2).execute(myCallableInstance);
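To make the point concrete, here is a single-JVM stand-in for the fluent call above. LocalDistTask, the key-to-node ownership map, and the thread pool are all assumptions for illustration, not the proposed API; the relevant part is that execute() takes a ready-made callable instance, so no factory or no-arg constructor is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical local stand-in: "nodes" are names, ownerOf maps each key to its owning node.
class LocalDistTask<K> {
    private final Map<K, String> ownerOf;
    private final Set<String> targets = new LinkedHashSet<>();

    LocalDistTask(Map<K, String> ownerOf) { this.ownerOf = ownerOf; }

    // Each distinct owner of the given keys becomes one execution target.
    @SafeVarargs
    final LocalDistTask<K> on(K... keys) {
        for (K k : keys) {
            String node = ownerOf.get(k);
            if (node == null) throw new IllegalArgumentException("no owner for " + k);
            targets.add(node);
        }
        return this;
    }

    // The caller hands over an instance; a real cluster would serialize it to each target.
    // Here the same instance simply runs once per distinct owner on a local thread pool.
    <T> List<T> execute(Callable<T> task) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, targets.size()));
        try {
            List<Future<T>> futures = new ArrayList<>();
            for (String node : targets) {
                futures.add(pool.submit(task));
            }
            List<T> results = new ArrayList<>();
            for (Future<T> f : futures) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

With key1 and key3 owned by the same node, on(key1, key2, key3) yields two targets, and so two results.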


>> * Not sure what DistributedTaskContext is all about.  What do read()
>> and write() do?  Is this meant to be a "layer" in front of the cache
>> in question?  Why not just pass in a ref to the cache?
> 
> True.
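One way to replace DistributedTaskContext with a plain cache reference, sketched with hypothetical names (CacheAwareCallable, setEnvironment, SumValues): the framework would inject the local cache and the input keys into the callable before invoking call(). ConcurrentMap stands in for the Infinispan Cache interface so the example runs on its own.

```java
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;

// Hypothetical: the framework hands the callable the local cache and its input keys,
// so there is no separate read()/write() context layer.
interface CacheAwareCallable<K, V, T> extends Callable<T> {
    void setEnvironment(ConcurrentMap<K, V> cache, Set<K> inputKeys);
}

// Example task: sums the values stored under its input keys, reading the cache directly.
class SumValues implements CacheAwareCallable<String, Integer, Integer> {
    private ConcurrentMap<String, Integer> cache;
    private Set<String> keys;

    @Override
    public void setEnvironment(ConcurrentMap<String, Integer> cache, Set<String> inputKeys) {
        this.cache = cache;
        this.keys = inputKeys;
    }

    @Override
    public Integer call() {
        int sum = 0;
        for (String k : keys) {
            Integer v = cache.get(k);
            if (v != null) {
                sum += v; // keys with no entry are skipped
            }
        }
        return sum;
    }
}
```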
> 
>> * What does Factory do?
> 
> Creates DistributedCallables. See above.

Also, isn't the name DistributedRemoteTask redundant?  :-)

Cheers
Manik

--
Manik Surtani
manik at jboss.org
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org





