[infinispan-dev] Distributed Streams

William Burns mudokonman at gmail.com
Thu Jul 9 12:09:02 EDT 2015


On Thu, Jul 9, 2015 at 11:33 AM Radim Vansa <rvansa at redhat.com> wrote:

> On 07/09/2015 02:49 PM, William Burns wrote:
> >
> >
> > On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei at gmail.com
> > <mailto:dan.berindei at gmail.com>> wrote:
> >
> >     Hi Will
> >
> >     After the discussion we started in Galder's PR's comments [1], I
> >     started thinking that we should really have a stream() method
> directly
> >     in the Cache/AdvancedCache interface.
> >
> >     I feel entrySet(), keySet() or values() encourage users to use
> >     external iteration with a for-each loop or iterator(), and we should
> >     encourage users to use the Stream methods instead. I also really
> don't
> >     like the idea of users having to close their iterators explicitly,
> and
> >     lazy Iterators (or Spliterators) need to be properly closed or they
> >     will leak resources.
> >
> >
> > The iterator and spliterator are automatically closed if it was fully
> > iterated upon.
> >
> > I don't think pulling all entries from the cluster (and loader) in for
> > entrySet, keySet or values is a good idea. Unless you are suggesting
> > that we only pull local entries only?  In which case we have reverted
> > these changes back to ISPN 6 and older.
> >
> > The entrySet, keySet and values as of ISPN 7.0 are actually completely
> > backing collections and methods are evaluated per invocation.  This
> > means any updates to them or the cache it was created from are seen by
> > each other.
> >
> >
> >     My suggestion, then, is to make entrySet().iterator() and
> >     entrySet().spliterator() eager, so that they don't need to implement
> >     AutoCloseable. I would even go as far as to say that entrySet()
> should
> >     be eager itself, but maybe keySet().stream() would make a better API
> >     than adding a new keysStream() method.
> >
> >
> > Just so I understand you are more saying that we leave the entrySet,
> > keySet and values the way they are so they are backing collections,
> > however invocation of the iterator or spliterator would pull in all
> > entries from the entire cache into memory at once?  It seems throwing
> > UnsupportedOperationException with a message stating to use
> > stream().iterator() and closing the stream would be better imo
> > (however that would preclude the usage of foreach). Note the foreach
> > loop is only an issue when iterating over that collection and you
> > break out of the loop early.
> >
> > try (Stream<Map.Entry<K, V> stream = entrySet.stream()) {
> >    Iterator<Map.Entry<K, V>> iterator = stream.iterator();
> > }
> >
> > Actually I think the issue here is that our CacheCollections don't
> > currently implement CloseableIterable like the EntryIterable does.  In
> > that case you can do a simple foreach loop with a break in a try with
> > resource.  We could then document that close closes any iterators or
> > spliterators that were created from this instance of the collection.
> >
> > It is a little awkward, but could work this way.
> >
> > try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
> >    for (Map.Entry<K, V> entry : closeableEntrySet) {
> >    }
> > }
>
> What resources is the iterator actually holding? If that is just memory,
> could we do some autoclose magic with phantom references?
>

The iterator will hold onto a thread on various machines while it is
processing return values.  There is a finalizer, but we all know we can't
trust that to run anytime soon.

I have thought of a different implementation that would not have this
limitatiion, but there is definitely not time for 8.0 to do that.  Also I
am not confident on the performance of it, but who knows :)


>
> >
> >
> >     Now to your questions:
> >
> >     1)
> >     forEach() doesn't allow the consumer to modify the entries, so I
> think
> >     the most common use case would be doing something with a captured
> >     variable (or System.out).
> >
> >
> > This is actually something I didn't cover in the document.  But upon
> > thinking about this more I was thinking we probably want to allow for
> > CDI Injection of the cache for the consumer action before firing.  In
> > this way the user can change values as they want.  This would behave
> > almost identically to map/reduce, however it is much more
> > understandable imo.
> >
> >     So I would make forEach execute the consumer
> >     on the originator, and maybe add a distributedForEach method that
> >     executes its consumer on each owner (accepting that the consumer may
> >     be executed twice for some keys, or never, if the originator
> crashes).
> >     distributedForEach probably makes more sense once you start injecting
> >     the Cache (or other components) in the remote Consumers.
> >
> >
> > This was my conundrum before, however I believe I found a happy
> > medium.  I figured if we implement it distributed gives more
> > flexibility.  The user can still choose to run it locally as they desire.
> >
> > For example you can call
> > *.stream().iterator().forEachRemaining(consumer) if you wanted to do a
> > forEach locally in a single thread.  And if you wanted it parallelized
> > you can do
> > StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)
>
> *.stream().distributedForEach(serializable consumer) looks much more
> obvious than ^. Despite that it would be documented.
>

