[infinispan-dev] distributed fork-join executor prototype

matt hoffman matt at mhoffman.org
Sun Mar 3 23:09:19 EST 2013


Bela,

Thanks a lot for the response. I really appreciate your input.

Some comments inline:

On Mar 3, 2013 4:16 AM, "Bela Ban" <bban at redhat.com> wrote:
>
> Hi Matt,
>
> my view is that if you use Infinispan to distribute tasks to be
> processed (master-slave processing) to workers, then Infinispan is the
> wrong tech for this.
>
> Assuming I understood what you wrote below, you're distributing tasks to
> be executed to nodes identified by the consistent hash over the task
> data you insert into the cache.
>
> Workers get a notification when tasks are added to their cache, process
> them and remove the tasks when done. This is a system where you have 90%
> writes (task insertions and removals) and 10% reads (peeking into the
> cache to see if tasks are available).
>
> Infinispan was designed to be a cache for frequently used data, in which
> reads outnumber writes (perhaps 80%/20% w/r), so with your task
> distribution you use Infinispan in a way that's opposite to what it was
> designed for. It will cerianly work, but I don't think you'll get great
> perf.
>

Your understanding is basically right, except workers don't remove the
tasks when done. But it is certainly write-heavy. Tasks are inserted, read
by workers, updated when a worker starts working on them, updated again
when complete, and read again by the task originator. Perhaps 60/40 w/r,
but still write-heavy.

> I've seen similar abuses before where people used Infinispan/JBossCache
> as a notification bus.
>
> For the scenario you describe below I recommend a messaging system like
> JGroups or JMS. I've implemented a peer-to-peer task distribution system
> [1] in JGroups before, take a look, perhaps this is closer to what you
> have in kind...
>
> [1] http://www.jgroups.org/taskdistribution.html
>
>

My first version of this distribution system did use JGroups directly (and
drew some inspiration from your paper, actually). I can definitely go back
to a JGroups-based approach if you think it is worthwhile (or a
Infinispan-command-based approach, if there's interest from the Infinispan
community… I'm assuming that using Infinispan's command pattern and
RPCManager would be comparable to vanilla JGroups, performance-wise, but
would be more consistent with Infinispan's architecture).

The differences between this algorithm and the one you propose in your task
distribution paper, for whatever it's worth:

1.) tasks are broadcast to just a subset of nodes, not all. The subset
hopefully corresponds to the subset of nodes that contain the data being
processed, if the task involves data in cache.

2.) The algorithm is pull-based, instead of push-based, so that workers
only "pull" more work when they're idle (or nearly idle). The hope is that
this allows workers to remain more evenly loaded.

3.) the algorithm attempts to keep tasks on the originating node if
possible, for efficiency.

Now, those could all be done with vanilla JGroups with just a couple tweaks
to your design. There are some downsides to this algorithm vs. your design,
too -- it requires one more exchange between master & worker (to "claim" a
task), for example. I could implement something more like your design, with
a push-based distribution scheme that pins tasks to nodes to see what the
quantifiable effect is.

The thing that originally made me want to go with a task cache: in your
paper (and in any design that supports node failure), each node is holding
a view of other tasks in the system. When a new node joins, they receive a
list of tasks. There's a protocol for keeping the task lists synchronized
(multicast add and remove, for example), and if we want to scale either in
# of tasks or # of nodes, we need to partition the tasks so that each node
contains a subset of tasks, while still ensuring that each task is stored
on some number of nodes and that when one of those nodes goes down, that
data is assigned to another node…   This all sounds very much like a
distributed cache. It handles distributing data to new peers, making sure
data is stored on X nodes at all times, and ensuring that each node's view
of the data remains in sync. So on the one hand, using an actual cache for
that seems simpler: you're letting the cache do what it does best
(distribute data and keep it consistent amongst peers). It's not just using
it as a notification bus, as you suggest; it's the actual data distribution
I'm interested in.

On the other hand, I think you're arguing that a task cache is doing more
than just distributing the data, and a simpler task broadcast is more
appropriate.  I don't know all the things that Infinispan is doing under
the covers to optimize for a read-heavy workload, so I'll have to take your
word for it.

When I find some more time, I can dust off my JGroups-based prototype of
the same algorithm and run some more performance benchmarks. In very
 preliminary tests, the task-cache-based design and the JGroups-based
design were very similar in performance. I'll need to run more extensive
tests before I can really quantify the overhead, though. And I'm sure the
JGroups-based design could be made more efficient.

Thanks a lot for the feedback -- I really appreciate it.



Matt


