[infinispan-dev] distributed fork-join executor prototype

matt hoffman matt at mhoffman.org
Sat Mar 2 12:40:05 EST 2013


Hey guys,

I've been working on a prototype of integrating Infinispan into our app.
We do a lot of distributed processing across a small cluster, so I've
played with Infinispan's existing distributed execution framework (which is
nice), as well as using Infinispan alongside a normal message queue to
distribute tasks.  But I've also put together a prototype of a new
distributed execution framework using fork-join pools that you all might be
interested in.  If it sounds like something that would be worthwhile for
Infinispan, I can raise a Jira and submit a pull request with what I have
so far. I'd need to get the CA and company policy stuff finalized; that
might take a couple days. Meanwhile, in case there is any interest, I've
described the approach I've taken below.

First, a little background:

A while back I worked on a side project that integrated a distributed
work-stealing algorithm into the standard JDK fork-join queue.  It used
JGroups for communication, because it was quick and easy for prototyping.
So this week I thought I'd take a stab at porting that over to Infinispan.
The algorithm I came up with for Infinispan is a bit less of a
work-stealing algorithm, to take advantage of Infinispan's built-in
distribution capabilities, but I think it's still fairly efficient.

My basic approach was to take in a cache in the constructor, much like the
existing distributed executor, and then create a parallel, DIST-mode cache
that uses the same hash & grouping configuration as the original cache.
That new parallel cache is the "task cache", and we use that to distribute
available tasks across the cluster. It's a distributed cache so that tasks
are partitioned across a large cluster, and it uses the hashing config of
the original cache and a KeyAffinityService to attempt to distribute the
tasks to the same nodes that contain the data being worked on. Nodes use
cache listeners to be notified when there is new work available, and the
atomic replace() to "check out" the tasks for execution, and "check in" the
results.
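To make the "check out" / "check in" step concrete, here is a minimal sketch of the compare-and-set idea. It is not the prototype's code: a plain ConcurrentHashMap stands in for the task cache (Infinispan's Cache implements ConcurrentMap, so replace(key, oldValue, newValue) has the same shape there), and the State enum, the separate results map, and the method names are all assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class TaskCheckoutSketch {
    // Hypothetical task states; the post does not name them.
    enum State { AVAILABLE, CHECKED_OUT, DONE }

    // Stand-in for the distributed task cache (task id -> state).
    static final ConcurrentMap<String, State> taskCache = new ConcurrentHashMap<>();
    // Stand-in for wherever results are checked in (task id -> result).
    static final ConcurrentMap<String, Object> results = new ConcurrentHashMap<>();

    // Atomic compare-and-set: succeeds for exactly one caller per task.
    static boolean checkOut(String taskId) {
        return taskCache.replace(taskId, State.AVAILABLE, State.CHECKED_OUT);
    }

    // Meant to be called only by the node that won checkOut. The result is
    // published first, then the state flips, so a reader that sees DONE can
    // also find the result.
    static boolean checkIn(String taskId, Object result) {
        results.put(taskId, result);
        return taskCache.replace(taskId, State.CHECKED_OUT, State.DONE);
    }

    public static void main(String[] args) {
        taskCache.put("task-1", State.AVAILABLE);
        System.out.println(checkOut("task-1"));    // true: first node wins
        System.out.println(checkOut("task-1"));    // false: already checked out
        System.out.println(checkIn("task-1", 42)); // true
        System.out.println(taskCache.get("task-1") + " " + results.get("task-1"));
    }
}
```

Because replace() only succeeds when the current value matches the expected one, two nodes racing for the same task cannot both check it out.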

The basic algorithm is something like this:

For a refresher, a normal FJ pool has a fork() method that takes in a task,
and then places that task on an internal queue (actually, one of several
queues). When threads are idle, they look to the nearest work queue for
work. If that work queue does not have work, they "steal" work from another
thread's queue.  So in the best case, tasks remain on the same thread as
the task that spawned them, so tasks that process the same data as their
parents may still have that data in the CPU's cache, etc. There's more to
it than that, but that's the basic idea.
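For anyone who hasn't used the plain JDK fork-join API, the refresher above looks roughly like this in code. This is just the standard java.util.concurrent pattern, nothing Infinispan-specific; the SumTask class, its threshold, and the use of the common pool are illustrative choices.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSumSketch {
    // Sums data[from, to) by splitting the range until it is small enough.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000;
        private final long[] data;
        private final int from;
        private final int to;

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                  // queued on this worker; idle workers may steal it
            long rightSum = right.compute(); // keep working on the other half in this thread
            return left.join() + rightSum;
        }
    }

    static long sum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(sum(data));
    }
}
```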

This distributed algorithm just adds an extra layer on top for tasks that
are marked "distributable" (by extending DistributedFJTask instead of the
normal ForkJoinTask). When you call fork() with a DistributedFJTask, it
first checks to see if the local pool's work queue is empty. If so, we just
go ahead and submit it locally; there's no reason to distribute it. If not,
we put the task in the task cache, and let Infinispan distribute it. When a
node has no more work to do in its internal fork-join queues, it looks at
the task cache and tries to pull work from there.
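The fork-time decision described above can be modelled in a few lines. This is a simplified sketch, not the prototype: each node is reduced to a single local deque, a ConcurrentHashMap stands in for the distributed task cache, and the Node class, fork(), and nextTask() are hypothetical names. Cache listeners and key affinity are left out.

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

class DistributedForkSketch {
    // Stand-in for the shared task cache (task id -> task).
    static final ConcurrentMap<Long, Runnable> TASK_CACHE = new ConcurrentHashMap<>();
    static final AtomicLong NEXT_ID = new AtomicLong();

    // Hypothetical model of one cluster node with a single local work queue.
    static class Node {
        final Deque<Runnable> localQueue = new ConcurrentLinkedDeque<>();

        // Returns true if the task stayed local, false if it went to the cache.
        boolean fork(Runnable task) {
            if (localQueue.isEmpty()) {
                localQueue.push(task); // idle locally: no reason to distribute
                return true;
            }
            TASK_CACHE.put(NEXT_ID.getAndIncrement(), task);
            return false;
        }

        // Local work first; only a node with an empty queue pulls from the cache.
        Runnable nextTask() {
            Runnable local = localQueue.poll();
            if (local != null) {
                return local;
            }
            for (Long id : TASK_CACHE.keySet()) {
                Runnable remote = TASK_CACHE.remove(id); // atomic: one node wins
                if (remote != null) {
                    return remote;
                }
            }
            return null; // nothing to do anywhere
        }
    }

    public static void main(String[] args) {
        Node a = new Node();
        Node b = new Node();
        System.out.println(a.fork(() -> System.out.println("first task")));  // stays on a
        System.out.println(a.fork(() -> System.out.println("second task"))); // goes to the cache
        b.nextTask().run(); // b is idle, so it picks up the cached task
        a.nextTask().run(); // a runs its own local task
    }
}
```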

So, it isn't really a "work-stealing" algorithm, per se; the distributable
tasks are being distributed eagerly using Infinispan's normal cache
distribution. But I'm hoping that doing that also makes it easier to handle
node failure, since nodes collectively share a common picture of the work
to be done.

This approach required one change to the actual FJ classes themselves
(in org.infinispan.util.concurrent.jdk8backported).
That's probably the most controversial change. I had to make the original
ForkJoinTask's fork() method non-final in order to extend it cleanly.
There's probably a way around that, but that's the cleanest option I have
thought of thus far.

And lastly, it's not done yet: basic task distribution is working, but I
haven't tackled failover to any real extent yet. The biggest questions,
though, are around what to do with the existing distributed execution
interfaces. For example, DistributedTask has a getCallable() method because
it assumes it's wrapping a Callable. But ForkJoinTasks don't extend
Callable. I could put in a shim to wrap the DistributedFJTasks into
Callables for the sake of that method, but I don't know if it's worth it.
Similarly, the DistributedExecutorService interface exposes a lot of
submit-to-specific-address or submit-to-all-addresses methods, which are an
odd fit here since tasks are distributed via their own cache.  Even if I
used a KeyAffinityService to target the task to the given Address, it might
get picked up by another node that shares that same hash. But I can add
a direct-to-single-Address capability if that seems worthwhile.
Alternately, I can just use entirely different interfaces
(DistributedFJExecutorService, DistributedFJTask?).
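For reference, the Callable shim mentioned above could be as small as the sketch below. It uses only the standard JDK classes; asCallable() and ConstantTask are hypothetical names, and whether getCallable() should return something like this is exactly the open question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class CallableShimSketch {
    // Wraps a ForkJoinTask so code that expects a Callable can run it.
    // invoke() runs the task and returns its result, rethrowing any
    // unchecked exception the task threw.
    static <V> Callable<V> asCallable(final ForkJoinTask<V> task) {
        return () -> task.invoke();
    }

    // Trivial task used only to exercise the shim.
    static class ConstantTask extends RecursiveTask<Integer> {
        private final int value;

        ConstantTask(int value) {
            this.value = value;
        }

        @Override
        protected Integer compute() {
            return value;
        }
    }

    public static void main(String[] args) throws Exception {
        Callable<Integer> callable = asCallable(new ConstantTask(7));
        System.out.println(callable.call());
    }
}
```

Note that a completed ForkJoinTask cannot be run again without reinitialize(), so a shim like this is effectively single-use per task instance.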

Thoughts?  Concerns?  Glaring issues?