Hi Matt,
There are subtle differences between map-reduce scenarios and task
distribution: map-reduce works mostly on cached data (lots of reads,
few writes), whereas task distribution produces mostly new data (lots
of writes).
If I misunderstood you, and you're actually doing map-reduce, then using
Infinispan is fine.
Comments inline ...
On 3/4/13 5:09 AM, matt hoffman wrote:
Bela,
Thanks a lot for the response. I really appreciate your input.
Some comments inline:
On Mar 3, 2013 4:16 AM, "Bela Ban" <bban@redhat.com> wrote:
>
> Hi Matt,
>
> my view is that if you use Infinispan to distribute tasks to be
> processed (master-slave processing) to workers, then Infinispan is the
> wrong tech for this.
>
> Assuming I understood what you wrote below, you're distributing tasks to
> be executed to nodes identified by the consistent hash over the task
> data you insert into the cache.
>
> Workers get a notification when tasks are added to their cache, process
> them and remove the tasks when done. This is a system where you have 90%
> writes (task insertions and removals) and 10% reads (peeking into the
> cache to see if tasks are available).
>
> Infinispan was designed to be a cache for frequently used data, in which
> reads outnumber writes (perhaps 80% reads / 20% writes), so with your
> task distribution you use Infinispan in a way that's opposite to what it
> was designed for. It will certainly work, but I don't think you'll get
> great perf.
>
Your understanding is basically right, except workers don't remove the
tasks when done. But it is certainly write-heavy. Tasks are inserted,
read by workers, updated when a worker starts working on them, updated
again when complete, and read again by the task originator. Perhaps
60/40 w/r, but still write-heavy.
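To make that concrete, the lifecycle looks roughly like this (a
simplified sketch; Task, Status and withStatus() stand in for our real
classes):

import org.infinispan.Cache;

// Simplified task lifecycle against the task cache. Task, Status and
// withStatus() are illustrative stand-ins, not our actual classes.
void runTask(Cache<String, Task> cache, String taskId, Task task) {
    cache.put(taskId, task.withStatus(Status.PENDING)); // originator inserts (write)
    Task t = cache.get(taskId);                         // worker reads (local)
    cache.put(taskId, t.withStatus(Status.RUNNING));    // worker claims (write)
    // ... do the work ...
    cache.put(taskId, t.withStatus(Status.DONE));       // worker completes (write)
    Task result = cache.get(taskId);                    // originator reads (possibly remote)
}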
So if we use DIST, we have a write (insert) to N nodes, a local read,
one or more writes (updates) and a final write (done), followed by a
(potentially remote) read by the coordinator.
I'm wondering whether the intermediate writes (updates) are needed, as
they trigger messages to other nodes: if the intermediate steps are
relatively small, then there's no need to back them up to other nodes,
as others could simply restart the work from scratch if a node crashes.
Most of the time nodes don't crash, so the cost of the update-writes
might never pay off, unless recreating those intermediate steps would
be a lot of work.
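If keeping the intermediate state is still useful locally, you could
write it with Infinispan's CACHE_MODE_LOCAL flag so it never leaves the
node; a rough sketch (Task/Status are again illustrative):

import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.context.Flag;

// Intermediate progress stays local; only the final result goes through
// the normal (replicated) write path. Task/Status are illustrative.
void runWithLocalProgress(Cache<String, Task> cache, String taskId, Task t) {
    AdvancedCache<String, Task> local =
        cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL);
    local.put(taskId, t.withStatus(Status.RUNNING)); // not replicated
    // ... do the work, writing intermediate state via 'local' ...
    cache.put(taskId, t.withStatus(Status.DONE));    // replicated final write
}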
Anyway, yes, this is write-heavy, and there are a few disadvantages to
using DIST:
- Unneeded writes for intermediate work
- DIST reshuffles state on a view change. This means that even if a
given worker doesn't crash, it might have to surrender its work to a
different worker if the view changes in the middle of a task. Your
system can probably deal with this, but even if a worker continues
processing a task during a view change, chances are its writes are now
mostly remote, which is inefficient.
- While work stealing is a good thing in general, in Infinispan it leads
to lock contention between the original worker and the worker that stole
the task.
> I've seen similar abuses before where people used Infinispan/JBossCache
> as a notification bus.
>
> For the scenario you describe below I recommend a messaging system like
> JGroups or JMS. I've implemented a peer-to-peer task distribution system
> [1] in JGroups before; take a look, perhaps this is closer to what you
> have in mind...
>
> [1] http://www.jgroups.org/taskdistribution.html
>
>
My first version of this distribution system did use JGroups directly
(and drew some inspiration from your paper, actually). I can definitely
go back to a JGroups-based approach if you think it is worthwhile (or an
Infinispan-command-based approach, if there's interest from the
Infinispan community… I'm assuming that using Infinispan's command
pattern and RPCManager would be comparable to vanilla JGroups,
performance-wise, but would be more consistent with Infinispan's
architecture).
While I'm a big proponent of Infinispan, there are cases where it's the
wrong tool for the job, e.g. for messaging systems, notification buses
and master-slave work distribution.
The differences between this algorithm and the one you propose in your
task distribution paper, for whatever it's worth:
1.) Tasks are broadcast to just a subset of nodes, not all. The subset
hopefully corresponds to the subset of nodes that contain the data being
processed, if the task involves data in cache.
Got it, using DIST. You could use a similar mechanism to send tasks to a
subset, using a consistent hash over the task ID, for example. In a
large system, this would certainly be better as the tasks are now not
stored by everyone, but only by a small subset. My system used
multicasts because it was a prototype and mcasts were simpler to implement.
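A rough sketch of what I mean with plain JGroups (numOwners and the
serialized task bytes are illustrative):

import java.util.List;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;

// Pick the owner nodes by hashing the task ID over the current view,
// then send the task to each owner with a unicast.
void sendToOwners(JChannel channel, String taskId, byte[] taskBytes,
                  int numOwners) throws Exception {
    List<Address> members = channel.getView().getMembers();
    int primary = (taskId.hashCode() & Integer.MAX_VALUE) % members.size();
    for (int i = 0; i < numOwners; i++) {
        Address owner = members.get((primary + i) % members.size());
        channel.send(new Message(owner, taskBytes)); // unicast
    }
}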
2.) The algorithm is pull-based, instead of push-based, so that workers
only "pull" more work when they're idle (or nearly idle). The hope is
that this allows workers to remain more evenly loaded.
OK, makes sense. But this gets back to using the right tool: if I had
been given the task of implementing such a system, I'd intuitively have
used JMS queues, or a peer-to-peer system like JGroups.
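With JMS, the pull semantics come for free: competing consumers on a
queue give you exactly the "idle workers pull work" behavior. A minimal
sketch (the queue name and connection setup are illustrative):

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;

// Competing consumers on a JMS queue: an idle worker blocks in receive()
// and only pulls the next task when it is free. Queue name and the
// ConnectionFactory are illustrative.
void workerLoop(ConnectionFactory factory) throws Exception {
    Connection conn = factory.createConnection();
    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue tasks = session.createQueue("tasks");
    MessageConsumer consumer = session.createConsumer(tasks);
    conn.start();
    while (true) {
        javax.jms.Message task = consumer.receive(); // blocks until work arrives
        // ... process the task ...
    }
}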
3.) The algorithm attempts to keep tasks on the originating node if
possible, for efficiency.
Now, those could all be done with vanilla JGroups with just a couple
tweaks to your design. There are some downsides to this algorithm vs.
your design, too -- it requires one more exchange between master &
worker (to "claim" a task), for example. I could implement something
more like your design, with a push-based distribution scheme that pins
tasks to nodes to see what the quantifiable effect is.
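(Roughly, the claim exchange looks like this; the CLAIM/GRANT/DENY
encoding and the claimed map are illustrative, not our actual protocol:)

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;

// A worker "claims" a task before working on it; the master grants it
// to the first claimer only. The string encoding is illustrative.
final ConcurrentMap<String, Address> claimed = new ConcurrentHashMap<>();

void claim(JChannel channel, Address master, String taskId) throws Exception {
    channel.send(new Message(master, "CLAIM " + taskId)); // worker -> master
}

void onClaim(JChannel channel, Address worker, String taskId) throws Exception {
    boolean first = claimed.putIfAbsent(taskId, worker) == null;
    channel.send(new Message(worker, (first ? "GRANT " : "DENY ") + taskId));
}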
Yes, in my design, a task is only rescheduled to a different node if the
original worker node *died*, i.e. was removed from the view, and not if
the consistent hash over the task ID pointed to a different node.
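In JGroups terms that check is just a view comparison, called from
viewAccepted(); a sketch (taskOwners and reassign() are illustrative):

import java.util.Map;
import org.jgroups.Address;
import org.jgroups.View;

// Reschedule only the tasks whose worker was removed from the view.
// taskOwners maps task ID -> worker address; reassign() is illustrative.
void onViewChange(View newView, Map<String, Address> taskOwners) {
    for (Map.Entry<String, Address> e : taskOwners.entrySet()) {
        if (!newView.containsMember(e.getValue()))
            reassign(e.getKey()); // worker died: hand the task to a new node
    }
}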
The thing that originally made me want to go with a task cache: in your
paper (and in any design that supports node failure), each node is
holding a view of other tasks in the system. When a new node joins, it
receives a list of tasks. There's a protocol for keeping the task lists
synchronized (multicast add and remove, for example), and if we want to
scale either in # of tasks or # of nodes, we need to partition the tasks
so that each node contains a subset of tasks, while still ensuring that
each task is stored on some number of nodes and that when one of those
nodes goes down, that data is assigned to another node…
Agreed. But as I said above, this could be done by computing a hash over
the task and sending the work to a subset of the cluster (with N
unicasts) rather than a multicast.
Or you could multicast the task, and only the nodes which match
hash(task_id) would store it... the former is probably more efficient,
the latter simpler to implement. Either way, it doesn't sound overly
complicated to implement.
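The multicast variant just filters on the receiving side, e.g.
(deserialize(), localTasks and numOwners are illustrative):

import java.util.List;
import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;

// Multicast variant: every node receives the task, but only the nodes
// selected by hash(task_id) store it. deserialize(), localTasks and
// numOwners are illustrative.
void onTask(JChannel channel, Message msg, int numOwners) {
    Task task = deserialize(msg.getBuffer());
    List<Address> members = channel.getView().getMembers();
    int primary = (task.getId().hashCode() & Integer.MAX_VALUE) % members.size();
    for (int i = 0; i < numOwners; i++) {
        Address owner = members.get((primary + i) % members.size());
        if (owner.equals(channel.getAddress())) { // this node is an owner
            localTasks.put(task.getId(), task);
            break;
        }
    }
}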
On the other hand, I think you're arguing that a task cache is doing
more than just distributing the data, and a simpler task broadcast is
more appropriate. I don't know all the things that Infinispan is doing
under the covers to optimize for a read-heavy workload, so I'll have to
take your word for it.
I wrote JBossCache (Infinispan's predecessor) and my design was geared
towards read-heavy workloads. TBH, I just *assumed* this is still the
case with Infinispan, but don't really know; perhaps someone from the
Infinispan team can chime in?
When I find some more time, I can dust off my JGroups-based prototype of
the same algorithm and run some more performance benchmarks. In very
preliminary tests, the task-cache-based design and the JGroups-based
design were very similar in performance. I'll need to run more extensive
tests before I can really quantify the overhead, though. And I'm sure
the JGroups-based design could be made more efficient.
Yes, see above
Thanks a lot for the feedback -- I really appreciate it.
NP, cheers,
--
Bela Ban, JGroups lead (http://www.jgroups.org)