[infinispan-dev] Distributed execution framework - update

Vladimir Blagojevic vblagoje at redhat.com
Tue Mar 1 13:07:17 EST 2011


I want to roughly outline what I believe needs to be done to implement
a basic distributed execution framework for 5.0 Final.

If you recall, a Distributed/MapReduce task is a logical unit of work
consisting of multiple distributed executables, each executed
individually across the Infinispan cluster. Each individual task
execution on the cluster is governed by failover, load balancing and
execution policies.

Failover policy

The failover policy regulates whether and how distributed task
executables are migrated to backup execution nodes in case of failure.

An executable can fail due to:

- exception raised in task implementation during execution
- node crash/leave
- migration failure to/from target execution node

Infinispan will invoke the failover mechanism in all of the above cases
except when the exception is raised by the task executable itself. In
that case the exception will be returned to the invoker of the
distributed task, who can act upon it.

By default, there will be two failover policies: failover off and
failover on. If failover is on, the load balancing policy in place will
decide where to migrate the task executable for execution. If failover
is off, the task invoker will be notified of the failure.
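
To make the two default policies concrete, here is a minimal sketch in
Java. Every name in it (FailureCause, FailoverPolicy, FailoverPolicies)
is hypothetical and invented for illustration; none of them is an
existing Infinispan type, and the final API may look quite different.

```java
// Hypothetical names throughout; none of these types exist in Infinispan.

/** The three failure causes listed above. */
enum FailureCause { TASK_EXCEPTION, NODE_LEFT, MIGRATION_FAILED }

interface FailoverPolicy {
    /** Returns true if the executable should be migrated to another node. */
    boolean shouldFailover(FailureCause cause);
}

final class FailoverPolicies {
    private FailoverPolicies() {}

    /** Failover off: never migrate; the invoker is notified instead. */
    static final FailoverPolicy OFF = cause -> false;

    /** Failover on: migrate unless the task's own code raised the exception. */
    static final FailoverPolicy ON = cause -> cause != FailureCause.TASK_EXCEPTION;
}
```

Keeping the decision behind a one-method interface would let other
policies (for example, a bounded number of retries) be plugged in later
without touching the dispatch code.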


Load balancing policy

The load balancing policy decides how distributed task executables are
dispersed for execution around the Infinispan cluster. By default, a
data-collocating load balancing policy is used whenever a distributed
task is invoked on a set of keys in the cache. Other, simpler load
balancing policies can be implemented as well if the need arises.
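
As a rough illustration of data collocation, the sketch below groups
the input keys by the node that owns them, so that one executable can be
sent per node. The class name CollocatingLoadBalancer is hypothetical,
node addresses are plain strings, and the key-to-owner lookup is passed
in as a function; how ownership is actually resolved in Infinispan is
deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; not an Infinispan class. The owner lookup is an
// assumption supplied by the caller.
final class CollocatingLoadBalancer<K> {
    private final Function<K, String> ownerOf;

    CollocatingLoadBalancer(Function<K, String> ownerOf) {
        this.ownerOf = ownerOf;
    }

    /** Groups input keys by owning node, preserving first-seen node order. */
    Map<String, List<K>> assign(Iterable<K> keys) {
        Map<String, List<K>> plan = new LinkedHashMap<>();
        for (K key : keys) {
            plan.computeIfAbsent(ownerOf.apply(key), node -> new ArrayList<>()).add(key);
        }
        return plan;
    }
}
```

Each entry of the returned map corresponds to one executable: the node
to send it to and the keys it should work on locally.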



Execution policy

The execution policy decides how a task executable is executed once it
has been migrated to an execution node. By default, a priority queue is
used to queue task executables. Users can, if needed, fine-tune task
priority on a per-task basis. If no task's priority is changed, all
executables are effectively queued FIFO on the execution nodes.
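
One way the queueing could look is sketched below. The class names are
hypothetical. Note that java.util.concurrent.PriorityBlockingQueue makes
no ordering guarantee for elements of equal priority, so the sketch adds
an arrival sequence number as a tie-breaker to get the FIFO behaviour
described above.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a per-node execution queue; not an Infinispan class.
final class PrioritizedExecutable implements Comparable<PrioritizedExecutable> {
    private static final AtomicLong SEQUENCE = new AtomicLong();

    final String name;
    final int priority;                                   // higher value runs first
    private final long seq = SEQUENCE.getAndIncrement();  // arrival order

    PrioritizedExecutable(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    @Override
    public int compareTo(PrioritizedExecutable other) {
        // Higher priority first; equal priorities fall back to arrival order.
        int byPriority = Integer.compare(other.priority, priority);
        return byPriority != 0 ? byPriority : Long.compare(seq, other.seq);
    }
}

final class ExecutionQueue {
    private final PriorityBlockingQueue<PrioritizedExecutable> queue =
            new PriorityBlockingQueue<>();

    void submit(PrioritizedExecutable executable) {
        queue.add(executable);
    }

    /** Returns the next executable to run, or null if the queue is empty. */
    PrioritizedExecutable next() {
        return queue.poll();
    }
}
```

With all priorities left at the same value, next() hands executables
back in submission order; raising the priority of one task moves its
executables ahead of those already queued.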

Time permitting, a job stealing policy should be implemented, taking
ideas from the fork/join framework and applying them in a distributed
fashion amongst Infinispan nodes.


Implementation sketches


In order to implement distributed/MapReduce task execution, I believe
we should reuse the existing Infinispan infrastructure (marshalling,
remote command invocation, interceptor chain, thread pools) as much as
possible.

When a user submits a distributed task, we would locate the Infinispan
nodes where the input keys are located and send executables
(DistributedCallable/Mapper/Reducer) to those nodes using the existing
remote command invocation mechanism. The decision about where to
migrate executables is effectively made by the load balancing policy,
the default one being the collocating policy.

When executables wrapped in commands arrive at Infinispan nodes, they
are handed off to a special handling object (the execution policy)
rather than to the invocation handler. The execution policy interacts
with the execution container and, in turn, queues and monitors
executables as they are executed in the container's thread pool.
DistributedCallable(s) are invoked and their results returned to the
invoking node. Mappers are invoked as well, and their results are
handed off to Reducers as described in the MapReduce algorithm.
Eventually the result of each Reducer is also returned to the task
invoker, where the Collator is invoked.
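
The Mapper/Reducer/Collator flow described above can be simulated in a
single JVM as follows. This is only a sketch under stated assumptions:
the three interfaces and their signatures are made up for illustration
(the real ones are not settled here), each "node" is just a local
java.util.Map, and no remote invocation takes place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// All three interfaces are hypothetical, named after the roles in the text.
interface Mapper<K, V, T> { T map(K key, V value); }
interface Reducer<T> { T reduce(T left, T right); }
interface Collator<T, R> { R collate(List<T> perNodeResults); }

final class LocalMapReduce {
    private LocalMapReduce() {}

    /** Runs on one node: map every local entry, then fold with the reducer.
     *  Returns null if the node holds no entries. */
    static <K, V, T> T runOnNode(Map<K, V> localEntries,
                                 Mapper<K, V, T> mapper,
                                 Reducer<T> reducer) {
        T acc = null;
        for (Map.Entry<K, V> entry : localEntries.entrySet()) {
            T mapped = mapper.map(entry.getKey(), entry.getValue());
            acc = (acc == null) ? mapped : reducer.reduce(acc, mapped);
        }
        return acc;
    }

    /** Runs on the invoker: gather the per-node results, then collate them. */
    static <K, V, T, R> R execute(List<Map<K, V>> nodes,
                                  Mapper<K, V, T> mapper,
                                  Reducer<T> reducer,
                                  Collator<T, R> collator) {
        List<T> results = new ArrayList<>();
        for (Map<K, V> node : nodes) {
            T reduced = runOnNode(node, mapper, reducer);
            if (reduced != null) {
                results.add(reduced);
            }
        }
        return collator.collate(results);
    }
}
```

In the distributed version, runOnNode would be the part shipped to each
node inside a command, while the loop in execute would be replaced by
remote invocations whose results are gathered on the invoking node.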

If a task fails due to an exception raised in the task itself, the
exception is returned to the task submitter. In all other cases, the
failover policy, along with the load balancing policy, decides how to
migrate the executable to other Infinispan nodes.
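
A possible shape for that dispatch logic is sketched below. All types
are hypothetical: RemoteInvoker stands in for the remote command
invocation mechanism, NodeUnavailableException stands in for a node
crash/leave or migration failure, and the candidate list is assumed to
have been ordered by the load balancing policy.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical types for illustration only; not Infinispan classes.
final class NodeUnavailableException extends Exception {
    NodeUnavailableException(String node) {
        super("node unavailable: " + node);
    }
}

interface RemoteInvoker<T> {
    /** Runs the task on the given node. Throws NodeUnavailableException if
     *  the node cannot run it, or whatever exception the task itself raised. */
    T invoke(String node, Callable<T> task) throws Exception;
}

final class FailoverDispatcher<T> {
    private final RemoteInvoker<T> invoker;
    private final boolean failoverEnabled;

    FailoverDispatcher(RemoteInvoker<T> invoker, boolean failoverEnabled) {
        this.invoker = invoker;
        this.failoverEnabled = failoverEnabled;
    }

    /** Tries candidates in the order chosen by the load balancing policy. */
    T dispatch(List<String> candidates, Callable<T> task) throws Exception {
        NodeUnavailableException last = null;
        for (String node : candidates) {
            try {
                return invoker.invoke(node, task);
            } catch (NodeUnavailableException e) {
                last = e;
                if (!failoverEnabled) {
                    break; // failover off: report the failure to the invoker
                }
            }
            // Any other exception came from the task itself and propagates
            // to the submitter without a retry.
        }
        if (last == null) {
            throw new IllegalArgumentException("no candidate nodes");
        }
        throw last;
    }
}
```

The important property is the asymmetry: infrastructure failures move
on to the next candidate node, while an exception thrown by the task
code is never retried and goes straight back to the submitter.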


If you think that I omitted something, or if you have suggestions, let
me know.

Regards,
Vladimir





