[infinispan-dev] Distributed Streams

William Burns mudokonman at gmail.com
Thu Jul 9 12:53:19 EDT 2015


On Thu, Jul 9, 2015 at 12:28 PM Dan Berindei <dan.berindei at gmail.com> wrote:

> On Thu, Jul 9, 2015 at 3:49 PM, William Burns <mudokonman at gmail.com>
> wrote:
> >
> >
> > On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei at gmail.com>
> wrote:
> >>
> >> Hi Will
> >>
> >> After the discussion we started in Galder's PR's comments [1], I
> >> started thinking that we should really have a stream() method directly
> >> in the Cache/AdvancedCache interface.
> >>
> >> I feel entrySet(), keySet() or values() encourage users to use
> >> external iteration with a for-each loop or iterator(), and we should
> >> encourage users to use the Stream methods instead. I also really don't
> >> like the idea of users having to close their iterators explicitly, and
> >> lazy Iterators (or Spliterators) need to be properly closed or they
> >> will leak resources.
> >
> >
> > The iterator and spliterator are automatically closed if it was fully
> > iterated upon.
> >
> > I don't think pulling all entries from the cluster (and loader) in for
> > entrySet, keySet or values is a good idea.  Unless you are suggesting
> that
> > we only pull local entries only?  In which case we have reverted these
> > changes back to ISPN 6 and older.
> >
> > The entrySet, keySet and values as of ISPN 7.0 are actually completely
> > backing collections and methods are evaluated per invocation.  This means
> > any updates to them or the cache it was created from are seen by each
> other.
> >
>
> And the iterators are already AutoCloseable, I know. But with the
> streams API we can hide resource management from the user, so I was
> hoping we could avoid using AutoCloseable altogether.
>

I would like that very much myself as well :)  Currently the streams don't
even return CloseableIterator/CloseableSpliterator because I rely on the
user calling close on the Stream itself.


>
> >>
> >>
> >> My suggestion, then, is to make entrySet().iterator() and
> >> entrySet().spliterator() eager, so that they don't need to implement
> >> AutoCloseable. I would even go as far as to say that entrySet() should
> >> be eager itself, but maybe keySet().stream() would make a better API
> >> than adding a new keysStream() method.
> >
> >
> > Just so I understand you are more saying that we leave the entrySet,
> keySet
> > and values the way they are so they are backing collections, however
> > invocation of the iterator or spliterator would pull in all entries from
> the
> > entire cache into memory at once?  It seems throwing
>
> Yes
>

I am not a big fan of having the entire cache contents in memory on one
node.


>
> > UnsupportedOperationException with a message stating to use
> > stream().iterator() and closing the stream would be better imo (however
> that
> > would preclude the usage of foreach).  Note the foreach loop is only an
> > issue when iterating over that collection and you break out of the loop
> > early.
> >
> > try (Stream<Map.Entry<K, V> stream = entrySet.stream()) {
> >    Iterator<Map.Entry<K, V>> iterator = stream.iterator();
> > }
> >
> > Actually I think the issue here is that our CacheCollections don't
> currently
> > implement CloseableIterable like the EntryIterable does.  In that case
> you
> > can do a simple foreach loop with a break in a try with resource.  We
> could
> > then document that close closes any iterators or spliterators that were
> > created from this instance of the collection.
> >
> > It is a little awkward, but could work this way.
> >
> > try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
> >    for (Map.Entry<K, V> entry : closeableEntrySet) {
> >    }
> > }
>
> On the other hand, you wouldn't need any try-with-resources if you
> used the forEach method, because the resources are both acquired and
> released in the forEach call:
>
> entrySet.stream().forEach((k, v) -> { ... })
>

Guessing you just meant entrySet.forEach ?


>
> But if you make stream() or entrySet() return an AutoCloseable, then
> users will put a try-with-resources without it anyway, just in case.
> So I'd rather keep the iterator AutoCloseable than the stream/entry
> set.
>
> Making the EntryIterable implement AutoCloseable made sense in Java 7,
> because the only thing you could do with it was iterate on it, maybe
> with for-each. But in Java 8 Iterable also has a forEach method, and I
> wouldn't want users of forEach() to think about whether they need a
> try block or not.


When in doubt close is my policy.  There wouldn't be any noticable overhead
by calling close.  But I agree having it on the collections is not
favorable :(


>
> >
> >>
> >>
> >>
> >> Now to your questions:
> >>
> >> 1)
> >> forEach() doesn't allow the consumer to modify the entries, so I think
> >> the most common use case would be doing something with a captured
> >> variable (or System.out).
> >
> >
> > This is actually something I didn't cover in the document.  But upon
> > thinking about this more I was thinking we probably want to allow for CDI
> > Injection of the cache for the consumer action before firing.  In this
> way
> > the user can change values as they want.  This would behave almost
> > identically to map/reduce, however it is much more understandable imo.
> >
>
> You mean like map/reduce when it is configured to store its results in
> an intermediate cache?


