[infinispan-dev] Distributed execution framework - update

Manik Surtani manik at jboss.org
Wed Mar 2 05:46:26 EST 2011


On 1 Mar 2011, at 18:07, Vladimir Blagojevic wrote:

> I want to roughly outline what I believe needs to be done to implement
> a basic distributed execution framework for 5.0 Final.
> 
> If you recall, a distributed/MapReduce task is a logical work unit
> consisting of multiple distributed executables executed individually
> across the Infinispan cluster. Each individual task execution on the
> Infinispan cluster is governed by failover, load balancing and execution
> policies.
> 
> Failover policy
> 
> The failover policy regulates whether and how distributed task
> executables are migrated to backup execution nodes in case of failure.
> 
> An executable can fail due to:
> 
> - exception raised in task implementation during execution
> - node crash/leave
> - migration failure to/from target execution node
> 
> Infinispan will invoke the failover mechanism in all of the above cases
> except when the exception is raised by the task executable itself. In
> that case the exception will be returned to the invoker of the
> distributed task, who can act upon it.
> 
> By default, there will be two failover policies: failover off and
> failover on. If failover is on, the load balancing policy in place will
> decide where to migrate the task executable for execution. If failover
> is off, the task invoker will be notified.

Hmm - tasks may fail for various reasons, including a badly written task which may always fail.  Maybe all you need is a simple retry count?
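
A simple retry count could cover both "failover on" and "failover off" (zero retries). The Java sketch below assumes the load balancing policy has already produced an ordered list of candidate nodes, and that node crashes, leaves and migration failures surface as a dedicated exception type. `RetryFailover` and `NodeFailureException` are hypothetical names, not Infinispan API.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical exception for node-level failures (crash/leave, migration failure). */
class NodeFailureException extends RuntimeException {
    NodeFailureException(String message) { super(message); }
}

/**
 * Hypothetical retry-count failover. maxRetries == 0 behaves like "failover off":
 * the first node-level failure is reported straight back to the invoker.
 */
final class RetryFailover {
    private final int maxRetries;

    RetryFailover(int maxRetries) {
        if (maxRetries < 0) throw new IllegalArgumentException("maxRetries must be >= 0");
        this.maxRetries = maxRetries;
    }

    /**
     * Runs the executable on the first candidate node and, after a node-level failure,
     * on the next one, for at most maxRetries extra attempts. Any other RuntimeException
     * is treated as a failure of the task itself and propagates without failover.
     */
    <T> T execute(List<String> candidates, Function<String, T> runOn) {
        int attempts = Math.min(candidates.size(), maxRetries + 1);
        NodeFailureException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return runOn.apply(candidates.get(i));
            } catch (NodeFailureException e) {
                last = e; // node-level failure: fall through and try the next candidate
            }
        }
        throw new IllegalStateException("no successful execution after " + attempts + " attempt(s)", last);
    }
}
```

A task that always throws (the badly written task case) is never retried here, because only `NodeFailureException` triggers another attempt.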

> Load balancing policy
> 
> The load balancing policy decides how distributed task executables are
> dispersed for execution around the Infinispan cluster. By default, a data
> collocating load balancing policy is used as soon as a distributed task
> is invoked on a set of keys in the cache. Other, simpler load balancing
> policies can be implemented as well if a need arises.

I don't understand - I thought tasks are executed on all data owners (if any keys are provided) or on all nodes if no keys are provided.  What other "load balancing" policies do you foresee?  :-)

If it doesn't make sense maybe it would be better to remove load balancing from the API for now, add it later if we see a need?  
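
The behaviour described here (run on the owners of the supplied keys, or on every node when no keys are given) amounts to a small grouping step, sketched below in Java. Assumptions: `ownerOf` stands in for the cache's consistent-hash lookup, only the primary owner of each key is considered, and `ColocatingDispatch` is a hypothetical name, not an Infinispan class.

```java
import java.util.*;
import java.util.function.Function;

/** Hypothetical sketch of the data-collocating dispatch decision. */
final class ColocatingDispatch {
    /**
     * Groups input keys by their (assumed) primary owner so each node receives only the
     * keys it owns. With no keys, every cluster member gets the task with an empty key set.
     */
    static <K> Map<String, Set<K>> targets(Collection<K> keys, List<String> members,
                                           Function<K, String> ownerOf) {
        Map<String, Set<K>> plan = new LinkedHashMap<>();
        if (keys.isEmpty()) {
            for (String m : members) plan.put(m, Collections.<K>emptySet());
            return plan;
        }
        for (K key : keys) {
            // ownerOf is a placeholder for a consistent-hash lookup
            plan.computeIfAbsent(ownerOf.apply(key), n -> new LinkedHashSet<>()).add(key);
        }
        return plan;
    }
}
```

Nodes that own none of the keys simply do not appear in the plan, which is the only "balancing" this default needs.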

> Execution policy
> 
> The execution policy decides how a task executable is executed once it
> has been migrated to an execution node. By default, a priority queue is
> used for queuing task executables. Users can, if needed, fine-tune task
> priority on a per-task basis. If priority is not changed for any task,
> then all their executables are effectively queued FIFO on execution
> nodes.
> 
> Time permitting, a job stealing policy should be implemented, taking
> ideas from the fork/join framework and applying them in a distributed
> fashion amongst Infinispan nodes.

I would say leave prioritising out for now.  Let's keep this simple.  This is all stuff that can be added later if needed.
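
The "effectively FIFO" claim in the proposal depends on tie-breaking: the `java.util.concurrent.PriorityBlockingQueue` documentation makes no guarantee about the ordering of elements with equal priority, so a per-node queue would need an explicit arrival sequence as a secondary key. A minimal Java sketch; `QueuedExecutable` and `ExecutionQueue` are hypothetical names.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical queued executable: higher priority first, ties broken by arrival order. */
final class QueuedExecutable implements Comparable<QueuedExecutable> {
    private static final AtomicLong SEQ = new AtomicLong();
    final String name;
    final int priority;
    final long seq = SEQ.getAndIncrement(); // arrival order, used only to break ties

    QueuedExecutable(String name, int priority) {
        this.name = name;
        this.priority = priority;
    }

    @Override
    public int compareTo(QueuedExecutable o) {
        int byPriority = Integer.compare(o.priority, priority); // descending priority
        return byPriority != 0 ? byPriority : Long.compare(seq, o.seq); // FIFO among equals
    }
}

/** Hypothetical per-node queue; worker threads would call take() in a loop. */
final class ExecutionQueue {
    private final PriorityBlockingQueue<QueuedExecutable> queue = new PriorityBlockingQueue<>();

    void submit(QueuedExecutable e) { queue.add(e); }

    QueuedExecutable take() throws InterruptedException { return queue.take(); }

    QueuedExecutable poll() { return queue.poll(); }
}
```

If every task keeps the default priority, the comparator reduces to the arrival sequence, which gives plain FIFO; dropping prioritising as suggested would let this collapse to an ordinary FIFO queue.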

> Implementation sketches
> 
> 
> In order to implement distributed/MapReduce task execution, I believe we
> should reuse existing Infinispan infrastructure (marshalling, remote
> command invocation, interceptor chain, thread pools) as much as possible.
> 
> As a user submits a distributed task, we would locate the Infinispan
> nodes where the input keys are located and send executables
> (DistributedCallable/Mapper/Reducer) to those nodes using the existing
> remote command invocation mechanism. The decision about migration of
> executables is effectively made by the load balancing policy, the default
> one being the collocating policy.
> 
> When executables wrapped into commands arrive at Infinispan nodes, they
> are handed off to a special handling object (execution policy) rather
> than the invocation handler.

Why?  If you are creating a special Command for this, the Command's perform() method could handle execution.  Saves you having to re-engineer the InvocationHandler (which is very complex as it is).

> The execution policy interacts with the execution container and in turn
> queues and monitors executables as they are executed in the container's
> thread pool. DistributedCallable(s) are invoked and their results
> returned to the invoking node. Mappers are invoked as well, and their
> results are handed off to Reducers as described in the MapReduce
> algorithm. Eventually the result of each Reducer is also returned to the
> task invoker, and in turn the Collator is invoked.
> 
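
The map, reduce and collate sequence described above can be illustrated with a single-process Java sketch. The functional shapes of the mapper, reducer and collator are assumptions made for illustration (the framework's interfaces are not settled in this thread), `LocalMapReduce` is a hypothetical name, and the per-node maps only simulate where entries live; no remoting is involved.

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical, single-process simulation of the map -> reduce -> collate flow. */
final class LocalMapReduce {
    /**
     * nodeData: the entries each node owns. The mapper emits (key, value) pairs per entry,
     * the reducer folds all values for one intermediate key, the collator combines the
     * reduced map on the invoking node.
     */
    static <KI, VI, KO, VO, R> R run(
            List<Map<KI, VI>> nodeData,
            BiFunction<KI, VI, List<Map.Entry<KO, VO>>> mapper,
            BiFunction<KO, List<VO>, VO> reducer,
            Function<Map<KO, VO>, R> collator) {
        // Map phase: each simulated node maps only its own entries; outputs are grouped by key.
        Map<KO, List<VO>> grouped = new LinkedHashMap<>();
        for (Map<KI, VI> local : nodeData) {
            for (Map.Entry<KI, VI> e : local.entrySet()) {
                for (Map.Entry<KO, VO> out : mapper.apply(e.getKey(), e.getValue())) {
                    grouped.computeIfAbsent(out.getKey(), k -> new ArrayList<>()).add(out.getValue());
                }
            }
        }
        // Reduce phase: one reduction per intermediate key.
        Map<KO, VO> reduced = new LinkedHashMap<>();
        grouped.forEach((k, vs) -> reduced.put(k, reducer.apply(k, vs)));
        // Collate phase: runs once, on the task invoker.
        return collator.apply(reduced);
    }
}
```

For a word count, the mapper would emit `(word, 1)` for each word in a value, the reducer would sum the ones, and the collator would receive the final per-word totals.
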
> In case of task failure due to an exception raised in the task itself,
> the exception is returned to the task submitter.

Yes, the Command's perform() method would just have to throw an exception.  The InvocationHandler will wrap it in an appropriate ExceptionResponse to be sent back to the caller.
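
Taken together with the earlier point about letting the command's perform() method handle execution, the receiving side needs no special path: perform() runs the task, and whatever it returns or throws is wrapped into a response. The Java sketch below uses simplified, hypothetical stand-ins (`Command`, `SuccessResponse`, `ExceptionResponse`, `InvocationHandlerSketch`); Infinispan's actual command and response classes have different signatures.

```java
import java.util.concurrent.Callable;

/** Hypothetical, simplified command interface (not Infinispan's real one). */
interface Command {
    Object perform() throws Throwable;
}

/** Hypothetical response types mirroring the success/exception split. */
interface Response {}

final class SuccessResponse implements Response {
    final Object value;
    SuccessResponse(Object value) { this.value = value; }
}

final class ExceptionResponse implements Response {
    final Throwable exception;
    ExceptionResponse(Throwable exception) { this.exception = exception; }
}

/** Carries a user task to the target node; perform() simply runs it. */
final class DistributedExecuteCommand implements Command {
    private final Callable<?> task;
    DistributedExecuteCommand(Callable<?> task) { this.task = task; }

    @Override
    public Object perform() throws Exception {
        return task.call(); // a task exception propagates out of perform() unchanged
    }
}

/** Receiving side: wrap whatever perform() produced into a response for the caller. */
final class InvocationHandlerSketch {
    static Response handle(Command cmd) {
        try {
            return new SuccessResponse(cmd.perform());
        } catch (Throwable t) {
            return new ExceptionResponse(t); // sent back to the caller instead of a value
        }
    }
}
```

The task author only has to throw; the caller then inspects the response type to decide whether it received a result or a failure.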

> In other cases, the failover policy along with the load balancing policy
> decides how to migrate the executable to other Infinispan nodes.
> 
> 
> If you think that I omitted something and/or have a suggestion, let me know.
> 
> Regards,
> Vladimir

--
Manik Surtani
manik at jboss.org
twitter.com/maniksurtani

Lead, Infinispan
http://www.infinispan.org
