[infinispan-dev] TopologySafe Map / Reduce

Dan Berindei dan.berindei at gmail.com
Mon Oct 13 04:45:42 EDT 2014


On Fri, Oct 10, 2014 at 6:49 PM, Emmanuel Bernard <emmanuel at hibernate.org>
wrote:

> When wrestling with the subject, here is what I had in mind.
>
> The M/R coordinator node sends the M task per segment to the node where
> the segment is primary.
>

What's M? Is it just a shorthand for "map", or is it a new parameter that
controls the number of map/combine tasks sent at once?


> Each "per-segment" M task is executed and is offered a way to push
> intermediary results into a temp cache.
>

Just to be clear, the user-provided mapper and combiner don't know anything
about the intermediary cache (which doesn't have to be temporary, if it's
shared by all M/R tasks). They only interact with the Collector interface.
The map/combine task, on the other hand, is our code, and it deals with the
intermediary cache directly.
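
For reference, the user-facing side looks roughly like this (a made-up
word-count mapper, just to illustrate that the only thing it ever touches is
the Collector):

   import org.infinispan.distexec.mapreduce.Collector;
   import org.infinispan.distexec.mapreduce.Mapper;

   // The user's mapper only ever talks to the Collector; it has no idea
   // whether the emitted values end up in a local buffer, a combiner, or an
   // intermediate cache keyed per segment.
   public class WordCountMapper implements Mapper<String, String, String, Integer> {

      @Override
      public void map(String key, String document, Collector<String, Integer> collector) {
         for (String word : document.split("\\s+")) {
            collector.emit(word, 1);
         }
      }
   }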


> The intermediary results are stored with a composite key [intermKey-i,
> seg-j].
> The M/R coordinator waits for all M tasks to return. If one does not
> (timeout, rehash), the following happens:
>

We can't allow map tasks to time out, or they will keep writing to the
intermediate cache in parallel with the retried tasks. So the originator
has to wait for a response from each node to which it sent a map task.


> - delete [intermKey-i, seg-i] (that operation could be handled by the
>   new per-segment M before the map task is effectively started)
> - ship the M task for that segment-i to the new primary owner of
>   segment-i
>
> When all M tasks are received the Reduce phase will read all [intermKey-i,
> *]
> keys and reduce them.
>
> Note that if the reduction phase is itself distributed, we could apply
> the same key-per-segment and task-shipping split there as well.
>

Sure, we have to retry reduce tasks when the primary owner changes, and it
makes sense to retry as little as possible.
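
To make the per-segment storage scheme above concrete, the intermediate
cache key could look something like this (IntermediateKey is a made-up name
for illustration, not an existing class):

   import java.io.Serializable;
   import java.util.Objects;

   // Hypothetical composite key for the intermediate cache: one entry per
   // (intermediate key, input segment) pair, so retrying an input segment
   // only means deleting the entries tagged with that segment.
   public final class IntermediateKey<K> implements Serializable {
      private final K intermKey;   // key emitted by the mapper/combiner
      private final int segment;   // input segment that produced the values

      public IntermediateKey(K intermKey, int segment) {
         this.intermKey = intermKey;
         this.segment = segment;
      }

      public K getIntermKey() { return intermKey; }
      public int getSegment() { return segment; }

      @Override
      public boolean equals(Object o) {
         if (this == o) return true;
         if (!(o instanceof IntermediateKey)) return false;
         IntermediateKey<?> other = (IntermediateKey<?>) o;
         return segment == other.segment && Objects.equals(intermKey, other.intermKey);
      }

      @Override
      public int hashCode() {
         return Objects.hash(intermKey, segment);
      }
   }

The map/combine task for segment j would only ever write [k, j] entries, so
retrying that segment just means deleting every entry with segment == j
before shipping the task to the new primary owner.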


>
> Again, the tricky part is to expose the ability to write to intermediary
> caches per segment without exposing segments per se, as well as to let
> someone see a concatenated view of intermKey-i from all segment subkeys
> during reduction.
>

Writing to and reading from the intermediate cache is already abstracted
from user code (in the Mapper and Reducer interfaces). So we don't need to
worry about exposing extra details to the user.
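
The reducing side is equally oblivious: the Reducer just gets an iterator
over the values collected for one key, so concatenating the per-segment
subkeys underneath it stays invisible to user code. Another made-up sketch:

   import java.util.Iterator;

   import org.infinispan.distexec.mapreduce.Reducer;

   // The reducer only sees an iterator over the values gathered for one key;
   // whether those values came from one segment or were concatenated from
   // several [intermKey, seg] entries is not observable here.
   public class WordCountReducer implements Reducer<String, Integer> {

      @Override
      public Integer reduce(String word, Iterator<Integer> counts) {
         int total = 0;
         while (counts.hasNext()) {
            total += counts.next();
         }
         return total;
      }
   }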


>
> Thoughts?
>
> Dan, I did not quite get what alternative approach you wanted to
> propose. Care to respin it for a slow brain? :)
>

I think where we differ is that I don't think user code needs to know about
how we store the intermediate values and what we retry, as long as their
mappers/combiners/reducers don't have side effects.

Otherwise I was thinking along the same lines: send 1 map/combine task for
each segment (maybe with a cap on the number of segments being processed at
the same time on each node), split the intermediate values per input
segment, cancel+retry each map task if the topology changes and the
executing node is no longer an owner. If the reduce phase is distributed,
run 1 reduce task per segment as well, and cancel+retry the reduce task if
the executing node is no longer an owner.
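
In (not at all final) pseudo-Java, the originator's map/combine loop would
look roughly like this; every helper method here is a placeholder, not an
existing API:

   import java.util.List;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;

   // Rough sketch of the originator's per-segment map/combine loop; all
   // helper methods are hypothetical placeholders.
   public abstract class PerSegmentMapCombineCoordinator {

      public void runMapCombinePhase() throws InterruptedException {
         for (int segment : inputSegments()) {
            boolean done = false;
            while (!done) {
               String primary = currentPrimaryOwner(segment);
               Future<Void> task = sendMapCombineTask(primary, segment);
               try {
                  task.get();
                  // Re-check ownership after completion: if the executing
                  // node lost the segment meanwhile, its output may be
                  // incomplete.
                  done = isStillOwner(primary, segment);
               } catch (ExecutionException e) {
                  done = false; // node left, task failed, or topology changed
               }
               if (!done) {
                  // Discard partial output for this segment before retrying
                  // on the new primary owner.
                  deleteIntermediateValues(segment);
               }
            }
         }
      }

      protected abstract List<Integer> inputSegments();
      protected abstract String currentPrimaryOwner(int segment);
      protected abstract Future<Void> sendMapCombineTask(String node, int segment);
      protected abstract boolean isStillOwner(String node, int segment);
      protected abstract void deleteIntermediateValues(int segment);
   }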

I had some ideas about assigning each map/combine phase a UUID and making
the intermediate keys [intermKey, seg, mctask] to allow the originator to
retry a map/combine task without waiting for the previous one to finish,
but I don't think I mentioned that before :)
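
If I were to sketch it (again with purely hypothetical names), the retry
path would change to something like:

   import java.util.UUID;
   import java.util.concurrent.Future;

   // Hypothetical extension: intermediate entries are keyed
   // [intermKey, seg, mctask], so a retried attempt (with a fresh UUID) can
   // start writing right away, and entries from superseded attempts are
   // ignored by the reduce phase and purged lazily.
   public abstract class TaggedRetryCoordinator {

      protected void retrySegment(int segment) {
         UUID newAttempt = UUID.randomUUID();
         // Start the new attempt immediately on the current primary owner...
         sendMapCombineTask(currentPrimaryOwner(segment), segment, newAttempt);
         // ...and clean up stale entries in the background, without waiting
         // for the previous (possibly hung) attempt to finish.
         purgeStaleAttempts(segment, newAttempt);
      }

      protected abstract String currentPrimaryOwner(int segment);
      protected abstract Future<Void> sendMapCombineTask(String node, int segment, UUID attempt);
      protected abstract void purgeStaleAttempts(int segment, UUID winningAttempt);
   }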

There are also some details that I'm worried about:

1) If the reduce phase is distributed, and the intermediate cache is
non-transactional, any topology change in the intermediate cache will
require us to retry all the map/combine tasks that were running at the time
on any node (even if some nodes did not detect the topology change yet). So
it would make sense to limit the number of map/combine tasks that are
processed at one time, in order to limit the number of tasks we retry (OR
require the intermediate cache to be transactional); see the sketch below.

2) Running a separate map/combine task for each segment is not really an
option until we implement the segment-aware data container and cache
stores. Without that change, it will make everything much slower, because
of all the extra iterations for each segment.

3) And finally, all this will be overkill when the input cache is small,
and the time needed to process the data is comparable to the time needed to
send all those extra RPCs.
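
As for the cap in 1), it doesn't have to be anything fancier than a
semaphore around the per-segment processing (illustrative sketch only):

   import java.util.concurrent.Semaphore;

   // Illustrative only: cap how many per-segment map/combine tasks run at
   // once on a node, so a topology change forces us to retry at most
   // maxConcurrentSegments segments' worth of intermediate values.
   public abstract class BoundedSegmentExecutor {

      private final Semaphore permits;

      protected BoundedSegmentExecutor(int maxConcurrentSegments) {
         this.permits = new Semaphore(maxConcurrentSegments);
      }

      public void process(int segment) throws InterruptedException {
         permits.acquire();
         try {
            runMapCombineForSegment(segment);
         } finally {
            permits.release();
         }
      }

      protected abstract void runMapCombineForSegment(int segment);
   }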

So I'm thinking it might be better to adopt Vladimir's suggestion to retry
everything if we detect a topology change in the input and/or intermediate
cache at the end of the M/R task, at least in the first phase.
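
As I understand Vladimir's suggestion, that first-phase version would be
roughly this (the helpers are placeholders for however we end up reading the
topology ids):

   // Illustrative fallback: run the whole M/R task optimistically and retry
   // from scratch if the input or intermediate cache topology changed while
   // it was running. Helper methods are placeholders, not existing APIs.
   public abstract class OptimisticMapReduceRunner<R> {

      public R run() {
         while (true) {
            int inputTopologyBefore = inputCacheTopologyId();
            int intermTopologyBefore = intermediateCacheTopologyId();

            R result = executeMapReduce();

            // If either topology moved under us, the result may contain
            // duplicated or missing intermediate values: discard and retry.
            if (inputTopologyBefore == inputCacheTopologyId()
                  && intermTopologyBefore == intermediateCacheTopologyId()) {
               return result;
            }
            clearIntermediateCache();
         }
      }

      protected abstract int inputCacheTopologyId();
      protected abstract int intermediateCacheTopologyId();
      protected abstract R executeMapReduce();
      protected abstract void clearIntermediateCache();
   }

Coarse, but it keeps the happy path cheap: one topology check before and
after, and no per-segment bookkeeping.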

Cheers
Dan



>
> Emmanuel
>
> On Fri 2014-10-10 10:03, Dan Berindei wrote:
> > > > I'd rather not expose this to the user. Instead, we could split the
> > > > intermediary values for each key by the source segment, and do the
> > > > invalidation of the retried segments in our M/R framework (e.g. when we
> > > > detect that the primary owner at the start of the map/combine phase is
> > > > not an owner at all at the end).
> > > >
> > > > I think we have another problem with the publishing of intermediary
> > > > values not being idempotent. The default configuration for the
> > > > intermediate cache is non-transactional, and retrying the put(delta)
> > > > command after a topology change could add the same intermediate values
> > > > twice. A transactional intermediary cache should be safe, though,
> > > > because the tx won't commit on the old owner until the new owner knows
> > > > about the tx.
> > >
> > > can you elaborate on it?
> > >
> >
> > say we have a cache with numOwners=2, owners(k) = [A, B]
> > C will become the primary owner of k, but for now owners(k) = [A, B, C]
> > O (the originator) sends put(delta) to A (the primary)
> > A sends put(delta) to B, C
> > B sees a topology change (owners(k) = [C, B]), doesn't apply the delta
> > and replies with an OutdatedTopologyException
> > C applies the delta
> > A resends put(delta) to C (new primary)
> > C sends put(delta) to B, applies the delta again
> >
> > I think it could be solved with versions; I just wanted to point out that
> > we don't do that now.
> >
> >
> > >
> > > anyway, I think the retry mechanism should solve it. If we detect a
> > > topology change (during the iteration of segment _i_) and the segment
> > > _i_ is moved, then we can cancel the iteration, remove all the
> > > intermediate values generated in segment _i_ and restart (on the
> > > primary owner).
> > >
> >
> > The problem is that the intermediate keys aren't in the same segment: we
> > want the reduce phase to access only keys local to the reducing node, and
> > keys in different input segments can yield values for the same
> > intermediate key. So like you say, we'd have to retry on every topology
> > change in the intermediary cache, not just the ones affecting segment _i_.
> >
> > There's another complication: in the scenario above, O may only get the
> > topology update with owners(k) = [C, B] after the map/combine phase
> > completed. So the originator of the M/R job would have to watch for
> > topology changes seen by any node, and invalidate/retry any input
> > segments that could have been affected. All that without slowing down
> > the no-topology-change case too much...
> >
> > >
> > > >
> > > >     >
> > > >     > But before getting ahead of ourselves, what do you think of
> > > >     > the general idea? Even without a retry framework, this approach
> > > >     > would be more stable than our current per-node approach during
> > > >     > topology changes and improve dependability.
> > > >
> > > >     Doing it solely based on segment would remove the possibility of
> > > >     having duplicates.  However without a mechanism to send a new
> > > >     request on rehash it would be possible to only find a subset of
> > > >     values (if a segment is removed while iterating on it).