[infinispan-dev] TopologySafe Map / Reduce

Dan Berindei dan.berindei at gmail.com
Fri Oct 10 03:03:37 EDT 2014


On Fri, Oct 10, 2014 at 12:16 AM, Pedro Ruivo <pedro at infinispan.org> wrote:

>
>
> On 10/09/2014 04:41 PM, Dan Berindei wrote:
> >
> >
> > On Thu, Oct 9, 2014 at 3:40 PM, William Burns <mudokonman at gmail.com
> > <mailto:mudokonman at gmail.com>> wrote:
> >
> >     Actually this was something I was hoping to get to possibly in the
> >     near future.
> >
> >     I already have to do https://issues.jboss.org/browse/ISPN-4358 which
> >     will require rewriting parts of the distributed entry iterator.  In
> >     doing so I was planning on breaking this out to a more generic
> >     framework where you could run a given operation by segment
> >     guaranteeing it was only ran once per entry.  In doing so I was
> >     thinking I could try to move M/R on top of this to allow it to also
> be
> >     resilient to rehash events.
> >
> >     Additional comments inline.
> >
> >     On Thu, Oct 9, 2014 at 8:18 AM, Emmanuel Bernard
> >     <emmanuel at hibernate.org <mailto:emmanuel at hibernate.org>> wrote:
> >     > Pedro and I have been having discussions with the LEADS guys on
> their experience of Map / Reduce especially around stability during
> topology changes.
> >     >
> >     > This ties to the .size() thread you guys have been exchanging on
> (I only could read it partially).
> >     >
> >     > On the requirements, theirs is pretty straightforward and expected
> I think from most users.
> >     > They are fine with inconsistencies with entries
> create/updated/deleted between the M/R start and the end.
> >
> >     There is no way we can fix this without adding a very strict
> isolation
> >     level like SERIALIZABLE.
> >
> >
> >     > They are *not* fine with seeing the same key/value several time
> for the duration of the M/R execution. This AFAIK can happen when a
> topology change occurs.
> >
> >     This can happen if it was processed on one node and then rehash
> >     migrates the entry to another and runs it there.
> >
> >     >
> >     > Here is a proposal.
> >     > Why not run the M/R job not per node but rather per segment?
> >     > The point is that segments are stable across topology changes. The
> M/R tasks would then be about iterating over the keys in a given segment.
> >     >
> >     > The M/R request would send the task per segments on each node
> where the segment is primary.
> >
> >     This is exactly what the iterator does today but also watches for
> >     rehashes to send the request to a new owner when the segment moves
> >     between nodes.
> >
> >     > (We can imagine interesting things like sending it to one of the
> backups for workload optimization purposes or sending it to both primary
> and backups and to comparisons).
> >     > The M/R requester would be in an interesting situation. It could
> detect that a segment M/R never returns and trigger a new computation on
> another node than the one initially sent.
> >     >
> >     > One tricky question around that is when the M/R job store data in
> an intermediary state. We need some sort of way to expose the user
> indirectly to segments so that we can evict per segment intermediary caches
> in case of failure or retry.
> >
> >     This was one place I was thinking I would need to take special care
> to
> >     look into when doing a conversion like this.
> >
> >
> > I'd rather not expose this to the user. Instead, we could split the
> > intermediary values for each key by the source segment, and do the
> > invalidation of the retried segments in our M/R framework (e.g. when we
> > detect that the primary owner at the start of the map/combine phase is
> > not an owner at all at the end).
> >
> > I think we have another problem with the publishing of intermediary
> > values not being idempotent. The default configuration for the
> > intermediate cache is non-transactional, and retrying the put(delta)
> > command after a topology change could add the same intermediate values
> > twice. A transactional intermediary cache should be safe, though,
> > because the tx won't commit on the old owner until the new owner knows
> > about the tx.
>
> can you elaborate on it?
>

say we have a cache with numOwners=2, owners(k) = [A, B]
C will become the primary owner of k, but for now owners(k) = [A, B, C]
O sends put(delta) to A (the primary)
A sends put(delta) to B, C
B sees a topology change (owners(k) = [C, B]), doesn't apply the delta and
replies with an OutdatedTopologyException
C applies the delta
A resends put(delta) to C (new primary)
C sends put(delta) to B, applies the delta again

I think it could be solved with versions, I just wanted to point out that
we don't do that now.


>
> anyway, I think the retry mechanism should solve it. If we detect a
> topology change (during the iteration of segment _i_) and the segment
> _i_ is moved, then we can cancel the iteration, remove all the
> intermediate values generated in segment _i_ and restart (on the primary
> owner).
>

The problem is that the intermediate keys aren't in the same segment: we
want the reduce phase to access only keys local to the reducing node, and
keys in different input segments can yield values for the same intermediate
key. So like you say, we'd have to retry on every topology change in the
intermediary cache, not just the ones affecting segment _i_.

There's another complication: in the scenario above, O may only get the
topology update with owners(k) = [C, B] after the map/combine phase
completed. So the originator of the M/R job would have to watch for
topology changes seen by any node, and invalidate/retry any input segments
that could have been affected. All that without slowing down the
no-topology-change case too much...

>
> >
> >     >
> >     > But before getting ahead of ourselves, what do you thing of the
> general idea? Even without retry framework, this approach would be more
> stable than our current per node approach during topology changes and
> improve dependability.
> >
> >     Doing it solely based on segment would remove the possibility of
> >     having duplicates.  However without a mechanism to send a new request
> >     on rehash it would be possible to only find a subset of values (if a
> >     segment is removed while iterating on it).
> >
> >      >
> >      > Emmanuel
> >      > _______________________________________________
> >      > infinispan-dev mailing list
> >      > infinispan-dev at lists.jboss.org
> >     <mailto:infinispan-dev at lists.jboss.org>
> >      > https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >     _______________________________________________
> >     infinispan-dev mailing list
> >     infinispan-dev at lists.jboss.org <mailto:
> infinispan-dev at lists.jboss.org>
> >     https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >
> >
> >
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/infinispan-dev/attachments/20141010/b324ae41/attachment-0001.html 


More information about the infinispan-dev mailing list