[infinispan-dev] distributed fork-join executor prototype
romano at inesc-id.pt
Mon Mar 4 07:07:08 EST 2013
This sounds really interesting Matt. In the Cloud-TM project
(www.cloudtm.eu) we are currently developing a parallel graph-analysis
algorithm on top of Infinispan's DEF. I would be really curious to take
a look at the framework you developed, and see how it could be exploited
in our application.
Paolo Romano, PhD
Coordinator of the Cloud-TM ICT FP7 Project (www.cloudtm.eu)
Senior Researcher @ INESC-ID (www.inesc-id.pt)
Assistant Professor @ Instituto Superior Tecnico (www.ist.utl.pt)
Rua Alves Redol, 9
1000-059, Lisbon Portugal
Tel. + 351 21 3100300
Fax + 351 21 3145843
On 3/2/13 5:40 PM, matt hoffman wrote:
> Hey guys,
> I've been working on a prototype of integrating Infinispan into our
> app. We do a lot of distributed processing across a small cluster, so
> I've played with Infinispan's existing distributed execution framework
> (which is nice), as well as using Infinispan alongside a normal
> message queue to distribute tasks. But I've also put together a
> prototype of a new distributed execution framework using fork-join
> pools that you all might be interested in. If it sounds like
> something that would be worthwhile for Infinispan, I can raise a Jira
> and submit a pull request with what I have so far. I'd need to get the
> CA and company policy stuff finalized; that might take a couple days.
> Meanwhile, in case there is any interest, I've described the approach
> I've taken below.
> First, a little background:
> A while back I worked on a side project that integrated a distributed
> work-stealing algorithm into the standard JDK fork-join queue. It
> used JGroups for communication, because it was quick and easy for
> prototyping. So this week I thought i'd take a stab at porting that
> over to Infinispan. The algorithm I came up with for Infinispan is a
> bit less of a work-stealing algorithm, to take advantage of
> Infinispan's built-in distribution capabilities, but I think it's
> still fairly efficient.
> My basic approach was to take in a cache in the constructor, much like
> the existing distributed executor, and then create a parallel,
> DIST-mode cache that uses the same hash & grouping configuration as
> the original cache. That new parallel cache is the "task cache", and
> we use that to distribute available tasks across the cluster. It's a
> distributed cache so that tasks are partitioned across a large
> cluster, and it uses the hashing config of the original cache and a
> KeyAffinityService to attempt to distribute the tasks to the same
> nodes that contain the data being worked on. Nodes use cache listeners
> to be notified when there is new work available, and the atomic
> replace() to "check out" the tasks for execution, and "check in" the
> The basic algorithm is something like this:
> For a refresher, a normal FJ pool has a fork() method that takes in a
> task, and then places that task on an internal queue (actually, one of
> several queues). When threads are idle, they look to the nearest work
> queue for work. If that work queue does not have work, they "steal"
> work from another thread's queue. So in the best case, tasks remain on
> the same thread as the task that spawned them, so tasks that process
> the same data as their parents may still have that data in the CPU's
> cache, etc. There's more to it than that, but that's the basic idea.
> This distributed algorithm just adds an extra layer on top for tasks
> that are marked "distributable" (by extending DistributedFJTask
> instead of the normal ForkJoinTask). When you call fork() with a
> DistributedFJTask, it first checks to see if the local pool's work
> queue is empty. If so, we just go ahead and submit it locally; there's
> no reason to distribute it. If not, we put the task in the task cache,
> and let Infinispan distribute it. When a node has no more work to do
> in its internal fork-join queues, it looks at the task cache and tries
> to pull work from there.
> So, it isn't really a "work-stealing" algorithm, per se; the
> distributable tasks are being distributed eagerly using Infinispan's
> normal cache distribution. But I'm hoping that doing that also makes
> it easier to handle node failure, since nodes collectively share a
> common picture of the work to be done.
> This approach required one change to the actual FJ classes themselves
> (in org.infinispan.util.concurrent.jdk8backported). That's probably
> the most controversial change. I had to make the original
> ForkJoinTask's fork() method non-final in order to extend it cleanly.
> There's probably a way around that, but that's the cleanest option I
> have thought of thus far.
> And lastly, it's not done yet: basic task distribution is working, but
> I haven't tackled failover to any real extent yet. The biggest
> questions, though, are around what to do with the existing distributed
> execution interfaces. For example, DistributedTask has a getCallable()
> method because it assumes it's wrapping a Callable. But ForkJoinTasks
> don't extend Callable. I could put in a shim to wrap the
> DistributedFJTasks into Callables for the sake of that method, but I
> don't know if it's worth it. Similarly, the
> DistributedExecutorService interface exposes a lot of
> submit-to-specific-address or submit-to-all-addresses methods, which
> are an odd fit here since tasks are distributed via their own cache.
> Even if I used a KeyAffinityService to target the task to the given
> Address, it might get picked up by another node that shares that same
> hash. But I can add in a direct-to-single-Address capability in if
> that seems worthwhile. Alternately, I can just use entirely different
> interfaces (DistributedFJExecutorService, DistributedFJTask?).
> Thoughts? Concerns? Glaring issues?
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
-------------- next part --------------
An HTML attachment was scrubbed...
More information about the infinispan-dev