[infinispan-dev] Distributed tasks - specifying task input

Manik Surtani manik at jboss.org
Wed Dec 15 13:03:15 EST 2010


I'm gonna cc infinispan-dev as well, so others can pitch in.

But yeah, good start. I think your dissection of the use cases makes sense, except that I think (a) and (c) are essentially the same; only that in the case of (c) the task ignores any data input.  This raises the question of why anybody would do this, and what purpose it serves. :-)  So yeah, we could support it as part of (a), but I am less than convinced of its value.

And as for data locality, perhaps this would work: don't define the necessary keys in the Task, but instead provide these when submitting the task. E.g.,

CacheManager.submitTask(task); // task goes everywhere, and receives an input Iterator of all local entries on each node.
CacheManager.submitTask(task, K...); // task goes to certain nodes that contain some or all of K.  Receives an input Iterator of all local entries on each node.
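
To make this concrete, a task consuming that iterator might look like the sketch below. DistributedTask, its type parameter and the execute() signature are assumed names for illustration, not a settled API:

import java.util.Iterator;
import java.util.Map;

// Sketch: sums the sizes of all local values; each node returns a partial
// result, which would then be reduced on the submitting node.
public class SizeSummingTask implements DistributedTask<Long> {
   public Long execute(Iterator<Map.Entry<String, byte[]>> localEntries) {
      long total = 0;
      while (localEntries.hasNext())
         total += localEntries.next().getValue().length;
      return total;
   }
}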

WDYT?

Cheers
Manik


On 15 Dec 2010, at 15:46, Vladimir Blagojevic wrote:

> Manik,
> 
> I'll write more towards the end if I remember something I forgot, but here is my initial feedback. I think the two of us should focus on the steps needed to finalize the API. I am taking Trustin's proposal as the starting point because I really like it; it is very simple yet powerful. The first thing I want to review, something I think is missing, is that Trustin's proposal covers only one case of task input and consequently does not elaborate on how tasks are mapped to execution nodes.
> 
> In order to be as clear as possible, let's go one step back. I can think of three distinct cases of input:
> 
> a) all data of type T available on the cluster is the input for a task (covered by Trustin's proposal)
> b) a subset of data of type T is the input for a task
> c) no data from caches is used as input for a task
> 
> For case a) we migrate/assign task units to all cache nodes; each migrated task fetches local data while avoiding double inclusion of input data, i.e. if input data D of type T is fetched on node N, D's replica on node M must not be fetched as well.
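> 
> As a sketch of how double inclusion could be avoided, each node could feed into the pipeline only those local entries for which it is the primary owner according to CH. ConsistentHash#locate is the existing API; localEntries, pipeline.feed() and the other variables are placeholders:
> 
> // Sketch: include a local entry only on its primary owner node, so that
> // replicas of the same entry on other nodes are skipped.
> for (Map.Entry<Object, Object> e : localEntries) {
>    Address primary = consistentHash.locate(e.getKey(), numOwners).get(0);
>    if (primary.equals(self))
>       pipeline.feed(e);
> }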
> 
> For case b) we have to involve CH to find out where to migrate task units so that, once the task units are moved to nodes N (a subset of the entire cluster), the data they fetch is local.
> 
> What to do for case c), if anything? Do we want to support it at all?
> 
> I think Trustin made an unintentional mistake in the example below by adding task units to cm rather than to the task. I would clearly delineate the distributed task as a whole from the task units which comprise it.
> 
> So instead of:
> 
> // Now assemble the tasks into a larger task.
> CacheManager cm = ...;
> CoordinatedTask task = cm.newCoordinatedTask("finalOutcomeCacheName");
> cm.addLocalTask(new GridFileQuery("my_gridfs"));          // Fetch
> cm.addLocalTask(new GridFileSizeCounter());               // Map
> cm.addGlobalTask(new IntegerSummarizer("my_gridfs_size")); // Reduce
> 
> we have:
> 
> // Now assemble the tasks into a larger task.
> CacheManager cm = ...;
> DistributedTask task = cm.newDistributedTask("finalOutcomeCacheName",...);
> task.addTaskUnit(new GridFileQuery("my_gridfs"));          // Fetch
> task.addTaskUnit(new GridFileSizeCounter());               // Map
> task.addTaskUnit(DistributedTask.GLOBAL, new IntegerSummarizer("my_gridfs_size")); // Reduce
> 
> 
> So the grid file task example from the wiki page falls under use case a): we fetch all available data of type GridFile.Metadata on the cluster. There is no need to map tasks across the cluster or consult CH; we migrate the task to every node in the cluster and feed node-local data into the task unit pipeline.
> 
> For case b) we need to somehow indicate to DistributedTask what the input is, so that the task and its units can be mapped across the cluster using CH. My suggestion is that the CacheManager#newDistributedTask factory method come in a few forms: the first, covering use case a), takes no additional parameters; another, similar to my original proposal, specifies the task input in the form Map<String, Set<Object>> getRelatedKeys().
> 
> So maybe we can define an interface:
> 
> public interface DistributedTaskInput {
>    public Map<String, Set<Object>> getKeys();
> }
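> 
> As an illustration, a task whose input is a fixed set of grid files might implement it like this (the cache name and keys are invented for the example):
> 
> import java.util.*;
> 
> public class GridFileTaskInput implements DistributedTaskInput {
>    public Map<String, Set<Object>> getKeys() {
>       // cache name -> keys of interest in that cache
>       Set<Object> keys = new HashSet<Object>(
>             Arrays.asList("file1.txt", "file2.txt"));
>       return Collections.singletonMap("my_gridfs", keys);
>    }
> }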
> 
> and the factory method becomes:
> 
> public class CacheManager {
>    ...
>    public DistributedTask newDistributedTask(String name, DistributedTaskInput input);
>    ...
> }
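> 
> Putting the two together, submitting a case b) task would then look something like this (again only a sketch of the proposed API):
> 
> CacheManager cm = ...;
> DistributedTask task = cm.newDistributedTask("finalOutcomeCacheName", new GridFileTaskInput());
> task.addTaskUnit(new GridFileSizeCounter());               // Map
> task.addTaskUnit(DistributedTask.GLOBAL, new IntegerSummarizer("my_gridfs_size")); // Reduce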
> 
> Now we can distinguish case b) from case a). For case b), when the task is executed we first have to ask CH where to migrate/map the task units; everything else should be the same as in use case a), at least when it comes to the plumbing underneath. Of course, the task implementer is completely shielded from this. Also, there is no need for the task implementer to write the equivalent of GridFileQuery for case b): since the input is already explicitly specified via newDistributedTask, we can set up the EntryStreamProcessorContext with the local input already in place and pass it to the pipeline.
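> 
> Underneath, the plumbing for case b) could roughly do the following. Apart from ConsistentHash#locate, the names here are invented for the sketch:
> 
> // Group the requested keys of one cache by their primary owner, then
> // migrate the task units only to those nodes.
> Map<Address, Set<Object>> keysByNode = new HashMap<Address, Set<Object>>();
> for (Object key : input.getKeys().get(cacheName)) {
>    Address owner = consistentHash.locate(key, numOwners).get(0);
>    Set<Object> s = keysByNode.get(owner);
>    if (s == null) keysByNode.put(owner, s = new HashSet<Object>());
>    s.add(key);
> }
> // On each node in keysByNode.keySet(), pre-populate the
> // EntryStreamProcessorContext with that node's slice of the input
> // and hand it to the task unit pipeline.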
> 
> WDYT,
> Vladimir

--
Manik Surtani
manik at jboss.org
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org
