[infinispan-dev] Distributed Streams

Radim Vansa rvansa at redhat.com
Thu Jul 9 11:33:31 EDT 2015


On 07/09/2015 02:49 PM, William Burns wrote:
>
>
> On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei at gmail.com> wrote:
>
>     Hi Will
>
>     After the discussion we started in Galder's PR's comments [1], I
>     started thinking that we should really have a stream() method directly
>     in the Cache/AdvancedCache interface.
>
>     I feel entrySet(), keySet() or values() encourage users to use
>     external iteration with a for-each loop or iterator(), and we should
>     encourage users to use the Stream methods instead. I also really don't
>     like the idea of users having to close their iterators explicitly, and
>     lazy Iterators (or Spliterators) need to be properly closed or they
>     will leak resources.
>
>
> The iterator and spliterator are automatically closed if they are
> fully iterated.
>
> I don't think pulling all entries from the cluster (and loader) in for 
> entrySet, keySet or values is a good idea.  Unless you are suggesting 
> that we pull local entries only?  In that case we would have reverted 
> these changes back to ISPN 6 and older behavior.
>
> As of ISPN 7.0, entrySet, keySet and values are actually fully 
> backing collections whose methods are evaluated per invocation.  This 
> means any updates to them, or to the cache they were created from, are 
> visible to each other.
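>
> For example (a rough sketch; 'cache' is any running
> Cache<String, String>, and the key/value are made up):
>
> Set<Map.Entry<String, String>> entries = cache.entrySet();
> cache.put("k", "v");  // the new entry is immediately visible through 'entries'
> entries.removeIf(e -> "k".equals(e.getKey()));  // removes from the cache too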
>
>
>     My suggestion, then, is to make entrySet().iterator() and
>     entrySet().spliterator() eager, so that they don't need to implement
>     AutoCloseable. I would even go as far as to say that entrySet() should
>     be eager itself, but maybe keySet().stream() would make a better API
>     than adding a new keysStream() method.
>
>
> Just so I understand: you are saying we leave entrySet, keySet and 
> values the way they are, as backing collections, but invoking their 
> iterator or spliterator would pull all entries from the entire cache 
> into memory at once?  It seems throwing 
> UnsupportedOperationException with a message telling the user to use 
> stream().iterator() and close the stream would be better imo 
> (however, that would preclude the use of a foreach loop).  Note the 
> foreach loop is only an issue when you iterate over that collection 
> and break out of the loop early.
>
> try (Stream<Map.Entry<K, V>> stream = entrySet.stream()) {
>    Iterator<Map.Entry<K, V>> iterator = stream.iterator();
> }
>
> Actually I think the issue here is that our CacheCollections don't 
> currently implement CloseableIterable like EntryIterable does.  In 
> that case you could do a simple foreach loop with a break inside a 
> try-with-resources.  We could then document that close() closes any 
> iterators or spliterators that were created from this instance of the 
> collection.
>
> It is a little awkward, but it could work this way.
>
> try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
>    for (Map.Entry<K, V> entry : closeableEntrySet) {
>       // safe to break out early: close() releases any open iterators
>    }
> }

What resources is the iterator actually holding? If that is just memory, 
could we do some autoclose magic with phantom references?
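
Just to illustrate what I mean, a generic java.lang.ref sketch (nothing 
Infinispan-specific, all names made up):

import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class IteratorReaper {
   private static final ReferenceQueue<Object> QUEUE = new ReferenceQueue<>();
   // keep the phantom refs strongly reachable until they are enqueued
   private static final Set<CleanupRef> PENDING = ConcurrentHashMap.newKeySet();

   // 'release' must not capture the iterator itself, or it will never be GCed
   static void register(Object iterator, Runnable release) {
      PENDING.add(new CleanupRef(iterator, release));
   }

   static {
      Thread reaper = new Thread(() -> {
         for (;;) {
            try {
               CleanupRef ref = (CleanupRef) QUEUE.remove();
               PENDING.remove(ref);
               ref.release.run();  // free whatever the iterator was holding
            } catch (InterruptedException e) {
               return;
            }
         }
      }, "iterator-reaper");
      reaper.setDaemon(true);
      reaper.start();
   }

   private static class CleanupRef extends PhantomReference<Object> {
      final Runnable release;

      CleanupRef(Object referent, Runnable release) {
         super(referent, QUEUE);
         this.release = release;
      }
   }
}

Of course the GC gives no timeliness guarantees, so this could only be 
a safety net behind an explicit close(), not a replacement for it.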

>
>
>     Now to your questions:
>
>     1)
>     forEach() doesn't allow the consumer to modify the entries, so I think
>     the most common use case would be doing something with a captured
>     variable (or System.out). 
>
>
> This is actually something I didn't cover in the document.  But upon 
> thinking about it more, I believe we probably want to allow CDI 
> injection of the cache into the consumer action before firing it.  In 
> this way the user can change values as they want.  This would behave 
> almost identically to map/reduce, however it is much more 
> understandable imo.
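>
> A hypothetical consumer could then look like this (the injection 
> mechanics aren't designed yet, so treat the annotation as a sketch):
>
> public class IncrementingConsumer
>       implements Consumer<Map.Entry<String, Integer>>, Serializable {
>
>    @Inject
>    private transient Cache<String, Integer> cache;  // injected on the executing node
>
>    @Override
>    public void accept(Map.Entry<String, Integer> entry) {
>       // the consumer can now modify values through the injected cache
>       cache.put(entry.getKey(), entry.getValue() + 1);
>    }
> }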
>
>     So I would make forEach execute the consumer
>     on the originator, and maybe add a distributedForEach method that
>     executes its consumer on each owner (accepting that the consumer may
>     be executed twice for some keys, or never, if the originator crashes).
>     distributedForEach probably makes more sense once you start injecting
>     the Cache (or other components) in the remote Consumers.
>
>
> This was my conundrum before, however I believe I found a happy 
> medium.  I figured implementing it distributed gives more 
> flexibility.  The user can still choose to run it locally as they desire.
>
> For example, you can call 
> *.stream().iterator().forEachRemaining(consumer) if you want to do a 
> forEach locally in a single thread.  And if you want it parallelized 
> you can do
> StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)

*.stream().distributedForEach(serializable consumer) looks much more 
obvious than the above, even though the longer form would be documented.
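
Something along these lines (completely made-up names, just to show the 
shape of the API):

public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

public interface CacheStream<T> extends Stream<T> {
   // runs the consumer on each owner node, next to the data
   void distributedForEach(SerializableConsumer<? super T> consumer);
}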


>
> This would all be documented on the forEach method.
>
>
>     peek()'s intended use case is probably logging progress, so it will
>     definitely need to interact with an external component. However,
>     executing it on the originator would potentially change the execution
>     of the stream dramatically, and adding logging shouldn't have that
>     kind of impact. So peek() should be executed on the remote nodes, even
>     if we don't have remote injection yet.
>
>
> This is how I ended up doing it: peek is executed remotely.
>
>
>     2)
>     I would say implement sorting on the originator from the beginning,
>     and limit() and skip() as well. It's true that users may be
>     disappointed to see adding limit() doesn't improve the performance of
>     their sorted() execution, but I would rather have a complete API
>     available for applications that don't need to sort the entire cache.
>
>
> This is how I did it as well :)  Basically, if we find that there is 
> a sorted, distinct, limit or skip, it performs all of the 
> intermediate operations up to that point remotely, then uses an 
> iterator to bring the results back locally where the rest can be 
> performed.  Limit and distinct are also actually performed remotely 
> first to reduce how many results are returned.  I am not 100% sold on 
> performing distinct remotely first as it could actually be 
> significantly slower, but it should hopefully reduce some memory usage :P
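>
> To illustrate the two-phase limit (a toy sketch with integers standing 
> in for entries; none of this is the real plumbing):
>
> int n = 3;
> // stand-ins for each node's local results
> List<Integer> node1 = Arrays.asList(5, 1, 9, 7);
> List<Integer> node2 = Arrays.asList(2, 8, 4, 6);
>
> // phase 1: each node trims to at most n results before sending anything
> List<Integer> partial1 = node1.stream().limit(n).collect(Collectors.toList());
> List<Integer> partial2 = node2.stream().limit(n).collect(Collectors.toList());
>
> // phase 2: the originator merges the partials and trims again
> List<Integer> result = Stream.concat(partial1.stream(), partial2.stream())
>       .limit(n)
>       .collect(Collectors.toList());
> // at most n elements ever cross the wire per node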

Probably not in the first implementation, but sort should be performed 
on each node remotely, and then the local node should do just an n-way 
merge. That way, you can apply skip & limit on the remote nodes and then 
again on the reduced merged set, dramatically reducing bandwidth. Not 
sure about NoSQL use cases, but in classical DBs the top-N query is a 
quite frequent operation, afaik.
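
Roughly like this (a toy n-way merge, integers standing in for entries; 
each node is assumed to have already shipped its first skip + limit 
elements sorted):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class NWayMerge {

   // one cursor per node over its already-sorted partial result
   static final class Cursor {
      final Iterator<Integer> it;
      int head;

      Cursor(Iterator<Integer> it) {
         this.it = it;
         this.head = it.next();
      }
   }

   static List<Integer> topN(List<List<Integer>> sortedPerNode, int skip, int limit) {
      // min-heap ordered by the current head of each node's result
      PriorityQueue<Cursor> heap =
            new PriorityQueue<>(Comparator.comparingInt((Cursor c) -> c.head));
      for (List<Integer> nodeResult : sortedPerNode) {
         if (!nodeResult.isEmpty()) {
            heap.add(new Cursor(nodeResult.iterator()));
         }
      }
      List<Integer> out = new ArrayList<>(limit);
      int seen = 0;
      while (!heap.isEmpty() && out.size() < limit) {
         Cursor c = heap.poll();
         if (seen++ >= skip) {
            out.add(c.head);  // next smallest element across all nodes
         }
         if (c.it.hasNext()) {
            c.head = c.it.next();
            heap.add(c);
         }
      }
      return out;
   }
}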

My 2c

Radim

>
>     Cheers
>     Dan
>
>
>     [1]
>     https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
>
>     On Wed, May 27, 2015 at 9:52 PM, William Burns
>     <mudokonman at gmail.com> wrote:
>     > Hello everyone,
>     >
>     > I wanted to let you know I wrote up a design documenting the
>     > successor to EntryRetriever, Distributed Streams [1]!
>     >
>     > Any comments or feedback would be much appreciated.
>     >
>     > I especially would like targeted feedback regarding:
>     >
>     > 1. The operators forEach and peek may want to be run locally.
>     > Should we have an overridden method so users can pick which they
>     > want?  Personally I feel that peek is too specific to matter and
>     > forEach can always be done by the caller locally if desired.
>     > 2. The intermediate operators limit and skip do not seem worth
>     > implementing unless we have sorting support.  (Sorting support
>     > will come later.)  I am thinking to not support these until
>     > sorting is added.
>     >
>     > Thanks,
>     >
>     >  - Will
>     >
>     > [1]
>     > https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support


-- 
Radim Vansa <rvansa at redhat.com>
JBoss Performance Team


