This sounds really interesting Matt. In the Cloud-TM project
(
) we are currently developing a parallel graph-analysis
algorithm on top of Infinispan's DEF. I would be really curious to take
a look at the framework you developed, and see how it could be exploited
in our application.
Regards,
Paolo
--
Paolo Romano, PhD
Coordinator of the Cloud-TM ICT FP7 Project (
)
Rua Alves Redol, 9
1000-059, Lisbon Portugal
Tel. + 351 21 3100300
Fax + 351 21 3145843
Webpage
Hey guys,
I've been working on a prototype of integrating Infinispan into our
app. We do a lot of distributed processing across a small cluster, so
I've played with Infinispan's existing distributed execution framework
(which is nice), as well as using Infinispan alongside a normal
message queue to distribute tasks. But I've also put together a
prototype of a new distributed execution framework using fork-join
pools that you all might be interested in. If it sounds like
something that would be worthwhile for Infinispan, I can raise a Jira
and submit a pull request with what I have so far. I'd need to get the
CA and company policy stuff finalized; that might take a couple days.
Meanwhile, in case there is any interest, I've described the approach
I've taken below.
First, a little background:
A while back I worked on a side project that integrated a distributed
work-stealing algorithm into the standard JDK fork-join queue. It
used JGroups for communication, because it was quick and easy for
prototyping. So this week I thought i'd take a stab at porting that
over to Infinispan. The algorithm I came up with for Infinispan is a
bit less of a work-stealing algorithm, to take advantage of
Infinispan's built-in distribution capabilities, but I think it's
still fairly efficient.
My basic approach was to take in a cache in the constructor, much like
the existing distributed executor, and then create a parallel,
DIST-mode cache that uses the same hash & grouping configuration as
the original cache. That new parallel cache is the "task cache", and
we use that to distribute available tasks across the cluster. It's a
distributed cache so that tasks are partitioned across a large
cluster, and it uses the hashing config of the original cache and a
KeyAffinityService to attempt to distribute the tasks to the same
nodes that contain the data being worked on. Nodes use cache listeners
to be notified when there is new work available, and the atomic
replace() to "check out" the tasks for execution, and "check in" the
results.
The basic algorithm is something like this:
For a refresher, a normal FJ pool has a fork() method that takes in a
task, and then places that task on an internal queue (actually, one of
several queues). When threads are idle, they look to the nearest work
queue for work. If that work queue does not have work, they "steal"
work from another thread's queue. So in the best case, tasks remain on
the same thread as the task that spawned them, so tasks that process
the same data as their parents may still have that data in the CPU's
cache, etc. There's more to it than that, but that's the basic idea.
This distributed algorithm just adds an extra layer on top for tasks
that are marked "distributable" (by extending DistributedFJTask
instead of the normal ForkJoinTask). When you call fork() with a
DistributedFJTask, it first checks to see if the local pool's work
queue is empty. If so, we just go ahead and submit it locally; there's
no reason to distribute it. If not, we put the task in the task cache,
and let Infinispan distribute it. When a node has no more work to do
in its internal fork-join queues, it looks at the task cache and tries
to pull work from there.
So, it isn't really a "work-stealing" algorithm, per se; the
distributable tasks are being distributed eagerly using Infinispan's
normal cache distribution. But I'm hoping that doing that also makes
it easier to handle node failure, since nodes collectively share a
common picture of the work to be done.
This approach required one change to the actual FJ classes themselves
(in org.infinispan.util.concurrent.jdk8backported). That's probably
the most controversial change. I had to make the original
ForkJoinTask's fork() method non-final in order to extend it cleanly.
There's probably a way around that, but that's the cleanest option I
have thought of thus far.
And lastly, it's not done yet: basic task distribution is working, but
I haven't tackled failover to any real extent yet. The biggest
questions, though, are around what to do with the existing distributed
execution interfaces. For example, DistributedTask has a getCallable()
method because it assumes it's wrapping a Callable. But ForkJoinTasks
don't extend Callable. I could put in a shim to wrap the
DistributedFJTasks into Callables for the sake of that method, but I
don't know if it's worth it. Similarly, the
DistributedExecutorService interface exposes a lot of
submit-to-specific-address or submit-to-all-addresses methods, which
are an odd fit here since tasks are distributed via their own cache.
Even if I used a KeyAffinityService to target the task to the given
Address, it might get picked up by another node that shares that same
hash. But I can add in a direct-to-single-Address capability in if
that seems worthwhile. Alternately, I can just use entirely different
interfaces (DistributedFJExecutorService, DistributedFJTask?).
Thoughts? Concerns? Glaring issues?
_______________________________________________
infinispan-dev mailing list
infinispan-dev(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/infinispan-dev