Hi Matt,
My view is that if you use Infinispan to distribute tasks to workers
for processing (master-slave processing), then Infinispan is the
wrong tech for this.
Assuming I understood what you wrote below, you're distributing tasks to
be executed to nodes identified by the consistent hash over the task
data you insert into the cache.
Workers get a notification when tasks are added to their cache, process
them and remove the tasks when done. This is a system where you have 90%
writes (task insertions and removals) and 10% reads (peeking into the
cache to see if tasks are available).
Infinispan was designed to be a cache for frequently used data, in which
reads outnumber writes (perhaps 80% reads / 20% writes), so with your task
distribution you use Infinispan in a way that's opposite to what it was
designed for. It will certainly work, but I don't think you'll get great
perf.
I've seen similar abuses before where people used Infinispan/JBossCache
as a notification bus.
For the scenario you describe below I recommend a messaging system like
JGroups or JMS. I've implemented a peer-to-peer task distribution system
[1] in JGroups before, take a look, perhaps this is closer to what you
have in mind...
[1]
Hey guys,
I've been working on a prototype of integrating Infinispan into our
app. We do a lot of distributed processing across a small cluster, so
I've played with Infinispan's existing distributed execution framework
(which is nice), as well as using Infinispan alongside a normal message
queue to distribute tasks. But I've also put together a prototype of a
new distributed execution framework using fork-join pools that you all
might be interested in. If it sounds like something that would be
worthwhile for Infinispan, I can raise a Jira and submit a pull request
with what I have so far. I'd need to get the CA and company policy stuff
finalized; that might take a couple days. Meanwhile, in case there is
any interest, I've described the approach I've taken below.
First, a little background:
A while back I worked on a side project that integrated a distributed
work-stealing algorithm into the standard JDK fork-join queue. It used
JGroups for communication, because it was quick and easy for
prototyping. So this week I thought I'd take a stab at porting that over
to Infinispan. The algorithm I came up with for Infinispan is a bit less
of a work-stealing algorithm, to take advantage of Infinispan's built-in
distribution capabilities, but I think it's still fairly efficient.
My basic approach was to take in a cache in the constructor, much like
the existing distributed executor, and then create a parallel, DIST-mode
cache that uses the same hash & grouping configuration as the original
cache. That new parallel cache is the "task cache", and we use that to
distribute available tasks across the cluster. It's a distributed cache
so that tasks are partitioned across a large cluster, and it uses the
hashing config of the original cache and a KeyAffinityService to attempt
to distribute the tasks to the same nodes that contain the data being
worked on. Nodes use cache listeners to be notified when there is new
work available, and the atomic replace() to "check out" the tasks for
execution, and "check in" the results.
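To make the "check out" / "check in" step concrete, here is a minimal sketch of the idea, not the prototype's actual code. It assumes each task entry holds a simple state value; the TaskState enum and the TaskCheckout class are hypothetical names, and a ConcurrentHashMap stands in for the task cache. An Infinispan Cache implements ConcurrentMap, so the same three-argument replace() is available there.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical task states; these names are illustrative only.
enum TaskState { AVAILABLE, CHECKED_OUT, DONE }

final class TaskCheckout {
    // Stand-in for the distributed task cache (assumption: one state value per task id).
    private final ConcurrentMap<String, TaskState> taskCache = new ConcurrentHashMap<>();

    // Makes a task visible to every node watching the cache.
    void publish(String taskId) {
        taskCache.put(taskId, TaskState.AVAILABLE);
    }

    // Atomic compare-and-set: succeeds for exactly one caller per task.
    boolean tryCheckOut(String taskId) {
        return taskCache.replace(taskId, TaskState.AVAILABLE, TaskState.CHECKED_OUT);
    }

    // Marks a checked-out task as finished; fails if it was never checked out.
    boolean checkIn(String taskId) {
        return taskCache.replace(taskId, TaskState.CHECKED_OUT, TaskState.DONE);
    }

    TaskState stateOf(String taskId) {
        return taskCache.get(taskId);
    }
}
```

If two nodes are notified about the same task, only the one whose replace() succeeds goes on to execute it; the other simply moves on.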
The basic algorithm is something like this:
For a refresher, a normal FJ pool has a fork() method that takes in a
task, and then places that task on an internal queue (actually, one of
several queues). When threads are idle, they look to the nearest work
queue for work. If that work queue does not have work, they "steal" work
from another thread's queue. So in the best case, tasks remain on the
same thread as the task that spawned them, so tasks that process the
same data as their parents may still have that data in the CPU's cache,
etc. There's more to it than that, but that's the basic idea.
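For anyone who hasn't used it, plain JDK fork-join looks roughly like this. It's a generic example using java.util.concurrent only, nothing Infinispan-specific; SumTask and its threshold are made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a slice of an array by splitting it in half until slices are small.
final class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cut-off for this example
    private final long[] data;
    private final int from;
    private final int to;

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                       // queued on the current worker; an idle worker may steal it
        long rightSum = right.compute();   // keep working on the other half in this thread
        return left.join() + rightSum;     // wait for the forked half and combine
    }
}
```

Usage would be something like new ForkJoinPool(4).invoke(new SumTask(data, 0, data.length)).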
This distributed algorithm just adds an extra layer on top for tasks
that are marked "distributable" (by extending DistributedFJTask instead
of the normal ForkJoinTask). When you call fork() with a
DistributedFJTask, it first checks to see if the local pool's work queue
is empty. If so, we just go ahead and submit it locally; there's no
reason to distribute it. If not, we put the task in the task cache, and
let Infinispan distribute it. When a node has no more work to do in its
internal fork-join queues, it looks at the task cache and tries to pull
work from there.
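A rough sketch of that routing decision, using plain in-memory stand-ins rather than the real classes: ForkRouter is a hypothetical name, the local queue stands in for the pool's internal work queues, the map stands in for the task cache, and Runnable stands in for DistributedFJTask. The real thing would check tasks out with replace() instead of removing them.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

final class ForkRouter {
    // Stand-in for the local fork-join work queue (assumption).
    private final Queue<Runnable> localQueue = new ConcurrentLinkedQueue<>();
    // Stand-in for the distributed task cache (assumption).
    private final ConcurrentMap<String, Runnable> taskCache = new ConcurrentHashMap<>();

    // Returns true if the task stayed local, false if it went to the task cache.
    boolean fork(String taskId, Runnable task) {
        if (localQueue.isEmpty()) {
            localQueue.add(task);      // nothing queued locally, no reason to distribute
            return true;
        }
        taskCache.put(taskId, task);   // local work is pending, let the cache distribute it
        return false;
    }

    // An idle worker drains local work first, then looks at the task cache.
    Runnable nextTask() {
        Runnable local = localQueue.poll();
        if (local != null) {
            return local;
        }
        for (String id : taskCache.keySet()) {
            Runnable claimed = taskCache.remove(id); // null if someone else took it first
            if (claimed != null) {
                return claimed;
            }
        }
        return null; // no work anywhere
    }

    int localSize() { return localQueue.size(); }
    int distributedSize() { return taskCache.size(); }
}
```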
So, it isn't really a "work-stealing" algorithm, per se; the
distributable tasks are being distributed eagerly using Infinispan's
normal cache distribution. But I'm hoping that doing that also makes it
easier to handle node failure, since nodes collectively share a common
picture of the work to be done.
This approach required one change to the actual FJ classes themselves
(in org.infinispan.util.concurrent.jdk8backported). That's probably the
most controversial change. I had to make the original ForkJoinTask's
fork() method non-final in order to extend it cleanly. There's probably
a way around that, but that's the cleanest option I have thought of thus
far.
And lastly, it's not done yet: basic task distribution is working, but I
haven't tackled failover to any real extent yet. The biggest questions,
though, are around what to do with the existing distributed execution
interfaces. For example, DistributedTask has a getCallable() method
because it assumes it's wrapping a Callable. But ForkJoinTasks don't
extend Callable. I could put in a shim to wrap the DistributedFJTasks
into Callables for the sake of that method, but I don't know if it's
worth it. Similarly, the DistributedExecutorService interface exposes a
lot of submit-to-specific-address or submit-to-all-addresses methods,
which are an odd fit here since tasks are distributed via their own
cache. Even if I used a KeyAffinityService to target the task to the
given Address, it might get picked up by another node that shares that
same hash. But I can add a direct-to-single-Address capability if
that seems worthwhile. Alternately, I can just use entirely different
interfaces (DistributedFJExecutorService, DistributedFJTask?).
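For what it's worth, the Callable shim itself could be very small. A sketch under the assumption that running the task in the calling thread is acceptable; FJCallables and ConstantTask are hypothetical names used only for illustration, and only standard java.util.concurrent calls are involved.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class FJCallables {
    private FJCallables() {}

    // Wraps a ForkJoinTask so it can be handed to an API that expects a Callable.
    // invoke() runs the task, waits for completion if necessary, and returns its result.
    static <T> Callable<T> asCallable(ForkJoinTask<T> task) {
        return task::invoke;
    }
}

// Trivial task used only to demonstrate the shim.
final class ConstantTask extends RecursiveTask<Integer> {
    private final int value;

    ConstantTask(int value) {
        this.value = value;
    }

    @Override
    protected Integer compute() {
        return value;
    }
}
```

The JDK already covers the opposite direction with ForkJoinTask.adapt(Callable), so the two models can be bridged either way without much code.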
Thoughts? Concerns? Glaring issues?
_______________________________________________
infinispan-dev mailing list
infinispan-dev(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/infinispan-dev