[infinispan-dev] TopologySafe Map / Reduce

Emmanuel Bernard emmanuel at hibernate.org
Fri Oct 10 11:49:48 EDT 2014


While wrestling with the subject, here is what I had in mind.

The M/R coordinator node sends one M task per segment to the node where
that segment is primary.
Each "per-segment" M task is executed and is offered a way to push
intermediary results into a temp cache.
The intermediary results are stored under a composite key [intermKey-i, seg-j].
The M/R coordinator waits for all M tasks to return. If one does not
(timeout, rehash), the following happens:
- delete all [intermKey-*, seg-i] entries for the failed segment-i (that
  operation could be handled by the new per-segment M task before the map
  task is effectively started)
- ship the M task for segment-i to the new primary owner of segment-i
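
Roughly, the coordinator side could look like the sketch below. Everything
in it (SegmentMapService, TempCache, PerSegmentCoordinator) is made-up
naming for illustration, not existing Infinispan API; the point is only the
per-segment dispatch and the delete-and-reship retry:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical service that runs the map/combine task for one segment on a
// given node, publishing intermediate values under [intermKey, segment].
interface SegmentMapService {
   Future<Void> runMapTask(String nodeAddress, int segment);
   String primaryOwner(int segment);   // current primary owner of the segment
}

// Hypothetical handle on the temp cache, only used to wipe a segment's entries.
interface TempCache {
   void removeAllForSegment(int segment);   // delete every [intermKey-i, segment] entry
}

final class PerSegmentCoordinator {
   private final SegmentMapService maps;
   private final TempCache tempCache;
   private final long timeoutSeconds;

   PerSegmentCoordinator(SegmentMapService maps, TempCache tempCache, long timeoutSeconds) {
      this.maps = maps;
      this.tempCache = tempCache;
      this.timeoutSeconds = timeoutSeconds;
   }

   void runMapPhase(Set<Integer> segments) throws Exception {
      // dispatch one M task per segment to that segment's primary owner
      Map<Integer, Future<Void>> running = new HashMap<>();
      for (int segment : segments)
         running.put(segment, maps.runMapTask(maps.primaryOwner(segment), segment));

      for (Map.Entry<Integer, Future<Void>> entry : running.entrySet()) {
         int segment = entry.getKey();
         try {
            entry.getValue().get(timeoutSeconds, TimeUnit.SECONDS);
         } catch (TimeoutException | ExecutionException failed) {
            // timeout or rehash: wipe the segment's partial intermediate values
            // and re-ship the M task to whoever is primary for the segment now
            tempCache.removeAllForSegment(segment);
            maps.runMapTask(maps.primaryOwner(segment), segment)
                .get(timeoutSeconds, TimeUnit.SECONDS);
         }
      }
   }
}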

When all M tasks have completed, the Reduce phase will read all [intermKey-i, *]
keys and reduce them.
Note that if the reduction phase is itself distributed, we could apply
the same per-segment keying and task-shipping split to it.
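
On the reduce side the idea is just to concatenate the per-segment
sub-entries of each intermKey before handing them to the reducer. A toy
sketch, with an in-memory Map standing in for the temp cache and invented
types throughout:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class PerSegmentReducer<K, V> {
   // tempEntries maps a composite [intermKey, segment] pair to the combined
   // value that segment produced for that key
   Map<K, V> reduce(Map<Map.Entry<K, Integer>, V> tempEntries, BinaryOperator<V> reducer) {
      // group values by intermKey, i.e. read all [intermKey-i, *] sub-entries
      Map<K, List<V>> grouped = new HashMap<>();
      tempEntries.forEach((compositeKey, value) ->
            grouped.computeIfAbsent(compositeKey.getKey(), k -> new ArrayList<>()).add(value));

      // fold each key's per-segment values into a single reduced value
      Map<K, V> reduced = new HashMap<>();
      grouped.forEach((key, values) -> {
         V acc = null;
         for (V v : values)
            acc = (acc == null) ? v : reducer.apply(acc, v);
         reduced.put(key, acc);
      });
      return reduced;
   }
}

With Integer::sum as the reducer this behaves like a plain word-count style
fold over the per-segment partial counts.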

Again, the tricky part is to expose the ability to write to the intermediary
cache per segment without exposing segments per se, and to let the reducer
see a concatenated view of intermKey-i across all segment subkeys during
reduction.
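
One possible shape for that API, purely as an illustration (all names
hypothetical): the mapper gets a plain collector whose implementation tags
every write with the segment being processed, and the reducer only ever sees
a flat Iterable of values for one intermKey:

// Handed to the map/combine task: the implementation tags every emitted
// value with the segment currently being iterated, so user code never
// sees segment ids.
interface IntermediateCollector<KOut, VOut> {
   void emit(KOut intermKey, VOut value);
}

// The user's map function only interacts with the collector.
interface SegmentAwareMapper<KIn, VIn, KOut, VOut> {
   void map(KIn key, VIn value, IntermediateCollector<KOut, VOut> collector);
}

// Handed to the reduce task: the values stored under [intermKey, seg-0],
// [intermKey, seg-1], ... are presented as one flat Iterable.
interface SegmentAwareReducer<KOut, VOut> {
   VOut reduce(KOut intermKey, Iterable<VOut> valuesFromAllSegments);
}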

Thoughts?

Dan, I did not quite get what alternative approach you wanted to
propose. Care to respin it for a slow brain? :)

Emmanuel

On Fri 2014-10-10 10:03, Dan Berindei wrote:
> > > I'd rather not expose this to the user. Instead, we could split the
> > > intermediary values for each key by the source segment, and do the
> > > invalidation of the retried segments in our M/R framework (e.g. when we
> > > detect that the primary owner at the start of the map/combine phase is
> > > not an owner at all at the end).
> > >
> > > I think we have another problem with the publishing of intermediary
> > > values not being idempotent. The default configuration for the
> > > intermediate cache is non-transactional, and retrying the put(delta)
> > > command after a topology change could add the same intermediate values
> > > twice. A transactional intermediary cache should be safe, though,
> > > because the tx won't commit on the old owner until the new owner knows
> > > about the tx.
> >
> > can you elaborate on it?
> >
> 
> say we have a cache with numOwners=2, owners(k) = [A, B]
> C will become the primary owner of k, but for now owners(k) = [A, B, C]
> O sends put(delta) to A (the primary)
> A sends put(delta) to B, C
> B sees a topology change (owners(k) = [C, B]), doesn't apply the delta and
> replies with an OutdatedTopologyException
> C applies the delta
> A resends put(delta) to C (new primary)
> C sends put(delta) to B, applies the delta again
> 
> I think it could be solved with versions, I just wanted to point out that
> we don't do that now.
> 
> 
> >
> > anyway, I think the retry mechanism should solve it. If we detect a
> > topology change (during the iteration of segment _i_) and the segment
> > _i_ is moved, then we can cancel the iteration, remove all the
> > intermediate values generated in segment _i_ and restart (on the primary
> > owner).
> >
> 
> The problem is that the intermediate keys aren't in the same segment: we
> want the reduce phase to access only keys local to the reducing node, and
> keys in different input segments can yield values for the same intermediate
> key. So like you say, we'd have to retry on every topology change in the
> intermediary cache, not just the ones affecting segment _i_.
> 
> There's another complication: in the scenario above, O may only get the
> topology update with owners(k) = [C, B] after the map/combine phase
> completed. So the originator of the M/R job would have to watch for
> topology changes seen by any node, and invalidate/retry any input segments
> that could have been affected. All that without slowing down the
> no-topology-change case too much...
> 
> >
> > >
> > >     >
> > >     > But before getting ahead of ourselves, what do you think of the
> > >     > general idea? Even without a retry framework, this approach would
> > >     > be more stable than our current per-node approach during topology
> > >     > changes and improve dependability.
> > >
> > >     Doing it solely based on segment would remove the possibility of
> > >     having duplicates.  However without a mechanism to send a new request
> > >     on rehash it would be possible to only find a subset of values (if a
> > >     segment is removed while iterating on it).

