On Thu, Jul 9, 2015 at 3:49 PM, William Burns <mudokonman(a)gmail.com> wrote:
On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei(a)gmail.com> wrote:
>
> Hi Will
>
> After the discussion we started in Galder's PR's comments [1], I
> started thinking that we should really have a stream() method directly
> in the Cache/AdvancedCache interface.
>
> I feel entrySet(), keySet() or values() encourage users to use
> external iteration with a for-each loop or iterator(), and we should
> encourage users to use the Stream methods instead. I also really don't
> like the idea of users having to close their iterators explicitly, and
> lazy Iterators (or Spliterators) need to be properly closed or they
> will leak resources.
The iterator and spliterator are automatically closed if it was fully
iterated upon.
I don't think pulling all entries from the cluster (and loader) in for
entrySet, keySet or values is a good idea. Unless you are suggesting that
we only pull local entries only? In which case we have reverted these
changes back to ISPN 6 and older.
The entrySet, keySet and values as of ISPN 7.0 are actually completely
backing collections and methods are evaluated per invocation. This means
any updates to them or the cache it was created from are seen by each other.
And the iterators are already AutoCloseable, I know. But with the
streams API we can hide resource management from the user, so I was
hoping we could avoid using AutoCloseable altogether.
>
>
> My suggestion, then, is to make entrySet().iterator() and
> entrySet().spliterator() eager, so that they don't need to implement
> AutoCloseable. I would even go as far as to say that entrySet() should
> be eager itself, but maybe keySet().stream() would make a better API
> than adding a new keysStream() method.
Just so I understand you are more saying that we leave the entrySet, keySet
and values the way they are so they are backing collections, however
invocation of the iterator or spliterator would pull in all entries from the
entire cache into memory at once? It seems throwing
Yes
UnsupportedOperationException with a message stating to use
stream().iterator() and closing the stream would be better imo (however that
would preclude the usage of foreach). Note the foreach loop is only an
issue when iterating over that collection and you break out of the loop
early.
try (Stream<Map.Entry<K, V> stream = entrySet.stream()) {
Iterator<Map.Entry<K, V>> iterator = stream.iterator();
}
Actually I think the issue here is that our CacheCollections don't currently
implement CloseableIterable like the EntryIterable does. In that case you
can do a simple foreach loop with a break in a try with resource. We could
then document that close closes any iterators or spliterators that were
created from this instance of the collection.
It is a little awkward, but could work this way.
try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
for (Map.Entry<K, V> entry : closeableEntrySet) {
}
}
On the other hand, you wouldn't need any try-with-resources if you
used the forEach method, because the resources are both acquired and
released in the forEach call:
entrySet.stream().forEach((k, v) -> { ... })
But if you make stream() or entrySet() return an AutoCloseable, then
users will put a try-with-resources without it anyway, just in case.
So I'd rather keep the iterator AutoCloseable than the stream/entry
set.
Making the EntryIterable implement AutoCloseable made sense in Java 7,
because the only thing you could do with it was iterate on it, maybe
with for-each. But in Java 8 Iterable also has a forEach method, and I
wouldn't want users of forEach() to think about whether they need a
try block or not.
>
>
>
> Now to your questions:
>
> 1)
> forEach() doesn't allow the consumer to modify the entries, so I think
> the most common use case would be doing something with a captured
> variable (or System.out).
This is actually something I didn't cover in the document. But upon
thinking about this more I was thinking we probably want to allow for CDI
Injection of the cache for the consumer action before firing. In this way
the user can change values as they want. This would behave almost
identically to map/reduce, however it is much more understandable imo.
You mean like map/reduce when it is configured to store its results in
an intermediate cache? I think you'd also need a
CacheStream.reduceOnOwner() operation to put in the pipeline before
forEach(), but yeah, it sounds like it should work.
>
> So I would make forEach execute the consumer
> on the originator, and maybe add a distributedForEach method that
> executes its consumer on each owner (accepting that the consumer may
> be executed twice for some keys, or never, if the originator crashes).
> distributedForEach probably makes more sense once you start injecting
> the Cache (or other components) in the remote Consumers.
This was my conundrum before, however I believe I found a happy medium. I
figured if we implement it distributed gives more flexibility. The user can
still choose to run it locally as they desire.
For example you can call *.stream().iterator().forEachRemaining(consumer) if
you wanted to do a forEach locally in a single thread. And if you wanted it
parallelized you can do
StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)
Except now you have to access the spliterator directly... Will this
work, or will the user need try-with-resources? Do we want users to
think about it?
StreamSupport.stream(*.stream().spliterator(), true).limit(10).forEach(consumer)
This would all be documented on the forEach method.
>
>
> peek()'s intended use case is probably logging progress, so it will
> definitely need to interact with an external component. However,
> executing it to the originator would potentially change the execution
> of the stream dramatically, and adding logging shouldn't have that
> kind of impact. So peek() should be executed on the remote nodes, even
> if we don't have remote injection yet.
This is how I ended up doing it was to have it done remotely.
>
>
> 2)
> I would say implement sorting on the originator from the beginning,
> and limit() and skip() as well. It's true that users may me
> disappointed to see adding limit() doesn't improve the performance of
> their sorted() execution, but I would rather have a complete API
> available for applications who don't need to sort the entire cache.
This is how I did this as well :) Basically if we find that there is a
sorted, distributed, limit or skip it performs all of the intermediate
operations up that point then uses an iterator to bring the results back
locally where it can be performed. Limit and distinct are also actually
performed remotely first to reduce how many results are returned. I am not
100% sold on performing distinct remotely first as it could actually be
significantly slower, but it should hopefully reduce some memory usage :P
Shouldn't running distinct remotely actually require *more* memory,
because now you have to keep track of unique results on each node?
There are definitely scenarios where running it remotely will save a
lot in network traffic, though.
>
>
> Cheers
> Dan
>
>
> [1]
>
https://github.com/infinispan/infinispan/pull/3571#discussion-diff-340333...
>
> On Wed, May 27, 2015 at 9:52 PM, William Burns <mudokonman(a)gmail.com>
> wrote:
> > Hello everyone,
> >
> > I wanted to let you know I wrote up a design documenting the successor
> > to
> > EntryRetriever, Distributed Streams [1] !
> >
> > Any comments or feedback would be much appreciated.
> >
> > I especially would like targeted feedback regarding:
> >
> > 1. The operators forEach and peek may want to be ran locally. Should we
> > have an overridden method so users can pick which they want? Personally
> > I
> > feel that peek is too specific to matter and forEach can always be done
> > by
> > the caller locally if desired.
> > 2. The intermediate operators limit and skip do not seem worth
> > implementing
> > unless we have sorting support. (Sorting support will come later). I am
> > thinking to not support these until sorting is added.
> >
> > Thanks,
> >
> > - Will
> >
> > [1]
> >
https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev(a)lists.jboss.org
> >
https://lists.jboss.org/mailman/listinfo/infinispan-dev
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev(a)lists.jboss.org
>
https://lists.jboss.org/mailman/listinfo/infinispan-dev
_______________________________________________
infinispan-dev mailing list
infinispan-dev(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/infinispan-dev