[infinispan-dev] Distributed execution framework - update
Manik Surtani
manik at jboss.org
Wed Mar 2 05:46:26 EST 2011
On 1 Mar 2011, at 18:07, Vladimir Blagojevic wrote:
> I want to roughly outline what I believe needs to be done to implement a
> basic distributed execution framework for 5.0 Final.
>
> If you recall, a Distributed/MapReduce task is a logical work unit
> consisting of multiple distributed executables executed individually
> across the Infinispan cluster. Each individual task execution on the
> Infinispan cluster is governed by failover, load balancing and execution
> policies.
>
> Failover policy
>
> Failover policy regulates whether and how distributed task executables
> are migrated to backup execution nodes in case of failure.
>
> An executable can fail due to:
>
> - exception raised in task implementation during execution
> - node crash/leave
> - migration failure to/from target execution node
>
> Infinispan will invoke the failover mechanism in all of the above cases
> except when the exception is raised by the task executable itself. That
> exception will be returned to the invoker of the distributed task, who
> can act upon it.
>
> By default, there will be two failover policies: failover off and
> failover on. If failover is on, the load balancing policy in place will
> decide where to migrate the task executable for execution. If failover
> is off, the task invoker will be notified.
Hmm - tasks may fail for various reasons, including a badly written task which may always fail. Maybe all you need is a simple retry count?
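Here is a minimal sketch of the retry-count idea in Java. It uses a hypothetical `Executor` callback in place of remote invocation. It also uses two hypothetical exception types to keep the two kinds of failure apart. Infrastructure failures (node crash/leave, migration failure) are retried on the next candidate node. Exceptions thrown by the task's own code go straight back to the invoker, as described above. None of these names are Infinispan API.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Sketch only: every type and method here is hypothetical, not Infinispan API.
final class RetryFailoverSketch {

    /** Infrastructure failure: node crashed/left or migration failed. Retried. */
    static final class NodeFailureException extends RuntimeException {
        NodeFailureException(String message) { super(message); }
    }

    /** Wraps an exception thrown by the task's own code. Never retried. */
    static final class TaskException extends RuntimeException {
        TaskException(Throwable cause) { super(cause); }
    }

    /** Hypothetical stand-in for remote invocation of a task on a node. */
    interface Executor<T> {
        T executeOn(String node, Callable<T> task) throws Exception;
    }

    /**
     * Runs the task on candidates[0]; on an infrastructure failure, retries on
     * the next candidate, making at most 1 + maxRetries attempts in total.
     */
    static <T> T runWithRetries(Callable<T> task, List<String> candidates,
                                int maxRetries, Executor<T> executor) {
        NodeFailureException lastFailure = null;
        int attempts = Math.min(candidates.size(), maxRetries + 1);
        for (int i = 0; i < attempts; i++) {
            try {
                return executor.executeOn(candidates.get(i), task);
            } catch (NodeFailureException e) {
                lastFailure = e; // fail over to the next candidate node
            } catch (Exception e) {
                throw new TaskException(e); // task's own failure: back to the invoker
            }
        }
        throw lastFailure != null ? lastFailure
                                  : new NodeFailureException("no candidate nodes");
    }
}
```

A retry count bounds how long a task keeps being moved around after infrastructure failures. A task that always throws on its own is never retried at all in this sketch.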
> Load balancing policy
>
> Load balancing policy decides how distributed task executables are
> dispersed for execution around the Infinispan cluster. By default, a
> data-collocating load balancing policy is used whenever a distributed
> task is invoked on a set of keys in the cache. Other, simpler load
> balancing policies can be implemented as well if the need arises.
I don't understand - I thought tasks are executed on all data owners (if any keys are provided) or on all nodes if no keys are provided. What other "load balancing" policies do you foresee? :-)
If it doesn't make sense maybe it would be better to remove load balancing from the API for now, add it later if we see a need?
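To make the "owners if keys are given, otherwise all nodes" rule concrete, here is a sketch of the placement step under stated assumptions:

- `ownersOf` is a hypothetical lookup that returns a key's owners, primary first.
- Node addresses are plain strings.
- Each key is processed once, on its primary owner, so replicas do not duplicate work.

With no keys, every node is targeted with an empty key set.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch only: ownersOf and the String node addresses are hypothetical.
final class TargetSelectionSketch {

    /**
     * Returns node -> keys that node should process. With keys, each key goes to
     * its primary (first listed) owner; with no keys, every node gets an empty list.
     */
    static <K> Map<String, List<K>> plan(Collection<K> keys,
                                         Function<K, List<String>> ownersOf,
                                         List<String> allNodes) {
        Map<String, List<K>> plan = new LinkedHashMap<>();
        if (keys.isEmpty()) {
            for (String node : allNodes) {
                plan.put(node, new ArrayList<>());
            }
            return plan;
        }
        for (K key : keys) {
            String primary = ownersOf.apply(key).get(0);
            plan.computeIfAbsent(primary, n -> new ArrayList<>()).add(key);
        }
        return plan;
    }
}
```

Seen this way, the "collocating policy" is just this grouping step, which supports the point that a pluggable load balancing policy may not be needed yet.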
> Execution policy
>
> Execution policy decides how a task executable is executed once it has
> been migrated to an execution node. By default, a priority queue is used
> to queue task executables. Users can, if needed, fine-tune task priority
> on a per-task basis. If the priority is left unchanged for all tasks,
> their executables are effectively queued FIFO on the execution nodes.
>
> Time permitting, a job-stealing policy should be implemented, taking
> ideas from the fork/join framework and applying them in a distributed
> fashion amongst Infinispan nodes.
I would say leave prioritising out for now. Let's keep this simple. This is all stuff that can be added later if needed.
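If prioritisation comes back later, one detail matters for the FIFO claim above. `java.util.concurrent.PriorityBlockingQueue` makes no guarantee about the order of elements with equal priority. Its Javadoc recommends a secondary key to break ties. Here is a minimal sketch of that approach, where `QueuedTask` is a hypothetical wrapper and a submission sequence number provides the tie-break:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: QueuedTask is a hypothetical wrapper for a queued executable.
final class FifoPriorityQueueSketch {

    /** Higher priority first; among equal priorities, submission order. */
    static final class QueuedTask implements Comparable<QueuedTask> {
        private static final AtomicLong SEQ = new AtomicLong();
        final String name;
        final int priority;
        final long seq = SEQ.getAndIncrement(); // assigned when the task is created

        QueuedTask(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        @Override
        public int compareTo(QueuedTask other) {
            int byPriority = Integer.compare(other.priority, this.priority);
            return byPriority != 0 ? byPriority : Long.compare(this.seq, other.seq);
        }
    }

    /** Enqueues all tasks and returns the names in the order they are dequeued. */
    static List<String> drainOrder(List<QueuedTask> tasks) {
        PriorityBlockingQueue<QueuedTask> queue = new PriorityBlockingQueue<>(tasks);
        List<String> order = new ArrayList<>();
        QueuedTask next;
        while ((next = queue.poll()) != null) {
            order.add(next.name);
        }
        return order;
    }
}
```

Without the `seq` comparison, tasks left at the default priority could come out in any order.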
> Implementation sketches
>
>
> In order to implement distributed/mapreduce task execution, I believe we
> should reuse existing Infinispan infrastructure (marshalling, remote
> command invocation, interceptor chain, thread pools) as much as possible.
>
> As a user submits a distributed task, we would locate the Infinispan
> nodes where the input keys are located and send executables
> (DistributedCallable/Mapper/Reducer) to those nodes using the existing
> remote command invocation mechanism. The decision about migration of
> executables is effectively made by the load balancing policy, the
> default one being the collocating policy.
>
> When executables wrapped into commands arrive at Infinispan nodes, they
> are handed off to a special handling object (execution policy) rather
> than to the invocation handler.
Why? If you are creating a special Command for this, the Command's perform() method could handle execution. Saves you having to re-engineer the InvocationHandler (which is very complex as it is).
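Here is a minimal sketch of that suggestion. The command carries the task and runs it in its own `perform()` method. `RemoteCommand` and `DistributedExecuteCommand` are hypothetical stand-ins, not Infinispan's command interfaces. Marshalling and dispatch are left out.

```java
import java.util.concurrent.Callable;

// Sketch only: RemoteCommand and DistributedExecuteCommand are hypothetical,
// not Infinispan's command interfaces; marshalling and dispatch are omitted.
final class ExecuteCommandSketch {

    /** Minimal command contract: the receiving node calls perform(). */
    interface RemoteCommand {
        Object perform() throws Exception;
    }

    /** Carries a task to its target node and runs it there. */
    static final class DistributedExecuteCommand<T> implements RemoteCommand {
        private final Callable<T> task;

        DistributedExecuteCommand(Callable<T> task) {
            this.task = task;
        }

        @Override
        public Object perform() throws Exception {
            // Execution happens right here, so no separate execution-policy
            // object is needed. An exception from the task propagates to the
            // caller of perform() unchanged.
            return task.call();
        }
    }
}
```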
> The execution policy interacts with the execution container and in turn
> queues and monitors executables as they are executed in the container's
> thread pool. DistributedCallable(s) are invoked and their results are
> returned to the invoking node. Mappers are invoked as well and their
> results are handed off to Reducers, as described in the mapreduce
> algorithm. Eventually, the result of each Reducer is also returned to the
> task invoker, and in turn the Collator is invoked.
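The map, reduce and collate flow just described can be simulated in a single JVM, as below. The sketch makes these assumptions:

- The `Mapper`, `Reducer` and `Collator` interfaces here are hypothetical.
- Each `Map` in `nodeData` stands in for one node's local entries.
- The network hops (mapper output to reducers, reducer results to the invoker) are replaced by in-memory grouping by intermediate key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Single-JVM sketch; Mapper, Reducer and Collator are hypothetical interfaces.
final class MapReduceFlowSketch {

    interface Mapper<K, V, KOut, VOut> {
        void map(K key, V value, BiConsumer<KOut, VOut> emit);
    }

    interface Reducer<KOut, VOut> {
        VOut reduce(KOut key, List<VOut> values);
    }

    interface Collator<KOut, VOut, R> {
        R collate(Map<KOut, VOut> reduced);
    }

    static <K, V, KOut, VOut, R> R run(List<Map<K, V>> nodeData,
                                       Mapper<K, V, KOut, VOut> mapper,
                                       Reducer<KOut, VOut> reducer,
                                       Collator<KOut, VOut, R> collator) {
        // Map phase: each "node" maps its local entries; emitted values are
        // grouped by intermediate key (this stands in for the shuffle).
        Map<KOut, List<VOut>> grouped = new HashMap<>();
        BiConsumer<KOut, VOut> emit =
                (k, v) -> grouped.computeIfAbsent(k, x -> new ArrayList<>()).add(v);
        for (Map<K, V> local : nodeData) {
            local.forEach((k, v) -> mapper.map(k, v, emit));
        }
        // Reduce phase: one reduce call per intermediate key.
        Map<KOut, VOut> reduced = new HashMap<>();
        grouped.forEach((k, values) -> reduced.put(k, reducer.reduce(k, values)));
        // Collate phase: runs once, on the invoking node.
        return collator.collate(reduced);
    }
}
```

As a usage example, a word count maps each document to `(word, 1)` pairs, reduces by summing, and collates into a sorted map.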
>
> In case of task failure due to an exception raised in the task itself,
> the exception is returned to the task submitter.
Yes, the Command's perform() method would just have to throw an exception. The InvocationHandler will wrap it in an appropriate ExceptionResponse to be sent back to the caller.
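Here is a sketch of the wrapping described here, under some assumptions. `Response`, `SuccessfulResponse` and `ExceptionResponse` below are local stand-ins named after the idea, not Infinispan's own response classes. The "remote" call is just a `Callable`. The point is that the task's exception travels back as data and is rethrown on the caller's side.

```java
import java.util.concurrent.Callable;

// Sketch only: these response types are local stand-ins for the idea, not
// Infinispan's own response classes.
final class ResponseWrappingSketch {

    interface Response {}

    static final class SuccessfulResponse implements Response {
        final Object value;
        SuccessfulResponse(Object value) { this.value = value; }
    }

    static final class ExceptionResponse implements Response {
        final Exception exception;
        ExceptionResponse(Exception exception) { this.exception = exception; }
    }

    /** Receiving side: run the command and turn any exception into a response. */
    static Response handle(Callable<?> perform) {
        try {
            return new SuccessfulResponse(perform.call());
        } catch (Exception e) {
            return new ExceptionResponse(e);
        }
    }

    /** Calling side: return the value, or rethrow the remote task's exception. */
    static Object unwrap(Response response) throws Exception {
        if (response instanceof ExceptionResponse) {
            throw ((ExceptionResponse) response).exception;
        }
        return ((SuccessfulResponse) response).value;
    }
}
```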
> In other cases, the failover policy, together with the load balancing
> policy, decides how to migrate the executable to other Infinispan nodes.
>
>
> If you think I have omitted something and/or have suggestions, let me know.
>
> Regards,
> Vladimir
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
--
Manik Surtani
manik at jboss.org
twitter.com/maniksurtani
Lead, Infinispan
http://www.infinispan.org