The method looks very similar to the single threaded example to me.

I personally think we should be drawing people to using the distributed
forEach by default, not the other way around at least.  The local ones will
be orders of magnitude slower due to having to pull the entire contents to
the local node.

One issue with adding a new method that is designed to be a terminal or
intermediate operation is that Stream uses builder type pattern and returns
Stream.  Currently I don't mess with that as the new methods are only
available on the original CacheStream, however due to how Stream is typed
you can't override it by default to return CacheStream, which will add
quite a bit more bloat to start adding custom methods (which we may want to
do).


>
>
> >
> > This would all be documented on the forEach method.
> >
> >
> >     peek()'s intended use case is probably logging progress, so it will
> >     definitely need to interact with an external component. However,
> >     executing it to the originator would potentially change the execution
> >     of the stream dramatically, and adding logging shouldn't have that
> >     kind of impact. So peek() should be executed on the remote nodes,
> even
> >     if we don't have remote injection yet.
> >
> >
> > This is how I ended up doing it was to have it done remotely.
> >
> >
> >     2)
> >     I would say implement sorting on the originator from the beginning,
> >     and limit() and skip() as well. It's true that users may me
> >     disappointed to see adding limit() doesn't improve the performance of
> >     their sorted() execution, but I would rather have a complete API
> >     available for applications who don't need to sort the entire cache.
> >
> >
> > This is how I did this as well :)  Basically if we find that there is
> > a sorted, distributed, limit or skip it performs all of the
> > intermediate operations up that point then uses an iterator to bring
> > the results back locally where it can be performed.  Limit and
> > distinct are also actually performed remotely first to reduce how many
> > results are returned.  I am not 100% sold on performing distinct
> > remotely first as it could actually be significantly slower, but it
> > should hopefully reduce some memory usage :P
>
> Probably not in the first implementation, but sort should be performed
> on each node remotely, and then the local node should do just n-way
> merge. That way, you can apply skip & limit on remote nodes and then
>

That is already planned to add for sorting with
https://issues.jboss.org/browse/ISPN-4358.

You already replied about skip, so I don't have to mention that one :)

Limit is already applied on remote nodes, it just may bring in (limit * n)
where n is the number of nodes and is locally limited to finally ensure
proper limiting.  It however doesn't support limit ran remotely after a
sort remotely.  I can only do this in the case of there being no
map/flatMap operation between them otherwise I would lose the key we are
sorting on.  It could be a use case to cover though.


> again on the reduced merged set, dramatically reducing bandwith. Not
> sure about NoSQL use cases, but in classical DBs the top-N query is
> quite frequent operation, afaik.
>
> My 2c
>
> Radim
>
> >
> >     Cheers
> >     Dan
> >
> >
> >     [1]
> >
> https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
> >
> >     On Wed, May 27, 2015 at 9:52 PM, William Burns
> >     <mudokonman at gmail.com <mailto:mudokonman at gmail.com>> wrote:
> >     > Hello everyone,
> >     >
> >     > I wanted to let you know I wrote up a design documenting the
> >     successor to
> >     > EntryRetriever, Distributed Streams [1] !
> >     >
> >     > Any comments or feedback would be much appreciated.
> >     >
> >     > I especially would like targeted feedback regarding:
> >     >
> >     > 1. The operators forEach and peek may want to be ran locally.
> >     Should we
> >     > have an overridden method so users can pick which they want?
> >     Personally I
> >     > feel that peek is too specific to matter and forEach can always
> >     be done by
> >     > the caller locally if desired.
> >     > 2. The intermediate operators limit and skip do not seem worth
> >     implementing
> >     > unless we have sorting support. (Sorting support will come
> >     later).  I am
> >     > thinking to not support these until sorting is added.
> >     >
> >     > Thanks,
> >     >
> >     >  - Will
> >     >
> >     > [1]
> >
> https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
> >     >
> >     > _______________________________________________
> >     > infinispan-dev mailing list
> >     > infinispan-dev at lists.jboss.org
> >     <mailto:infinispan-dev at lists.jboss.org>
> >     > https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >     _______________________________________________
> >     infinispan-dev mailing list
> >     infinispan-dev at lists.jboss.org <mailto:
> infinispan-dev at lists.jboss.org>
> >     https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >
> >
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
>
> --
> Radim Vansa <rvansa at redhat.com>
> JBoss Performance Team
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/infinispan-dev/attachments/20150709/c8c5be99/attachment-0001.html 


More information about the infinispan-dev mailing list