[infinispan-dev] Distributed Streams
Radim Vansa
rvansa at redhat.com
Thu Jul 9 11:40:37 EDT 2015
On 07/09/2015 05:33 PM, Radim Vansa wrote:
> On 07/09/2015 02:49 PM, William Burns wrote:
>>
>> On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei at gmail.com> wrote:
>>
>> Hi Will
>>
>> After the discussion we started in Galder's PR's comments [1], I
>> started thinking that we should really have a stream() method directly
>> in the Cache/AdvancedCache interface.
>>
>> I feel entrySet(), keySet() or values() encourage users to use
>> external iteration with a for-each loop or iterator(), and we should
>> encourage users to use the Stream methods instead. I also really don't
>> like the idea of users having to close their iterators explicitly, and
>> lazy Iterators (or Spliterators) need to be properly closed or they
>> will leak resources.
>>
>>
>> The iterator and spliterator are automatically closed if they are
>> fully iterated.
>>
>> I don't think pulling all entries from the cluster (and loader) in for
>> entrySet, keySet or values is a good idea. Unless you are suggesting
>> that we only pull local entries? In that case we have reverted
>> these changes back to ISPN 6 and older.
>>
>> The entrySet, keySet and values as of ISPN 7.0 are actually completely
>> backing collections, and their methods are evaluated per invocation. This
>> means any updates to them or to the cache they were created from are
>> visible to each other.
>>
>>
>> My suggestion, then, is to make entrySet().iterator() and
>> entrySet().spliterator() eager, so that they don't need to implement
>> AutoCloseable. I would even go as far as to say that entrySet() should
>> be eager itself, but maybe keySet().stream() would make a better API
>> than adding a new keysStream() method.
>>
>>
>> Just so I understand: you are saying that we leave entrySet,
>> keySet and values the way they are, as backing collections,
>> but invoking the iterator or spliterator would pull all
>> entries from the entire cache into memory at once? It seems throwing
>> UnsupportedOperationException with a message stating to use
>> stream().iterator() and close the stream would be better imo
>> (however that would preclude the usage of foreach). Note the foreach
>> loop is only an issue when iterating over that collection and you
>> break out of the loop early.
>>
>> try (Stream<Map.Entry<K, V>> stream = entrySet.stream()) {
>>     Iterator<Map.Entry<K, V>> iterator = stream.iterator();
>> }
>>
>> Actually I think the issue here is that our CacheCollections don't
>> currently implement CloseableIterable like the EntryIterable does. In
>> that case you can do a simple foreach loop with a break in a try with
>> resource. We could then document that close closes any iterators or
>> spliterators that were created from this instance of the collection.
>>
>> It is a little awkward, but could work this way.
>>
>> try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
>>     for (Map.Entry<K, V> entry : closeableEntrySet) {
>>     }
>> }
> What resources is the iterator actually holding? If that is just memory,
> could we do some autoclose magic with phantom references?
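The "autoclose magic with phantom references" idea can be sketched with Java's Cleaner (itself built on phantom references). This is a minimal, hypothetical stand-in, not Infinispan code: `TrackedIterator` and its release flag are invented for illustration, and the demo triggers cleanup explicitly via close() rather than waiting for GC.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;

public class PhantomClose {
    private static final Cleaner CLEANER = Cleaner.create();

    // Hypothetical stand-in for a remote iteration holding resources.
    static class TrackedIterator implements AutoCloseable {
        private final Cleaner.Cleanable cleanable;

        TrackedIterator(AtomicBoolean released) {
            // The cleanup action must not capture `this`, or the iterator
            // would never become phantom-reachable. If the user forgets
            // close(), the cleaner releases the resources once the iterator
            // is only phantom-reachable.
            this.cleanable = CLEANER.register(this, () -> released.set(true));
        }

        @Override
        public void close() {
            cleanable.clean(); // runs the cleanup action at most once
        }
    }

    public static void main(String[] args) {
        AtomicBoolean released = new AtomicBoolean(false);
        TrackedIterator it = new TrackedIterator(released);
        it.close(); // explicit close; GC-triggered cleanup is only the fallback
        System.out.println(released.get());
    }
}
```

The usual caveat applies: phantom-based cleanup is a safety net, not a substitute for deterministic close, since GC timing is unpredictable.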
>
>>
>> Now to your questions:
>>
>> 1)
>> forEach() doesn't allow the consumer to modify the entries, so I think
>> the most common use case would be doing something with a captured
>> variable (or System.out).
>>
>>
>> This is actually something I didn't cover in the document. Upon
>> further thought, we probably want to allow CDI injection of the
>> cache into the consumer action before firing. That way the user
>> can change values as they want. This would behave
>> almost identically to map/reduce, however it is much more
>> understandable imo.
>>
>> So I would make forEach execute the consumer
>> on the originator, and maybe add a distributedForEach method that
>> executes its consumer on each owner (accepting that the consumer may
>> be executed twice for some keys, or never, if the originator crashes).
>> distributedForEach probably makes more sense once you start injecting
>> the Cache (or other components) in the remote Consumers.
>>
>>
>> This was my conundrum before, however I believe I found a happy
>> medium. I figured implementing it distributed gives more
>> flexibility. The user can still choose to run it locally as they desire.
>>
>> For example you can call
>> *.stream().iterator().forEachRemaining(consumer) if you wanted to do a
>> forEach locally in a single thread. And if you wanted it parallelized
>> you can do
>> StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)
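The two local consumption styles mentioned above can be sketched with a plain java.util Map standing in for the cache (no cluster involved, and the counting consumer is invented for the demo):

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

public class ForEachStyles {
    public static void main(String[] args) {
        Map<String, Integer> cache = Map.of("a", 1, "b", 2, "c", 3);
        AtomicInteger seen = new AtomicInteger();
        Consumer<Map.Entry<String, Integer>> consumer = e -> seen.incrementAndGet();

        // Local, single-threaded: drain the stream's iterator on the caller.
        cache.entrySet().stream().iterator().forEachRemaining(consumer);

        // Local, parallel: wrap the stream's spliterator in a parallel stream.
        StreamSupport.stream(cache.entrySet().stream().spliterator(), true)
                     .forEach(consumer);

        // Each entry is visited once per pass, so the count is 2 * 3.
        System.out.println(seen.get());
    }
}
```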
> *.stream().distributedForEach(serializable consumer) looks much more
> obvious than the above, even though the above would be documented.
>
>
>> This would all be documented on the forEach method.
>>
>>
>> peek()'s intended use case is probably logging progress, so it will
>> definitely need to interact with an external component. However,
>> executing it on the originator would potentially change the execution
>> of the stream dramatically, and adding logging shouldn't have that
>> kind of impact. So peek() should be executed on the remote nodes, even
>> if we don't have remote injection yet.
>>
>>
>> This is how I ended up doing it: it is executed remotely.
>>
>>
>> 2)
>> I would say implement sorting on the originator from the beginning,
>> and limit() and skip() as well. It's true that users may be
>> disappointed to see that adding limit() doesn't improve the performance
>> of their sorted() execution, but I would rather have a complete API
>> available for applications that don't need to sort the entire cache.
>>
>>
>> This is how I did this as well :) Basically if we find that there is
>> a sorted, distinct, limit or skip, it performs all of the
>> intermediate operations up to that point, then uses an iterator to bring
>> the results back locally, where the rest can be performed. Limit and
>> distinct are also actually performed remotely first to reduce how many
>> results are returned. I am not 100% sold on performing distinct
>> remotely first as it could actually be significantly slower, but it
>> should hopefully reduce some memory usage :P
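Why distinct must still run on the originator even when performed remotely first can be shown with two hypothetical per-node result sets (the node lists are invented; a duplicate value can be returned by more than one node):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RemoteDistinct {
    public static void main(String[] args) {
        // Hypothetical values held by two different owners.
        List<Integer> node1 = List.of(1, 2, 2, 3);
        List<Integer> node2 = List.of(3, 3, 4, 5);

        // Each node applies distinct() before shipping results back,
        // shrinking what travels over the wire...
        List<Integer> fromNode1 = node1.stream().distinct().collect(Collectors.toList());
        List<Integer> fromNode2 = node2.stream().distinct().collect(Collectors.toList());

        // ...but the originator must apply distinct() again, because the
        // same value (3 here) can come back from more than one node.
        List<Integer> result = Stream.concat(fromNode1.stream(), fromNode2.stream())
                                     .distinct()
                                     .collect(Collectors.toList());
        System.out.println(result);
    }
}
```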
> Probably not in the first implementation, but sort should be performed
> on each node remotely, and then the local node should do just an n-way
> merge. That way, you can apply skip & limit on remote nodes and then
> again on the reduced merged set, dramatically reducing bandwidth. Not
> sure about NoSQL use cases, but in classical DBs the top-N query is a
> quite frequent operation, afaik.
Err, only limit can be applied remotely... For remote application of
skip, we would need multi-phase arbitration of the n-th element using
some pivot points... just limit, just limit :)
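The limit-remotely / skip-locally split can be sketched with two hypothetical sorted per-node result lists (the data and node count are invented). Each node can safely truncate to the first skip + limit elements, but skip itself must wait for the n-way merge on the originator, since no node knows how many smaller elements the other nodes hold:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class TopNMerge {
    public static void main(String[] args) {
        int skip = 2, limit = 3;
        // Hypothetical sorted results from two owners, each already
        // truncated remotely to limit(skip + limit).
        List<List<Integer>> perNode = List.of(
                List.of(1, 4, 7, 10, 13).subList(0, skip + limit),
                List.of(2, 5, 8, 11, 14).subList(0, skip + limit));

        // n-way merge on the originator: repeatedly take the smallest head.
        List<Iterator<Integer>> its = new ArrayList<>();
        perNode.forEach(l -> its.add(l.iterator()));
        // Heap entries: {value, index of the iterator it came from}.
        PriorityQueue<int[]> heap =
                new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
        for (int i = 0; i < its.size(); i++) {
            if (its.get(i).hasNext()) heap.add(new int[]{its.get(i).next(), i});
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty() && merged.size() < skip + limit) {
            int[] head = heap.poll();
            merged.add(head[0]);
            Iterator<Integer> it = its.get(head[1]);
            if (it.hasNext()) heap.add(new int[]{it.next(), head[1]});
        }

        // skip is applied only here, on the merged prefix.
        List<Integer> topN = merged.subList(skip, merged.size());
        System.out.println(topN);
    }
}
```

Globally the sorted stream starts 1, 2, 4, 5, 7, ..., so skip(2).limit(3) yields 4, 5, 7, which the merge of the truncated per-node lists reproduces exactly.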
Radim
>
> My 2c
>
> Radim
>
>> Cheers
>> Dan
>>
>>
>> [1]
>> https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
>>
>> On Wed, May 27, 2015 at 9:52 PM, William Burns
>> <mudokonman at gmail.com> wrote:
>> > Hello everyone,
>> >
>> > I wanted to let you know I wrote up a design documenting the
>> successor to
>> > EntryRetriever, Distributed Streams [1] !
>> >
>> > Any comments or feedback would be much appreciated.
>> >
>> > I especially would like targeted feedback regarding:
>> >
>> > 1. The operators forEach and peek may want to be run locally.
>> Should we
>> > have an overridden method so users can pick which they want?
>> Personally I
>> > feel that peek is too specific to matter and forEach can always
>> be done by
>> > the caller locally if desired.
>> > 2. The intermediate operators limit and skip do not seem worth
>> implementing
>> > unless we have sorting support. (Sorting support will come
>> later). I am
>> > thinking to not support these until sorting is added.
>> >
>> > Thanks,
>> >
>> > - Will
>> >
>> > [1]
>> https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
>> >
>> > _______________________________________________
>> > infinispan-dev mailing list
>> > infinispan-dev at lists.jboss.org
>> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
--
Radim Vansa <rvansa at redhat.com>
JBoss Performance Team