[infinispan-dev] Distributed Streams

Dan Berindei dan.berindei at gmail.com
Thu Jul 9 12:27:56 EDT 2015


On Thu, Jul 9, 2015 at 3:49 PM, William Burns <mudokonman at gmail.com> wrote:
>
>
> On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei at gmail.com> wrote:
>>
>> Hi Will
>>
>> After the discussion we started in Galder's PR's comments [1], I
>> started thinking that we should really have a stream() method directly
>> in the Cache/AdvancedCache interface.
>>
>> I feel entrySet(), keySet() or values() encourage users to use
>> external iteration with a for-each loop or iterator(), and we should
>> encourage users to use the Stream methods instead. I also really don't
>> like the idea of users having to close their iterators explicitly, and
>> lazy Iterators (or Spliterators) need to be properly closed or they
>> will leak resources.
>
>
> The iterator and spliterator are automatically closed once they have been
> fully iterated.
>
> I don't think pulling all entries from the cluster (and loader) in for
> entrySet, keySet or values is a good idea.  Unless you are suggesting that
> we pull local entries only?  In that case we would be reverting these
> changes back to the behavior of ISPN 6 and older.
>
> The entrySet, keySet and values as of ISPN 7.0 are fully backing
> collections, and their methods are evaluated per invocation.  This means
> any update to a collection or to the cache it was created from is visible
> in the other.
>
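
For readers less familiar with backing collections, the same read-through and write-through behavior can be seen with a plain java.util.HashMap. This is only an analogy using the JDK; BackingViewDemo is a made-up name, and nothing here is Infinispan code or a claim about how its collections are implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Analogy only: java.util.HashMap's keySet() is a backing view of the map.
class BackingViewDemo {
    static Map<String, Integer> sample() {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        return map;
    }

    public static void main(String[] args) {
        Map<String, Integer> map = sample();
        Set<String> keys = map.keySet();            // a view, not a copy

        map.put("c", 3);                            // write through the map...
        System.out.println(keys.contains("c"));     // ...is visible in the view: true

        keys.remove("a");                           // write through the view...
        System.out.println(map.containsKey("a"));   // ...is visible in the map: false
    }
}
```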

And the iterators are already AutoCloseable, I know. But with the
streams API we can hide resource management from the user, so I was
hoping we could avoid using AutoCloseable altogether.

>>
>>
>> My suggestion, then, is to make entrySet().iterator() and
>> entrySet().spliterator() eager, so that they don't need to implement
>> AutoCloseable. I would even go as far as to say that entrySet() should
>> be eager itself, but maybe keySet().stream() would make a better API
>> than adding a new keysStream() method.
>
>
> Just so I understand: you are saying that we leave the entrySet, keySet
> and values the way they are so they are backing collections, however
> invocation of the iterator or spliterator would pull in all entries from the
> entire cache into memory at once?  It seems throwing

Yes

> UnsupportedOperationException with a message stating to use
> stream().iterator() and closing the stream would be better imo (however that
> would preclude the usage of foreach).  Note the foreach loop is only an
> issue when iterating over that collection and you break out of the loop
> early.
>
> try (Stream<Map.Entry<K, V>> stream = entrySet.stream()) {
>    Iterator<Map.Entry<K, V>> iterator = stream.iterator();
> }
>
> Actually I think the issue here is that our CacheCollections don't currently
> implement CloseableIterable like the EntryIterable does.  In that case you
> can do a simple foreach loop with a break in a try with resource.  We could
> then document that close closes any iterators or spliterators that were
> created from this instance of the collection.
>
> It is a little awkward, but could work this way.
>
> try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
>    for (Map.Entry<K, V> entry : closeableEntrySet) {
>    }
> }

On the other hand, you wouldn't need any try-with-resources if you
used the forEach method, because the resources are both acquired and
released in the forEach call:

entrySet.stream().forEach(entry -> { ... })

But if you make stream() or entrySet() return an AutoCloseable, then
users will put a try-with-resources around it anyway, just in case.
So I'd rather keep the iterator AutoCloseable than the stream/entry
set.

Making the EntryIterable implement AutoCloseable made sense in Java 7,
because the only thing you could do with it was iterate on it, maybe
with for-each. But in Java 8 Iterable also has a forEach method, and I
wouldn't want users of forEach() to think about whether they need a
try block or not.
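
To make the difference concrete, here is a minimal sketch in plain Java. TrackedEntries is a hypothetical stand-in for a lazily loaded collection, not an Infinispan type, and the "resource" is just a token in a set. It assumes a lazy iterator that releases its resource only when drained, and a forEach that acquires and releases inside the call.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-in for a lazily loaded entry collection; NOT an Infinispan type.
class TrackedEntries implements Iterable<String>, AutoCloseable {
    private final List<String> data;
    // One token per pretend "remote resource" currently held by an iteration.
    private final Set<Object> open = new HashSet<>();

    TrackedEntries(List<String> data) { this.data = data; }

    // External iteration: the resource is released only once the iterator is drained.
    @Override
    public Iterator<String> iterator() {
        final Object token = new Object();
        open.add(token);
        final Iterator<String> it = data.iterator();
        return new Iterator<String>() {
            @Override public boolean hasNext() {
                boolean more = it.hasNext();
                if (!more) open.remove(token);
                return more;
            }
            @Override public String next() { return it.next(); }
        };
    }

    // Internal iteration: acquire and release within the same call.
    @Override
    public void forEach(Consumer<? super String> action) {
        final Object token = new Object();
        open.add(token);
        try {
            for (String s : data) action.accept(s);
        } finally {
            open.remove(token);
        }
    }

    // Releases whatever abandoned iterators left behind.
    @Override public void close() { open.clear(); }

    int openResources() { return open.size(); }
}
```

In this sketch a for-each loop that breaks early leaves openResources() at 1 until close() is called, while forEach() always leaves it at 0, so only the external-iteration path needs a try block.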

>
>>
>>
>>
>> Now to your questions:
>>
>> 1)
>> forEach() doesn't allow the consumer to modify the entries, so I think
>> the most common use case would be doing something with a captured
>> variable (or System.out).
>
>
> This is actually something I didn't cover in the document.  But after
> thinking about this more, I think we probably want to allow for CDI
> Injection of the cache for the consumer action before firing.  In this way
> the user can change values as they want.  This would behave almost
> identically to map/reduce, however it is much more understandable imo.
>

You mean like map/reduce when it is configured to store its results in
an intermediate cache? I think you'd also need a
CacheStream.reduceOnOwner() operation to put in the pipeline before
forEach(), but yeah, it sounds like it should work.

>>
>> So I would make forEach execute the consumer
>> on the originator, and maybe add a distributedForEach method that
>> executes its consumer on each owner (accepting that the consumer may
>> be executed twice for some keys, or never, if the originator crashes).
>> distributedForEach probably makes more sense once you start injecting
>> the Cache (or other components) in the remote Consumers.
>
>
> This was my conundrum before, however I believe I found a happy medium.  I
> figured that implementing it as distributed gives more flexibility.  The
> user can still choose to run it locally as they desire.
>
> For example you can call *.stream().iterator().forEachRemaining(consumer) if
> you wanted to do a forEach locally in a single thread.  And if you wanted it
> parallelized you can do
> StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)

Except now you have to access the spliterator directly... Will this
work, or will the user need try-with-resources? Do we want users to
think about it?

StreamSupport.stream(*.stream().spliterator(), true).limit(10).forEach(consumer)
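
As a data point for plain java.util.stream (not a statement about how the distributed stream will behave), a stream built with StreamSupport.stream() from another stream's spliterator does not inherit the source's close handlers. The sketch below uses only JDK calls; SpliteratorCloseDemo and its method names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Plain JDK behavior only; class and method names are illustrative.
class SpliteratorCloseDemo {
    // Closes only the wrapping stream: the source's onClose handler never runs.
    static boolean sourceClosedWhenOnlyWrapperIsClosed(List<Integer> data) {
        AtomicBoolean closed = new AtomicBoolean(false);
        Stream<Integer> source = data.stream().onClose(() -> closed.set(true));
        try (Stream<Integer> wrapped = StreamSupport.stream(source.spliterator(), true)) {
            wrapped.limit(2).forEach(i -> { });
        }
        return closed.get();
    }

    // Puts the source itself in try-with-resources: the handler runs on exit.
    static boolean sourceClosedWhenSourceIsInTryBlock(List<Integer> data) {
        AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<Integer> source = data.stream().onClose(() -> closed.set(true))) {
            StreamSupport.stream(source.spliterator(), true).limit(2).forEach(i -> { });
        }
        return closed.get();
    }
}
```

The first method returns false and the second returns true, so with standard streams the user would indeed have to keep a reference to the original stream and close it.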

>
> This would all be documented on the forEach method.
>
>>
>>
>> peek()'s intended use case is probably logging progress, so it will
>> definitely need to interact with an external component. However,
>> executing it on the originator would potentially change the execution
>> of the stream dramatically, and adding logging shouldn't have that
>> kind of impact. So peek() should be executed on the remote nodes, even
>> if we don't have remote injection yet.
>
>
> This is how I ended up doing it: peek() is executed remotely.
>
>>
>>
>> 2)
>> I would say implement sorting on the originator from the beginning,
>> and limit() and skip() as well. It's true that users may be
>> disappointed to see adding limit() doesn't improve the performance of
>> their sorted() execution, but I would rather have a complete API
>> available for applications that don't need to sort the entire cache.
>
>
> This is how I did this as well :)  Basically if we find that there is a
> sorted, distributed, limit or skip it performs all of the intermediate
> operations up to that point then uses an iterator to bring the results back
> locally where the rest can be performed.  Limit and distinct are also actually
> performed remotely first to reduce how many results are returned.  I am not
> 100% sold on performing distinct remotely first as it could actually be
> significantly slower, but it should hopefully reduce some memory usage :P
>
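
The remote-first limit described above can be pictured with plain lists standing in for nodes. This is only a sketch under that assumption: RemoteLimitSketch and its methods are invented names, and no Infinispan API is involved.

```java
import java.util.List;
import java.util.stream.Collectors;

// Each inner list stands in for the values held by one node (an assumption).
class RemoteLimitSketch {
    // Each node applies limit(n) locally, then the originator applies it again.
    static List<Integer> limitRemoteFirst(List<List<Integer>> nodes, long n) {
        return nodes.stream()
                .flatMap(node -> node.stream().limit(n)) // per-node limit
                .limit(n)                                // final limit on the originator
                .collect(Collectors.toList());
    }

    // Number of elements sent to the originator with the per-node limit.
    static long shippedWithRemoteLimit(List<List<Integer>> nodes, long n) {
        return nodes.stream().mapToLong(node -> Math.min(node.size(), n)).sum();
    }

    // Number of elements sent if every node returned everything.
    static long shippedWithoutRemoteLimit(List<List<Integer>> nodes) {
        return nodes.stream().mapToLong(List::size).sum();
    }
}
```

With nodes [1, 2, 3, 4], [5, 6, 7] and [8] and n = 2, the per-node limit ships 5 elements instead of 8, and the originator still has to trim the result to 2.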

Shouldn't running distinct remotely actually require *more* memory,
because now you have to keep track of unique results on each node?
There are definitely scenarios where running it remotely will save a
lot in network traffic, though.
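
A rough way to see both effects, again with plain lists standing in for nodes (an assumption, as are the names RemoteDistinctSketch and its methods; this is not Infinispan code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Each inner list stands in for the values produced on one node (an assumption).
class RemoteDistinctSketch {
    // Elements sent to the originator when nodes ship their values as-is.
    static int shippedWithoutRemoteDistinct(List<List<String>> nodes) {
        return nodes.stream().mapToInt(List::size).sum();
    }

    // Elements sent when each node deduplicates locally first.
    static int shippedWithRemoteDistinct(List<List<String>> nodes) {
        return nodes.stream().mapToInt(n -> new HashSet<>(n).size()).sum();
    }

    // Size of the largest set a node must hold to deduplicate locally.
    static int largestRemoteSet(List<List<String>> nodes) {
        return nodes.stream().mapToInt(n -> new HashSet<>(n).size()).max().orElse(0);
    }

    // The originator must still deduplicate across nodes in either case.
    static Set<String> distinctOnOriginator(List<List<String>> nodes) {
        return nodes.stream()
                .flatMap(n -> n.stream().distinct())
                .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

With nodes [x, x, x, y] and [y, y, z], remote distinct ships 4 elements instead of 7, but each node now holds a set of up to 2 values that it would not need otherwise, and the originator still tracks x, y and z.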

>>
>>
>> Cheers
>> Dan
>>
>>
>> [1]
>> https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
>>
>> On Wed, May 27, 2015 at 9:52 PM, William Burns <mudokonman at gmail.com>
>> wrote:
>> > Hello everyone,
>> >
>> > I wanted to let you know I wrote up a design documenting the successor to
>> > EntryRetriever, Distributed Streams [1] !
>> >
>> > Any comments or feedback would be much appreciated.
>> >
>> > I especially would like targeted feedback regarding:
>> >
>> > 1. The operators forEach and peek may need to be run locally.  Should we
>> > have an overridden method so users can pick which they want?  Personally I
>> > feel that peek is too specific to matter and forEach can always be done by
>> > the caller locally if desired.
>> > 2. The intermediate operators limit and skip do not seem worth implementing
>> > unless we have sorting support. (Sorting support will come later).  I am
>> > thinking to not support these until sorting is added.
>> >
>> > Thanks,
>> >
>> >  - Will
>> >
>> > [1]
>> > https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
>> >
>> > _______________________________________________
>> > infinispan-dev mailing list
>> > infinispan-dev at lists.jboss.org
>> > https://lists.jboss.org/mailman/listinfo/infinispan-dev

