I want to roughly outline what I believe needs to be done to implement a basic distributed
execution framework for 5.0 Final.
If you recall, a Distributed/MapReduce task is a logical unit of work consisting of multiple
distributed executables, each executed individually across the Infinispan cluster. Each
individual task execution on the cluster is governed by failover, load balancing and
execution policies.
Failover policy
Failover policy regulates whether and how distributed task executables are migrated to
backup execution nodes in case of failure.
An executable can fail due to:
- exception raised in task implementation during execution
- node crash/leave
- migration failure to/from target execution node
Infinispan will invoke the failover mechanism in all of the above cases except when the
exception is raised by the task executable itself. In that case the exception will be
returned to the invoker of the distributed task, who can act upon it.
By default, there will be two failover policies: failover off and failover on. If
failover is on, the load balancing policy in place will decide where to migrate the task
executable for execution. If failover is off, the task invoker will be notified of the
failure.
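
The two default policies could be sketched roughly as follows. This is only an
illustration: FailoverSketch, FailureCause and FailoverPolicy are hypothetical names, not
existing Infinispan API, nodes are represented as plain strings, and the "first other
node" choice stands in for whatever the load balancing policy would actually decide.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch; none of these types are existing Infinispan API.
public class FailoverSketch {

    /** Why an executable did not complete (the three causes listed above). */
    enum FailureCause { TASK_EXCEPTION, NODE_LEFT, MIGRATION_FAILED }

    /** Decides whether a failed executable is retried on another node. */
    interface FailoverPolicy {
        // Returns the node to retry on, or empty to report the failure to the invoker.
        Optional<String> failover(FailureCause cause, String failedNode, List<String> candidates);
    }

    /** "Failover off": never migrate, always notify the invoker. */
    static final FailoverPolicy OFF = (cause, failedNode, candidates) -> Optional.empty();

    /** "Failover on": migrate unless the task executable itself threw. */
    static final FailoverPolicy ON = (cause, failedNode, candidates) -> {
        if (cause == FailureCause.TASK_EXCEPTION) {
            return Optional.empty(); // the exception goes back to the invoker
        }
        // Assumption: stand-in for the load balancing policy, which would pick the target.
        return candidates.stream().filter(n -> !n.equals(failedNode)).findFirst();
    };
}
```

Returning an empty Optional in both the "off" case and the task-exception case keeps the
rule "exceptions from the task itself are never failed over" in one place.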
Load balancing policy
Load balancing policy decides how distributed task executables are dispersed for execution
around the Infinispan cluster. By default, a data-collocating load balancing policy is used
whenever a distributed task is invoked on a set of keys in the cache. Other, simpler load
balancing policies can be implemented as well if the need arises.
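
To make "data collocating" concrete, here is a minimal sketch of how input keys might be
grouped by the node that owns them, so that each executable runs next to its data. The
class name and the ownerOf function are assumptions made for illustration; a real
implementation would ask the cache's distribution logic for key ownership.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; not existing Infinispan API.
public class CollocatingPolicySketch {

    /**
     * Groups the task's input keys by owning node. Each map entry corresponds to one
     * executable to ship: the target node plus the keys it should process locally.
     * ownerOf is an assumed lookup from key to owning node.
     */
    static <K> Map<String, List<K>> assign(List<K> keys, Function<K, String> ownerOf) {
        Map<String, List<K>> plan = new LinkedHashMap<>();
        for (K key : keys) {
            plan.computeIfAbsent(ownerOf.apply(key), node -> new ArrayList<>()).add(key);
        }
        return plan;
    }
}
```

A simpler policy, for example round-robin over all nodes, would only need to swap the
ownerOf lookup for a counter.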
Execution policy
Execution policy decides how a task executable is executed once it has been migrated to an
execution node. By default, a priority queue is used for queuing task executables. Users
can, if needed, fine-tune task priority on a per-task basis. If the priority is left
unchanged for all tasks, then all their executables are effectively queued FIFO on
execution nodes.
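
One detail worth noting: java.util.concurrent.PriorityBlockingQueue does not order
elements of equal priority, so getting FIFO behaviour for same-priority executables needs
an explicit tie-breaker. A minimal sketch, assuming a hypothetical Queued wrapper, a
submission sequence number, and "larger number means higher priority":

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; not existing Infinispan API.
public class PriorityExecutionSketch {

    /** A queued executable: name stands in for the real Callable/Mapper/Reducer. */
    static final class Queued implements Comparable<Queued> {
        final String name;
        final int priority; // assumption: larger value runs first
        final long seq;     // submission order, used only to break ties

        Queued(String name, int priority, long seq) {
            this.name = name;
            this.priority = priority;
            this.seq = seq;
        }

        @Override
        public int compareTo(Queued other) {
            int byPriority = Integer.compare(other.priority, priority);
            return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
        }
    }

    private final AtomicLong counter = new AtomicLong();
    private final PriorityBlockingQueue<Queued> queue = new PriorityBlockingQueue<>();

    void submit(String name, int priority) {
        queue.add(new Queued(name, priority, counter.getAndIncrement()));
    }

    /** Drains the queue and returns names in the order a worker would run them. */
    List<String> drainOrder() {
        List<String> order = new ArrayList<>();
        Queued next;
        while ((next = queue.poll()) != null) {
            order.add(next.name);
        }
        return order;
    }
}
```

With every executable submitted at the same priority, drainOrder() returns them in
submission order, which is the FIFO behaviour described above.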
Time permitting, a job stealing policy should be implemented, taking into account ideas
from the fork/join framework and applying them in a distributed fashion amongst Infinispan
nodes.
Implementation sketches
In order to implement distributed/MapReduce task execution, I believe we should reuse the
existing Infinispan infrastructure (marshalling, remote command invocation, interceptor
chain, thread pools) as much as possible.
When a user submits a distributed task, we would locate the Infinispan nodes where the
input keys reside and send executables (DistributedCallable/Mapper/Reducer) to those nodes
using the existing remote command invocation mechanism. The decision about where to
migrate executables is effectively made by the load balancing policy, the default one
being the collocating policy.
When executables wrapped in commands arrive at Infinispan nodes, they are handed off to a
special handling object (the execution policy) rather than to the invocation handler. The
execution policy interacts with the execution container and in turn queues and monitors
executables as they are executed in the container's thread pool. DistributedCallable(s) are
invoked and their results returned to the invoking node. Mappers are invoked as well and
their results handed off to Reducers, as described in the MapReduce algorithm. Eventually
the result of each Reducer is also returned to the task invoker, where the Collator is
invoked.
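
The Mapper, Reducer and Collator hand-off can be sketched in a single JVM as below. This
is a simulation under stated assumptions, not the proposed API: the functional shapes
used for mapper, reducer and collator are guesses, nodes are map entries, and the
per-node call would in reality be a remote command.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical sketch; the mapper/reducer/collator signatures are assumptions.
public class MapReduceFlowSketch {

    /** Runs on one node: map every local value, then reduce mapped values per key. */
    static <V, K2, V2> Map<K2, V2> runOnNode(List<V> localValues,
                                             Function<V, Map.Entry<K2, V2>> mapper,
                                             BinaryOperator<V2> reducer) {
        Map<K2, V2> reduced = new HashMap<>();
        for (V value : localValues) {
            Map.Entry<K2, V2> mapped = mapper.apply(value);
            reduced.merge(mapped.getKey(), mapped.getValue(), reducer);
        }
        return reduced;
    }

    /** Runs on the invoker: gather each node's reduced result, then collate. */
    static <V, K2, V2, R> R run(Map<String, List<V>> valuesByNode,
                                Function<V, Map.Entry<K2, V2>> mapper,
                                BinaryOperator<V2> reducer,
                                Function<List<Map<K2, V2>>, R> collator) {
        List<Map<K2, V2>> perNode = new ArrayList<>();
        for (List<V> local : valuesByNode.values()) {
            perNode.add(runOnNode(local, mapper, reducer)); // would be a remote command
        }
        return collator.apply(perNode);
    }

    /** Example: count word occurrences across two simulated nodes. */
    static Map<String, Integer> wordCountExample() {
        Map<String, List<String>> valuesByNode = new HashMap<>();
        valuesByNode.put("nodeA", Arrays.asList("a", "b", "a"));
        valuesByNode.put("nodeB", Arrays.asList("b", "c"));

        Function<String, Map.Entry<String, Integer>> mapper = w -> new SimpleEntry<>(w, 1);
        BinaryOperator<Integer> reducer = Integer::sum;
        Function<List<Map<String, Integer>>, Map<String, Integer>> collator = perNode -> {
            Map<String, Integer> total = new TreeMap<>();
            for (Map<String, Integer> nodeResult : perNode) {
                nodeResult.forEach((k, v) -> total.merge(k, v, Integer::sum));
            }
            return total;
        };
        return run(valuesByNode, mapper, reducer, collator);
    }
}
```

In this sketch the Collator is the only step that sees results from more than one node,
which matches the flow described above.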
If a task fails because of an exception raised in the task itself, the exception is
returned to the task submitter. In all other cases, the failover policy together with the
load balancing policy decides how to migrate the executable to other Infinispan nodes.
If you think that I omitted something, or if you have suggestions, let me know.
Regards,
Vladimir