> On 3/2/13 6:40 PM, matt hoffman wrote:
> > Hey guys,
> >
> > I've been working on a prototype of integrating Infinispan into our
> > app.  We do a lot of distributed processing across a small cluster, so
> > I've played with Infinispan's existing distributed execution framework
> > (which is nice), as well as using Infinispan alongside a normal message
> > queue to distribute tasks.  But I've also put together a prototype of a
> > new distributed execution framework using fork-join pools that you all
> > might be interested in.  If it sounds like something that would be
> > worthwhile for Infinispan, I can raise a Jira and submit a pull request
> > with what I have so far. I'd need to get the CA and company policy stuff
> > finalized; that might take a couple days. Meanwhile, in case there is
> > any interest, I've described the approach I've taken below.
> >
> > First, a little background:
> >
> > A while back I worked on a side project that integrated a distributed
> > work-stealing algorithm into the standard JDK fork-join queue.  It used
> > JGroups for communication, because it was quick and easy for
> > prototyping. So this week I thought i'd take a stab at porting that over
> > to Infinispan. The algorithm I came up with for Infinispan is a bit less
> > of a work-stealing algorithm, to take advantage of Infinispan's built-in
> > distribution capabilities, but I think it's still fairly efficient.
> >
> > My basic approach was to take in a cache in the constructor, much like
> > the existing distributed executor, and then create a parallel, DIST-mode
> > cache that uses the same hash & grouping configuration as the original
> > cache. That new parallel cache is the "task cache", and we use that to
> > distribute available tasks across the cluster. It's a distributed cache
> > so that tasks are partitioned across a large cluster, and it uses the
> > hashing config of the original cache and a KeyAffinityService to attempt
> > to distribute the tasks to the same nodes that contain the data being
> > worked on. Nodes use cache listeners to be notified when there is new
> > work available, and the atomic replace() to "check out" the tasks for
> > execution, and "check in" the results.
> >
> > The basic algorithm is something like this:
> >
> > For a refresher, a normal FJ pool has a fork() method that takes in a
> > task, and then places that task on an internal queue (actually, one of
> > several queues). When threads are idle, they look to the nearest work
> > queue for work. If that work queue does not have work, they "steal" work
> > from another thread's queue.  So in the best case, tasks remain on the
> > same thread as the task that spawned them, so tasks that process the
> > same data as their parents may still have that data in the CPU's cache,
> > etc. There's more to it than that, but that's the basic idea.
> >
> > This distributed algorithm just adds an extra layer on top for tasks
> > that are marked "distributable" (by extending DistributedFJTask instead
> > of the normal ForkJoinTask). When you call fork() with a
> > DistributedFJTask, it first checks to see if the local pool's work queue
> > is empty. If so, we just go ahead and submit it locally; there's no
> > reason to distribute it. If not, we put the task in the task cache, and
> > let Infinispan distribute it. When a node has no more work to do in its
> > internal fork-join queues, it looks at the task cache and tries to pull
> > work from there.
> >
> > So, it isn't really a "work-stealing" algorithm, per se; the
> > distributable tasks are being distributed eagerly using Infinispan's
> > normal cache distribution. But I'm hoping that doing that also makes it
> > easier to handle node failure, since nodes collectively share a common
> > picture of the work to be done.
> >
> > This approach required one change to the actual FJ classes themselves
> > (in org.infinispan.util.concurrent.jdk8backported). That's probably the
> > most controversial change. I had to make the original ForkJoinTask's
> > fork() method non-final in order to extend it cleanly.  There's probably
> > a way around that, but that's the cleanest option I have thought of thus
> > far.
> >
> > And lastly, it's not done yet: basic task distribution is working, but I
> > haven't tackled failover to any real extent yet. The biggest questions,
> > though, are around what to do with the existing distributed execution
> > interfaces. For example, DistributedTask has a getCallable() method
> > because it assumes it's wrapping a Callable. But ForkJoinTasks don't
> > extend Callable. I could put in a shim to wrap the DistributedFJTasks
> > into Callables for the sake of that method, but I don't know if it's
> > worth it.  Similarly, the DistributedExecutorService interface exposes a
> > lot of submit-to-specific-address or submit-to-all-addresses methods,
> > which are an odd fit here since tasks are distributed via their own
> > cache.  Even if I used a KeyAffinityService to target the task to the
> > given Address, it might get picked up by another node that shares that
> > same hash. But I can add in a direct-to-single-Address capability in if
> > that seems worthwhile. Alternately, I can just use entirely different
> > interfaces (DistributedFJExecutorService, DistributedFJTask?).
> >
> > Thoughts?  Concerns?  Glaring issues?
> >
> >
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >
>
> --
> Bela Ban, JGroups lead (http://www.jgroups.org)
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/infinispan-dev/attachments/20130303/c826147b/attachment.html 


More information about the infinispan-dev mailing list