[infinispan-dev] Distributed Streams
Radim Vansa
rvansa at redhat.com
Thu Jul 9 11:33:31 EDT 2015
On 07/09/2015 02:49 PM, William Burns wrote:
>
>
> On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei at gmail.com
> <mailto:dan.berindei at gmail.com>> wrote:
>
> Hi Will
>
> After the discussion we started in Galder's PR's comments [1], I
> started thinking that we should really have a stream() method directly
> in the Cache/AdvancedCache interface.
>
> I feel entrySet(), keySet() or values() encourage users to use
> external iteration with a for-each loop or iterator(), and we should
> encourage users to use the Stream methods instead. I also really don't
> like the idea of users having to close their iterators explicitly, and
> lazy Iterators (or Spliterators) need to be properly closed or they
> will leak resources.
>
>
> The iterator and spliterator are automatically closed once they have
> been fully iterated.
>
> I don't think pulling all entries from the cluster (and loader) in for
> entrySet, keySet or values is a good idea. Unless you are suggesting
> that we pull only local entries? In that case we would have reverted
> these changes back to ISPN 6 and older.
>
> The entrySet, keySet and values collections as of ISPN 7.0 are actually
> fully backing collections, and their methods are evaluated per
> invocation. This means any updates to them, or to the cache they were
> created from, are visible to each other.
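The backing-collection semantics Will describes are the same ones `ConcurrentHashMap` has always had for its views; a minimal JDK-only sketch (using `ConcurrentHashMap` as a stand-in for a `Cache`):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class BackingViewDemo {
    // ConcurrentHashMap stands in for a Cache here: its entrySet() is
    // likewise a live view of the map, not a snapshot.
    static int[] demo() {
        Map<String, Integer> cache = new ConcurrentHashMap<>();
        Set<Map.Entry<String, Integer>> entries = cache.entrySet();

        cache.put("a", 1);              // write through the map...
        int viaView = entries.size();   // ...is visible in the view (1)

        entries.clear();                // write through the view...
        int viaMap = cache.size();      // ...is visible in the map (0)

        return new int[] { viaView, viaMap };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1]); // prints "1 0"
    }
}
```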
>
>
> My suggestion, then, is to make entrySet().iterator() and
> entrySet().spliterator() eager, so that they don't need to implement
> AutoCloseable. I would even go as far as to say that entrySet() should
> be eager itself, but maybe keySet().stream() would make a better API
> than adding a new keysStream() method.
>
>
> Just so I understand: you are saying that we leave entrySet, keySet
> and values the way they are, as backing collections, but an invocation
> of iterator or spliterator would pull all entries from the entire
> cache into memory at once? It seems that throwing
> UnsupportedOperationException with a message stating to use
> stream().iterator() and closing the stream would be better imo
> (however that would preclude the use of foreach). Note the foreach
> loop is only an issue when you iterate over that collection and
> break out of the loop early.
>
> try (Stream<Map.Entry<K, V>> stream = entrySet.stream()) {
>     Iterator<Map.Entry<K, V>> iterator = stream.iterator();
> }
>
> Actually I think the issue here is that our CacheCollections don't
> currently implement CloseableIterable like the EntryIterable does. In
> that case you can do a simple foreach loop with a break in a try with
> resource. We could then document that close closes any iterators or
> spliterators that were created from this instance of the collection.
>
> It is a little awkward, but could work this way.
>
> try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
>     for (Map.Entry<K, V> entry : closeableEntrySet) {
>         // ...
>     }
> }
What resources is the iterator actually holding? If that is just memory,
could we do some autoclose magic with phantom references?
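One way to sketch the "autoclose magic with phantom references": since Java 9, `java.lang.ref.Cleaner` (built on phantom references) can release a lazy iterator's resources once it becomes unreachable, while still permitting eager `close()`. `AutoCleanedIterator` and its `State` are hypothetical names, not Infinispan classes:

```java
import java.lang.ref.Cleaner;

public class AutoCleanedIterator implements AutoCloseable {
    private static final Cleaner CLEANER = Cleaner.create();

    // The cleanup state must not reference the iterator itself, or the
    // iterator would never become phantom-reachable.
    static class State implements Runnable {
        volatile boolean closed;

        @Override
        public void run() {
            closed = true;
            // here: release buffers / remote cluster resources
        }
    }

    private final State state = new State();
    private final Cleaner.Cleanable cleanable = CLEANER.register(this, state);

    @Override
    public void close() {
        cleanable.clean(); // runs State.run() at most once, eagerly
    }

    public boolean isClosed() {
        return state.closed;
    }

    public static void main(String[] args) {
        try (AutoCleanedIterator it = new AutoCleanedIterator()) {
            // iterate...
        } // closed eagerly here; an abandoned instance is cleaned by GC
    }
}
```

The catch is that phantom-reference cleanup only runs at some point after a GC notices the iterator is unreachable, so remote per-iterator resources could still linger for an unbounded time.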
>
>
> Now to your questions:
>
> 1)
> forEach() doesn't allow the consumer to modify the entries, so I think
> the most common use case would be doing something with a captured
> variable (or System.out).
>
>
> This is actually something I didn't cover in the document. But upon
> further thought, we probably want to allow CDI injection of the cache
> into the consumer action before it fires. That way the user can change
> values as they want. This would behave almost identically to
> map/reduce, however it is much more understandable imo.
>
> So I would make forEach execute the consumer
> on the originator, and maybe add a distributedForEach method that
> executes its consumer on each owner (accepting that the consumer may
> be executed twice for some keys, or never, if the originator crashes).
> distributedForEach probably makes more sense once you start injecting
> the Cache (or other components) in the remote Consumers.
>
>
> This was my conundrum before, however I believe I found a happy
> medium. I figured implementing it as distributed gives more
> flexibility. The user can still choose to run it locally if desired.
>
> For example you can call
> *.stream().iterator().forEachRemaining(consumer) if you wanted to do a
> forEach locally in a single thread. And if you wanted it parallelized
> you can do
> StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)
*.stream().distributedForEach(serializable consumer) looks much more
obvious than the above, even if the alternative were documented.
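Radim's suggested signature hinges on the consumer being marshallable. With plain Java serialization a lambda only qualifies if its target interface is `Serializable`; a sketch of the round trip a distributed forEach would have to perform (`SerializableConsumer` is a hypothetical helper, not an Infinispan API):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Consumer;

public class SerializableLambda {
    // A Consumer that is also Serializable, so a lambda written against
    // it can be shipped to remote owner nodes.
    interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

    static Consumer<StringBuilder> roundTrip(SerializableConsumer<StringBuilder> c)
            throws Exception {
        // marshal and unmarshal the lambda, as sending it remotely would
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(c);
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            Consumer<StringBuilder> copy = (Consumer<StringBuilder>) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        StringBuilder sink = new StringBuilder();
        Consumer<StringBuilder> remote = roundTrip(sb -> sb.append("visited"));
        remote.accept(sink);
        System.out.println(sink); // prints "visited"
    }
}
```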
>
> This would all be documented on the forEach method.
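The two local alternatives Will mentions can be sketched against plain JDK collections (a `Map` stands in for the cache; `*.stream()` in the mail corresponds to `cache.entrySet().stream()`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.StreamSupport;

public class LocalForEach {
    static int[] run() {
        Map<String, Integer> cache = Map.of("a", 1, "b", 2, "c", 3);

        // single-threaded local forEach via the stream's iterator
        List<Integer> seen = new ArrayList<>();
        cache.entrySet().stream().iterator()
             .forEachRemaining(e -> seen.add(e.getValue()));

        // parallelized local forEach by re-wrapping the spliterator
        Set<Integer> seenParallel = ConcurrentHashMap.newKeySet();
        StreamSupport.stream(cache.entrySet().stream().spliterator(), true)
                     .forEach(e -> seenParallel.add(e.getValue()));

        return new int[] { seen.size(), seenParallel.size() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " " + counts[1]); // prints "3 3"
    }
}
```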
>
>
> peek()'s intended use case is probably logging progress, so it will
> definitely need to interact with an external component. However,
> executing it on the originator would potentially change the execution
> of the stream dramatically, and adding logging shouldn't have that
> kind of impact. So peek() should be executed on the remote nodes, even
> if we don't have remote injection yet.
>
>
> This is how I ended up doing it: peek is executed remotely.
>
>
> 2)
> I would say implement sorting on the originator from the beginning,
> and limit() and skip() as well. It's true that users may be
> disappointed to see that adding limit() doesn't improve the performance
> of their sorted() execution, but I would rather have a complete API
> available for applications that don't need to sort the entire cache.
>
>
> This is how I did this as well :) Basically if we find that there is
> a sorted, distinct, limit or skip, it performs all of the
> intermediate operations up to that point, then uses an iterator to
> bring the results back locally where the rest can be performed. Limit
> and distinct are also actually performed remotely first to reduce how
> many results are returned. I am not 100% sold on performing distinct
> remotely first as it could actually be significantly slower, but it
> should hopefully reduce some memory usage :P
Probably not in the first implementation, but sort should be performed
on each node remotely, and then the local node should do just an n-way
merge. That way, you can apply skip & limit on the remote nodes and then
again on the reduced merged set, dramatically reducing bandwidth. Not
sure about NoSQL use cases, but in classical DBs the top-N query is
quite a frequent operation, afaik.
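A simplified sketch of the coordinator side of that top-N scheme: each remote node is assumed to have sorted its local entries and returned at most (skip + limit) of them, and the coordinator merges the per-node results and applies skip/limit again. A real implementation would merge the sorted lists lazily for bounded memory; a heap over all returned elements keeps the sketch short:

```java
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TopNMerge {
    // Coordinator-side merge of per-node sorted result lists.
    static <T extends Comparable<T>> List<T> merge(List<List<T>> perNode,
                                                   long skip, long limit) {
        PriorityQueue<T> heap = new PriorityQueue<>();
        perNode.forEach(heap::addAll);
        return Stream.generate(heap::poll)  // drains in ascending order
                     .limit(heap.size())
                     .skip(skip)            // re-apply skip and limit on
                     .limit(limit)          // the reduced merged set
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // three "nodes", each returning its locally sorted slice
        List<List<Integer>> nodes = List.of(
            List.of(1, 4, 7), List.of(2, 5, 8), List.of(3, 6, 9));
        System.out.println(merge(nodes, 2, 3)); // prints [3, 4, 5]
    }
}
```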
My 2c
Radim
>
> Cheers
> Dan
>
>
> [1]
> https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
>
> On Wed, May 27, 2015 at 9:52 PM, William Burns
> <mudokonman at gmail.com <mailto:mudokonman at gmail.com>> wrote:
> > Hello everyone,
> >
> > I wanted to let you know I wrote up a design documenting the
> successor to
> > EntryRetriever, Distributed Streams [1] !
> >
> > Any comments or feedback would be much appreciated.
> >
> > I especially would like targeted feedback regarding:
> >
> > 1. The operators forEach and peek may want to be run locally.
> Should we
> > have an overridden method so users can pick which they want?
> Personally I
> > feel that peek is too specific to matter and forEach can always
> be done by
> > the caller locally if desired.
> > 2. The intermediate operators limit and skip do not seem worth
> implementing
> > unless we have sorting support. (Sorting support will come
> later). I am
> > thinking to not support these until sorting is added.
> >
> > Thanks,
> >
> > - Will
> >
> > [1]
> https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> <mailto:infinispan-dev at lists.jboss.org>
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
--
Radim Vansa <rvansa at redhat.com>
JBoss Performance Team