[infinispan-dev] distributed fork-join executor prototype

Bela Ban bban at redhat.com
Sun Mar 3 04:15:16 EST 2013


Hi Matt,

my view is that if you use Infinispan to distribute tasks to be 
processed (master-slave processing) to workers, then Infinispan is the 
wrong tech for this.

Assuming I understood what you wrote below, you're distributing tasks to 
be executed to nodes identified by the consistent hash over the task 
data you insert into the cache.

Workers get a notification when tasks are added to their cache, process 
them and remove the tasks when done. This is a system where you have 90% 
writes (task insertions and removals) and 10% reads (peeking into the 
cache to see if tasks are available).

Infinispan was designed to be a cache for frequently used data, in which 
reads outnumber writes (perhaps 80%/20% w/r), so with your task 
distribution you use Infinispan in a way that's opposite to what it was 
designed for. It will cerianly work, but I don't think you'll get great 
perf.

I've seen similar abuses before where people used Infinispan/JBossCache 
as a notification bus.

For the scenario you describe below I recommend a messaging system like 
JGroups or JMS. I've implemented a peer-to-peer task distribution system 
[1] in JGroups before, take a look, perhaps this is closer to what you 
have in kind...

[1] http://www.jgroups.org/taskdistribution.html


On 3/2/13 6:40 PM, matt hoffman wrote:
> Hey guys,
>
> I've been working on a prototype of integrating Infinispan into our
> app.  We do a lot of distributed processing across a small cluster, so
> I've played with Infinispan's existing distributed execution framework
> (which is nice), as well as using Infinispan alongside a normal message
> queue to distribute tasks.  But I've also put together a prototype of a
> new distributed execution framework using fork-join pools that you all
> might be interested in.  If it sounds like something that would be
> worthwhile for Infinispan, I can raise a Jira and submit a pull request
> with what I have so far. I'd need to get the CA and company policy stuff
> finalized; that might take a couple days. Meanwhile, in case there is
> any interest, I've described the approach I've taken below.
>
> First, a little background:
>
> A while back I worked on a side project that integrated a distributed
> work-stealing algorithm into the standard JDK fork-join queue.  It used
> JGroups for communication, because it was quick and easy for
> prototyping. So this week I thought i'd take a stab at porting that over
> to Infinispan. The algorithm I came up with for Infinispan is a bit less
> of a work-stealing algorithm, to take advantage of Infinispan's built-in
> distribution capabilities, but I think it's still fairly efficient.
>
> My basic approach was to take in a cache in the constructor, much like
> the existing distributed executor, and then create a parallel, DIST-mode
> cache that uses the same hash & grouping configuration as the original
> cache. That new parallel cache is the "task cache", and we use that to
> distribute available tasks across the cluster. It's a distributed cache
> so that tasks are partitioned across a large cluster, and it uses the
> hashing config of the original cache and a KeyAffinityService to attempt
> to distribute the tasks to the same nodes that contain the data being
> worked on. Nodes use cache listeners to be notified when there is new
> work available, and the atomic replace() to "check out" the tasks for
> execution, and "check in" the results.
>
> The basic algorithm is something like this:
>
> For a refresher, a normal FJ pool has a fork() method that takes in a
> task, and then places that task on an internal queue (actually, one of
> several queues). When threads are idle, they look to the nearest work
> queue for work. If that work queue does not have work, they "steal" work
> from another thread's queue.  So in the best case, tasks remain on the
> same thread as the task that spawned them, so tasks that process the
> same data as their parents may still have that data in the CPU's cache,
> etc. There's more to it than that, but that's the basic idea.
>
> This distributed algorithm just adds an extra layer on top for tasks
> that are marked "distributable" (by extending DistributedFJTask instead
> of the normal ForkJoinTask). When you call fork() with a
> DistributedFJTask, it first checks to see if the local pool's work queue
> is empty. If so, we just go ahead and submit it locally; there's no
> reason to distribute it. If not, we put the task in the task cache, and
> let Infinispan distribute it. When a node has no more work to do in its
> internal fork-join queues, it looks at the task cache and tries to pull
> work from there.
>
> So, it isn't really a "work-stealing" algorithm, per se; the
> distributable tasks are being distributed eagerly using Infinispan's
> normal cache distribution. But I'm hoping that doing that also makes it
> easier to handle node failure, since nodes collectively share a common
> picture of the work to be done.
>
> This approach required one change to the actual FJ classes themselves
> (in org.infinispan.util.concurrent.jdk8backported). That's probably the
> most controversial change. I had to make the original ForkJoinTask's
> fork() method non-final in order to extend it cleanly.  There's probably
> a way around that, but that's the cleanest option I have thought of thus
> far.
>
> And lastly, it's not done yet: basic task distribution is working, but I
> haven't tackled failover to any real extent yet. The biggest questions,
> though, are around what to do with the existing distributed execution
> interfaces. For example, DistributedTask has a getCallable() method
> because it assumes it's wrapping a Callable. But ForkJoinTasks don't
> extend Callable. I could put in a shim to wrap the DistributedFJTasks
> into Callables for the sake of that method, but I don't know if it's
> worth it.  Similarly, the DistributedExecutorService interface exposes a
> lot of submit-to-specific-address or submit-to-all-addresses methods,
> which are an odd fit here since tasks are distributed via their own
> cache.  Even if I used a KeyAffinityService to target the task to the
> given Address, it might get picked up by another node that shares that
> same hash. But I can add in a direct-to-single-Address capability in if
> that seems worthwhile. Alternately, I can just use entirely different
> interfaces (DistributedFJExecutorService, DistributedFJTask?).
>
> Thoughts?  Concerns?  Glaring issues?
>
>
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>

-- 
Bela Ban, JGroups lead (http://www.jgroups.org)


More information about the infinispan-dev mailing list