I'm gonna cc infinispan-dev as well, so others can pitch in.
But yeah, good start. I think your dissection of the use cases makes sense, except that I
think (a) and (c) are essentially the same; the only difference is that in (c) the task
ignores any data input. That raises the question of why anybody would do this, and what
purpose it serves. :-) So yeah, we could support it as part of (a), but I am less
than convinced of its value.
And as for data locality, perhaps this would work: don't define the necessary keys in
the Task, but instead provide these when submitting the task. E.g.,
CacheManager.submitTask(task);       // task goes everywhere, and receives an input
                                     // Iterator of all local entries on each node.
CacheManager.submitTask(task, K...); // task goes to certain nodes that contain some or
                                     // all of K. Receives an input Iterator of all local
                                     // entries on each node.
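The two submission forms above could be sketched roughly as follows. This is only an illustration under stated assumptions: the Task interface, ToyCacheManager and ownerOf are hypothetical names, not Infinispan API, and ownership is faked with a hash modulo rather than a real consistent hash.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SubmitTaskSketch {

    /** Hypothetical task contract: runs on one node over that node's local entries. */
    interface Task<K, V, R> {
        R execute(Iterator<Map.Entry<K, V>> localEntries);
    }

    /** Toy stand-in for a cluster: one map of local entries per node. */
    static class ToyCacheManager<K, V> {
        private final List<Map<K, V>> nodes = new ArrayList<>();

        ToyCacheManager(int nodeCount) {
            for (int i = 0; i < nodeCount; i++) {
                nodes.add(new LinkedHashMap<>());
            }
        }

        /** Assumption: ownership is a plain hash modulo, not a real consistent hash. */
        int ownerOf(K key) {
            return Math.floorMod(key.hashCode(), nodes.size());
        }

        void put(K key, V value) {
            nodes.get(ownerOf(key)).put(key, value);
        }

        /** No keys: run on every node. With keys: run only on nodes owning at least one key. */
        @SafeVarargs
        final <R> Map<Integer, R> submitTask(Task<K, V, R> task, K... keys) {
            Map<Integer, R> resultsByNode = new LinkedHashMap<>();
            for (int node = 0; node < nodes.size(); node++) {
                if (keys.length == 0 || ownsAny(node, keys)) {
                    // The task always sees all local entries of the node it runs on.
                    resultsByNode.put(node, task.execute(nodes.get(node).entrySet().iterator()));
                }
            }
            return resultsByNode;
        }

        private boolean ownsAny(int node, K[] keys) {
            for (K key : keys) {
                if (ownerOf(key) == node) {
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) {
        ToyCacheManager<Integer, String> cm = new ToyCacheManager<>(3);
        for (int k = 0; k < 6; k++) {
            cm.put(k, "v" + k);
        }
        // A task that simply counts the local entries it is handed.
        Task<Integer, String, Integer> counter = entries -> {
            int n = 0;
            while (entries.hasNext()) {
                entries.next();
                n++;
            }
            return n;
        };
        System.out.println(cm.submitTask(counter));       // {0=2, 1=2, 2=2}
        System.out.println(cm.submitTask(counter, 0, 3)); // {0=2}
    }
}
```

Note that in the keyed form the task still iterates over every local entry on the chosen nodes, as described above; the keys only decide where the task goes.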
WDYT?
Cheers
Manik
On 15 Dec 2010, at 15:46, Vladimir Blagojevic wrote:
Manik,
Will write more towards the end if I remember something I forgot, but here is my initial
feedback. I think the two of us should focus on the steps to finalize the API. I am taking
Trustin's proposal as the starting point because I really like it; it is very simple but
powerful. So the first thing that I want to review, something I think is missing, is that
Trustin's proposal covers only one case of task input and consequently does not
elaborate on how tasks are mapped to execution nodes.
In order to be as clear as possible, let's go one step back. I can think of three distinct
cases of input:
a) all data of type T available on cluster is input for a task (covered by Trustin's
proposal)
b) subset of data of type T is an input for a task
c) no data from caches used as input for a task
For case a) we migrate/assign task units to all cache nodes; each migrated task fetches
local data while avoiding double inclusion of input data, i.e. we do not fetch input data
D of type T on node N and D's replica on node M as well.
For case b) we have to involve CH and find out where to migrate task units so that the data
fetched is local once task units are moved to nodes N (N is a subset of the entire cluster).
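One way to picture both mappings is a primary-owner rule. The sketch below is an assumption, not something settled in this thread: owners(), isLocalInput() and targetNodes() are hypothetical helpers, and the "CH" is a toy that places a key on its hash-modulo node plus the following nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class TaskMappingSketch {

    /** Toy stand-in for the CH: the primary owner followed by (numOwners - 1) successors. */
    static List<Integer> owners(Object key, int clusterSize, int numOwners) {
        List<Integer> result = new ArrayList<>();
        int primary = Math.floorMod(key.hashCode(), clusterSize);
        for (int i = 0; i < Math.min(numOwners, clusterSize); i++) {
            result.add((primary + i) % clusterSize);
        }
        return result;
    }

    /** Case a): a node feeds an entry to its task unit only if it is the primary owner. */
    static boolean isLocalInput(Object key, int node, int clusterSize, int numOwners) {
        return owners(key, clusterSize, numOwners).get(0) == node;
    }

    /** Case b): task units are migrated to the primary owners of the input keys. */
    static Set<Integer> targetNodes(List<?> keys, int clusterSize, int numOwners) {
        Set<Integer> targets = new TreeSet<>();
        for (Object key : keys) {
            targets.add(owners(key, clusterSize, numOwners).get(0));
        }
        return targets;
    }

    public static void main(String[] args) {
        int clusterSize = 4;
        int numOwners = 2;

        // Key 5 is stored on two nodes, but only one of them counts it as task input.
        System.out.println(owners(5, clusterSize, numOwners)); // [1, 2]
        int includedOn = 0;
        for (int node = 0; node < clusterSize; node++) {
            if (isLocalInput(5, node, clusterSize, numOwners)) {
                includedOn++;
            }
        }
        System.out.println(includedOn); // 1

        // Case b): only a subset of the cluster receives task units.
        System.out.println(targetNodes(Arrays.asList(5, 9, 2), clusterSize, numOwners)); // [1, 2]
    }
}
```

Restricting input to the primary owner is only one possible way to avoid double inclusion; the real plumbing might pick any single owner per entry.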
What to do for case c), if anything? Do we want to support this?
I think Trustin made an unintentional mistake in the example below by adding task units to
cm rather than to task. I would clearly delineate the distributed task as a whole from the
task units that comprise it.
So instead of:
// Now assemble the tasks into a larger task.
CacheManager cm = ...;
CoordinatedTask task = cm.newCoordinatedTask("finalOutcomeCacheName");
cm.addLocalTask(new GridFileQuery("my_gridfs")); // Fetch
cm.addLocalTask(new GridFileSizeCounter()); // Map
cm.addGlobalTask(new IntegerSummarizer("my_gridfs_size")); // Reduce
we have:
// Now assemble the tasks into a larger task.
CacheManager cm = ...;
DistributedTask task = cm.newDistributedTask("finalOutcomeCacheName",...);
task.addTaskUnit(new GridFileQuery("my_gridfs")); // Fetch
task.addTaskUnit(new GridFileSizeCounter()); // Map
task.addTaskUnit(DistributedTask.GLOBAL,
    new IntegerSummarizer("my_gridfs_size")); // Reduce
So the grid file task example from the wiki page falls under use case a): we fetch all
data of type GridFile.Metadata available on the cluster. There is no need to map
tasks across the cluster or consult CH; we migrate tasks to every node on the cluster and
feed node-local data into the task units pipeline.
For case b) we need to somehow indicate to DistributedTask what the input is so the task
and its units can be mapped across the cluster using CH. My suggestion is that the
CacheManager#newDistributedTask factory method has a few forms: the first, with no
additional parameters, covers use case a); another, similar to my original proposal,
lets us specify the task input in the form Map<String, Set<Object>> getRelatedKeys().
So maybe we can define an interface:
public interface DistributedTaskInput {
    public Map<String, Set<Object>> getKeys();
}
and the factory method becomes:
public class CacheManager {
    ...
    public DistributedTask newDistributedTask(String name, DistributedTaskInput input);
    ...
}
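As a rough illustration of what a caller might pass to such a factory method, here is a minimal sketch of an input implementation. KeySetInput and its add() method are hypothetical, and the sketch assumes the String in the map is a cache name, which is not spelled out here.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class TaskInputSketch {

    /** The interface as proposed in this message. */
    interface DistributedTaskInput {
        Map<String, Set<Object>> getKeys();
    }

    /** Hypothetical helper that collects input keys, grouped by (assumed) cache name. */
    static class KeySetInput implements DistributedTaskInput {
        private final Map<String, Set<Object>> keysByCache = new LinkedHashMap<>();

        KeySetInput add(String cacheName, Object... keys) {
            Set<Object> set = keysByCache.computeIfAbsent(cacheName, n -> new LinkedHashSet<>());
            Collections.addAll(set, keys); // duplicates are dropped by the set
            return this;
        }

        @Override
        public Map<String, Set<Object>> getKeys() {
            return Collections.unmodifiableMap(keysByCache);
        }
    }

    public static void main(String[] args) {
        DistributedTaskInput input = new KeySetInput()
                .add("my_gridfs", "/a.txt", "/b.txt")
                .add("my_gridfs", "/b.txt");
        System.out.println(input.getKeys()); // {my_gridfs=[/a.txt, /b.txt]}

        // The input would then go to the proposed factory method, e.g.:
        // DistributedTask task = cm.newDistributedTask("finalOutcomeCacheName", input);
    }
}
```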
Now we can distinguish case b) from case a). For case b), when the task is executed we
first have to ask CH where to migrate/map these tasks; everything else should be the same
as in use case a), at least when it comes to the plumbing underneath. Of course, the
task implementer is completely shielded from this. Also, there is no need for the task
implementer to do the equivalent of GridFileQuery for case b): since the input is already
explicitly specified via newDistributedTask, we can (and need to) somehow set up
EntryStreamProcessorContext with the local input already set and pass it to the pipeline.
WDYT,
Vladimir
--
Manik Surtani
manik(a)jboss.org
twitter.com/maniksurtani
Lead, Infinispan
http://www.infinispan.org