I was referring to the injection part being the same.  I am not sure what
brings up the intermediate cache.


> I think you'd also need a
> CacheStream.reduceOnOwner() operation to put in the pipeline before
> forEach(), but yeah, it sounds like it should work.
>

Not sure what this method would do.  The forEach action would already be
ran on the primary owner node, unless we had an intermediate operation that
required it come local.


>
> >>
> >> So I would make forEach execute the consumer
> >> on the originator, and maybe add a distributedForEach method that
> >> executes its consumer on each owner (accepting that the consumer may
> >> be executed twice for some keys, or never, if the originator crashes).
> >> distributedForEach probably makes more sense once you start injecting
> >> the Cache (or other components) in the remote Consumers.
> >
> >
> > This was my conundrum before, however I believe I found a happy medium.
> I
> > figured if we implement it distributed gives more flexibility.  The user
> can
> > still choose to run it locally as they desire.
> >
> > For example you can call
> *.stream().iterator().forEachRemaining(consumer) if
> > you wanted to do a forEach locally in a single thread.  And if you
> wanted it
> > parallelized you can do
> > StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)
>
> Except now you have to access the spliterator directly... Will this
> work, or will the user need try-with-resources? Do we want users to
> think about it?
>
> StreamSupport.stream(*.stream().spliterator(),
> true).limit(10).forEach(consumer)
>

The user can't call close on the spliterator from the stream, they would
have to do it on the Stream.  The AutoCloseable interfaces is only added to
the iterator and spliterator on the collections returned via keySet,
entrySet, and values.


>
> >
> > This would all be documented on the forEach method.
> >
> >>
> >>
> >> peek()'s intended use case is probably logging progress, so it will
> >> definitely need to interact with an external component. However,
> >> executing it to the originator would potentially change the execution
> >> of the stream dramatically, and adding logging shouldn't have that
> >> kind of impact. So peek() should be executed on the remote nodes, even
> >> if we don't have remote injection yet.
> >
> >
> > This is how I ended up doing it was to have it done remotely.
> >
> >>
> >>
> >> 2)
> >> I would say implement sorting on the originator from the beginning,
> >> and limit() and skip() as well. It's true that users may me
> >> disappointed to see adding limit() doesn't improve the performance of
> >> their sorted() execution, but I would rather have a complete API
> >> available for applications who don't need to sort the entire cache.
> >
> >
> > This is how I did this as well :)  Basically if we find that there is a
> > sorted, distributed, limit or skip it performs all of the intermediate
> > operations up that point then uses an iterator to bring the results back
> > locally where it can be performed.  Limit and distinct are also actually
> > performed remotely first to reduce how many results are returned.  I am
> not
> > 100% sold on performing distinct remotely first as it could actually be
> > significantly slower, but it should hopefully reduce some memory usage :P
> >
>
> Shouldn't running distinct remotely actually require *more* memory,
> because now you have to keep track of unique results on each node?
> There are definitely scenarios where running it remotely will save a
> lot in network traffic, though.
>

It depends on what you mean by more.  I am thinking more total memory on a
given node.  Running remote only reads data from each remote node from
segments it primarily owns.  If any elements are found to be the same it
will reduce the overall total memory required on the originator node by the
same amount of matches.  If you do it only local you will have to have all
data from all nodes including matches in memory on node.


>
> >>
> >>
> >> Cheers
> >> Dan
> >>
> >>
> >> [1]
> >>
> https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
> >>
> >> On Wed, May 27, 2015 at 9:52 PM, William Burns <mudokonman at gmail.com>
> >> wrote:
> >> > Hello everyone,
> >> >
> >> > I wanted to let you know I wrote up a design documenting the successor
> >> > to
> >> > EntryRetriever, Distributed Streams [1] !
> >> >
> >> > Any comments or feedback would be much appreciated.
> >> >
> >> > I especially would like targeted feedback regarding:
> >> >
> >> > 1. The operators forEach and peek may want to be ran locally.  Should
> we
> >> > have an overridden method so users can pick which they want?
> Personally
> >> > I
> >> > feel that peek is too specific to matter and forEach can always be
> done
> >> > by
> >> > the caller locally if desired.
> >> > 2. The intermediate operators limit and skip do not seem worth
> >> > implementing
> >> > unless we have sorting support. (Sorting support will come later).  I
> am
> >> > thinking to not support these until sorting is added.
> >> >
> >> > Thanks,
> >> >
> >> >  - Will
> >> >
> >> > [1]
> >> >
> https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
> >> >
> >> > _______________________________________________
> >> > infinispan-dev mailing list
> >> > infinispan-dev at lists.jboss.org
> >> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >> _______________________________________________
> >> infinispan-dev mailing list
> >> infinispan-dev at lists.jboss.org
> >> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> >
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/infinispan-dev/attachments/20150709/c43dd76d/attachment-0001.html 


More information about the infinispan-dev mailing list