[infinispan-dev] distributed fork-join executor prototype

Bela Ban bban at redhat.com
Mon Mar 4 03:11:48 EST 2013


Hi Matt,

there are subtle differences between map-reduce scenarios and task 
distribution: map-reduce works mostly on cached data (lots of reads with 
few writes), whereas task distribution produces mostly new data (lots of 
writes).

If I misunderstood you, and you're actually doing map-reduce, then using 
Infinispan is fine.
Comments inline ...

On 3/4/13 5:09 AM, matt hoffman wrote:
> Bela,
>
> Thanks a lot for the response. I really appreciate your input.
>
> Some comments inline:
>
> On Mar 3, 2013 4:16 AM, "Bela Ban" <bban at redhat.com> wrote:
>  >
>  > Hi Matt,
>  >
>  > my view is that if you use Infinispan to distribute tasks to be
>  > processed (master-slave processing) to workers, then Infinispan is the
>  > wrong tech for this.
>  >
>  > Assuming I understood what you wrote below, you're distributing tasks to
>  > be executed to nodes identified by the consistent hash over the task
>  > data you insert into the cache.
>  >
>  > Workers get a notification when tasks are added to their cache, process
>  > them and remove the tasks when done. This is a system where you have 90%
>  > writes (task insertions and removals) and 10% reads (peeking into the
>  > cache to see if tasks are available).
>  >
>  > Infinispan was designed to be a cache for frequently used data, in which
>  > reads outnumber writes (perhaps 80%/20% w/r), so with your task
>  > distribution you use Infinispan in a way that's opposite to what it was
>  > designed for. It will certainly work, but I don't think you'll get great
>  > perf.
>  >
>
> Your understanding is basically right, except workers don't remove the
> tasks when done. But it is certainly write-heavy. Tasks are inserted,
> read by workers, updated when a worker starts working on them, updated
> again when complete, and read again by the task originator. Perhaps
> 60/40 w/r, but still write-heavy.


So if we use DIST, we have a write (insert) to N nodes, a local read, 
one or more writes (updates) and a final write (done), followed by a 
(potentially remote) read by the coordinator.

I'm wondering whether the intermediate writes (updates) are needed, as 
they trigger messages to other nodes: if the intermediate steps are 
relatively small, then there's no need to back them up to other nodes, 
as others could simply restart the work from scratch if a node crashes. 
Most of the time, nodes don't crash, so the cost of the update-writes 
rarely pays off, unless it's a lot of work to recreate those 
intermediate steps.

Anyway, yes, this is write-heavy, and there are a few disadvantages 
using DIST:
- Unneeded writes for intermediate work
- DIST reshuffles state on a view change. This means that even if a 
given worker doesn't crash, it might have to surrender its work to a 
different worker if the view changes in the middle of a task. Actually, 
your system can probably deal with this, but even if a worker continues 
processing a task during a view change, chances are its writes are now 
mostly remote, which is inefficient.
- While work stealing is a good thing in general, in Infinispan it leads 
to contention on the same locks, i.e. between the original worker and 
the worker which stole the task.


>  > I've seen similar abuses before where people used Infinispan/JBossCache
>  > as a notification bus.
>  >
>  > For the scenario you describe below I recommend a messaging system like
>  > JGroups or JMS. I've implemented a peer-to-peer task distribution system
>  > [1] in JGroups before, take a look, perhaps this is closer to what you
>  > have in mind...
>  >
>  > [1] http://www.jgroups.org/taskdistribution.html
>  >
>  >
>
> My first version of this distribution system did use JGroups directly
> (and drew some inspiration from your paper, actually). I can definitely
> go back to a JGroups-based approach if you think it is worthwhile (or an
> Infinispan-command-based approach, if there's interest from the
> Infinispan community… I'm assuming that using Infinispan's command
> pattern and RPCManager would be comparable to vanilla JGroups,
> performance-wise, but would be more consistent with Infinispan's
> architecture).


While I'm a big proponent of Infinispan, there are cases where it's the 
wrong tool for the job, e.g. for messaging systems, notification buses 
and master-slave work distribution.



> The differences between this algorithm and the one you propose in your
> task distribution paper, for whatever it's worth:
>
> 1.) tasks are broadcast to just a subset of nodes, not all. The subset
> hopefully corresponds to the subset of nodes that contain the data being
> processed, if the task involves data in cache.


Got it, using DIST. You could use a similar mechanism to send tasks to a 
subset, using a consistent hash over the task ID, for example. In a 
large system, this would certainly be better as the tasks are now not 
stored by everyone, but only by a small subset. My system used 
multicasts because it was a prototype and mcasts were simpler to implement.
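
A minimal sketch of the hash-based subset selection described above, 
assuming node names are plain strings and using CRC32 purely as an 
illustrative hash. The class name TaskOwners and the method ownersFor 
are hypothetical; this is neither Infinispan's DIST implementation nor 
the code from the task distribution paper, just one simple way to build 
a consistent-hash ring (without virtual nodes):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Hypothetical helper: picks the nodes that should store a given task. */
final class TaskOwners {

    /** Stable, non-negative hash of a string (CRC32 chosen only for illustration). */
    static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /**
     * Places every member on a hash ring and returns the first numOwners
     * members found when walking clockwise from the task's position.
     * The result depends only on the set of members, not on their order.
     */
    static List<String> ownersFor(String taskId, Collection<String> members, int numOwners) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String m : members) {
            ring.put(hash(m), m);
        }
        List<String> owners = new ArrayList<>();
        if (ring.isEmpty()) {
            return owners;
        }
        int n = Math.min(numOwners, ring.size());
        // Start at the first ring position at or after the task's hash, wrapping around.
        Long key = ring.ceilingKey(hash(taskId));
        if (key == null) {
            key = ring.firstKey();
        }
        while (owners.size() < n) {
            owners.add(ring.get(key));
            key = ring.higherKey(key);
            if (key == null) {
                key = ring.firstKey();
            }
        }
        return owners;
    }
}
```

The sender would then issue one unicast per returned owner. With this 
layout, a member that is not among a task's owners can leave without 
changing that task's owner list.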


> 2.) The algorithm is pull-based, instead of push-based, so that workers
> only "pull" more work when they're idle (or nearly idle). The hope is
> that this allows workers to remain more evenly loaded.


OK, makes sense. But this gets back to using the right tool. If I had 
been given the task of implementing such a system, I'd intuitively have 
used JMS queues, or a peer-to-peer system like JGroups.


> 3.) the algorithm attempts to keep tasks on the originating node if
> possible, for efficiency.
>
> Now, those could all be done with vanilla JGroups with just a couple
> tweaks to your design. There are some downsides to this algorithm vs.
> your design, too -- it requires one more exchange between master &
> worker (to "claim" a task), for example. I could implement something
> more like your design, with a push-based distribution scheme that pins
> tasks to nodes to see what the quantifiable effect is.


Yes, in my design, a task is only rescheduled to a different node if the 
original worker node *died*, i.e. was removed from the view, and not if 
the consistent hash over the task ID pointed to a different node.
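
The rescheduling rule above can be sketched as follows, assuming each 
node keeps a map from task ID to the worker currently assigned to it. 
The class Rescheduler and its method are hypothetical names for 
illustration; in JGroups the check would typically be driven from the 
viewAccepted() callback, which is left out here to keep the sketch 
free of library dependencies:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: decides which tasks need a new worker after a view change. */
final class Rescheduler {

    /**
     * Returns the IDs of tasks whose assigned worker is no longer in the view.
     * Tasks whose worker is still alive are left alone, even if new members
     * joined and a hash over the task ID would now point somewhere else.
     */
    static List<String> tasksToReschedule(Map<String, String> assignedWorker,
                                          Set<String> currentView) {
        List<String> orphaned = new ArrayList<>();
        for (Map.Entry<String, String> e : assignedWorker.entrySet()) {
            if (!currentView.contains(e.getValue())) {
                orphaned.add(e.getKey());
            }
        }
        Collections.sort(orphaned); // deterministic order, so all nodes agree
        return orphaned;
    }
}
```

Because a join never orphans a task, a worker keeps its in-flight work 
across view changes that don't remove it.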


> The thing that originally made me want to go with a task cache: in your
> paper (and in any design that supports node failure), each node is
> holding a view of other tasks in the system. When a new node joins, they
> receive a list of tasks. There's a protocol for keeping the task lists
> synchronized (multicast add and remove, for example), and if we want to
> scale either in # of tasks or # of nodes, we need to partition the tasks
> so that each node contains a subset of tasks, while still ensuring that
> each task is stored on some number of nodes and that when one of those
> nodes goes down, that data is assigned to another node…


Agreed. But as I said above, this could be done by computing a hash over 
the task and sending the work to a subset of the cluster (with N 
unicasts) rather than a multicast.

Or you could multicast the task and only the nodes which match the 
hash(task_id) would store the task... the former is probably more 
efficient, the latter simpler to implement. Neither approach sounds 
overly complicated to implement, though.
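
To illustrate that the two variants end up storing a task on the same 
nodes, here is a small sketch in which every node ranks the members of 
the current view per task and the top N are the owners. The ranking 
uses rendezvous (highest-random-weight) hashing with CRC32, which is a 
technique chosen for this example and not something the thread 
prescribes; StoreFilter, owners and shouldStore are hypothetical names:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

/** Hypothetical helper shared by sender (N unicasts) and receivers (multicast + filter). */
final class StoreFilter {

    /** Per-(node, task) score; CRC32 is used only for illustration. */
    static long score(String node, String taskId) {
        CRC32 crc = new CRC32();
        crc.update((node + "|" + taskId).getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    /** Sender side: the numOwners members with the highest score for this task. */
    static List<String> owners(String taskId, List<String> view, int numOwners) {
        List<String> ranked = new ArrayList<>(view);
        ranked.sort((a, b) -> {
            int c = Long.compare(score(b, taskId), score(a, taskId)); // higher score first
            return c != 0 ? c : a.compareTo(b);                        // tie-break by name
        });
        return new ArrayList<>(ranked.subList(0, Math.min(numOwners, ranked.size())));
    }

    /** Receiver side: after a multicast, should localNode keep the task or drop it? */
    static boolean shouldStore(String localNode, String taskId, List<String> view, int numOwners) {
        return owners(taskId, view, numOwners).contains(localNode);
    }
}
```

As long as sender and receivers evaluate this against the same view, 
the unicast targets and the nodes that keep a multicast task are 
identical; the difference is only in how many messages cross the wire.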


> On the other hand, I think you're arguing that a task cache is doing
> more than just distributing the data, and a simpler task broadcast is
> more appropriate.  I don't know all the things that Infinispan is doing
> under the covers to optimize for a read-heavy workload, so I'll have to
> take your word for it.


I wrote JBossCache (Infinispan's predecessor) and my design was geared 
towards read-heavy workloads. TBH, I just *assumed* that this is still 
the case with Infinispan, but I don't really know; perhaps someone from 
the Infinispan team can chime in?


> When I find some more time, I can dust off my JGroups-based prototype of
> the same algorithm and run some more performance benchmarks. In very
> preliminary tests, the task-cache-based design and the JGroups-based
> design were very similar in performance. I'll need to run more extensive
> tests before I can really quantify the overhead, though. And I'm sure
> the JGroups-based design could be made more efficient.


Yes, see above


> Thanks a lot for the feedback -- I really appreciate it.

NP, cheers,

-- 
Bela Ban, JGroups lead (http://www.jgroups.org)

