[infinispan-dev] Distributed Streams

Radim Vansa rvansa at redhat.com
Thu Jul 9 11:40:37 EDT 2015


On 07/09/2015 05:33 PM, Radim Vansa wrote:
> On 07/09/2015 02:49 PM, William Burns wrote:
>>
>> On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei at gmail.com
>> <mailto:dan.berindei at gmail.com>> wrote:
>>
>>      Hi Will
>>
>>      After the discussion we started in Galder's PR's comments [1], I
>>      started thinking that we should really have a stream() method directly
>>      in the Cache/AdvancedCache interface.
>>
>>      I feel entrySet(), keySet() or values() encourage users to use
>>      external iteration with a for-each loop or iterator(), and we should
>>      encourage users to use the Stream methods instead. I also really don't
>>      like the idea of users having to close their iterators explicitly, and
>>      lazy Iterators (or Spliterators) need to be properly closed or they
>>      will leak resources.
>>
>>
>> The iterator and spliterator are automatically closed if it was fully
>> iterated upon.
>>
>> I don't think pulling all entries from the cluster (and loader) in for
>> entrySet, keySet or values is a good idea. Unless you are suggesting
>> that we only pull local entries only?  In which case we have reverted
>> these changes back to ISPN 6 and older.
>>
>> The entrySet, keySet and values as of ISPN 7.0 are actually completely
>> backing collections and methods are evaluated per invocation.  This
>> means any updates to them or the cache it was created from are seen by
>> each other.
>>
>>
>>      My suggestion, then, is to make entrySet().iterator() and
>>      entrySet().spliterator() eager, so that they don't need to implement
>>      AutoCloseable. I would even go as far as to say that entrySet() should
>>      be eager itself, but maybe keySet().stream() would make a better API
>>      than adding a new keysStream() method.
>>
>>
>> Just so I understand you are more saying that we leave the entrySet,
>> keySet and values the way they are so they are backing collections,
>> however invocation of the iterator or spliterator would pull in all
>> entries from the entire cache into memory at once?  It seems throwing
>> UnsupportedOperationException with a message stating to use
>> stream().iterator() and closing the stream would be better imo
>> (however that would preclude the usage of foreach). Note the foreach
>> loop is only an issue when iterating over that collection and you
>> break out of the loop early.
>>
>> try (Stream<Map.Entry<K, V> stream = entrySet.stream()) {
>>     Iterator<Map.Entry<K, V>> iterator = stream.iterator();
>> }
>>
>> Actually I think the issue here is that our CacheCollections don't
>> currently implement CloseableIterable like the EntryIterable does.  In
>> that case you can do a simple foreach loop with a break in a try with
>> resource.  We could then document that close closes any iterators or
>> spliterators that were created from this instance of the collection.
>>
>> It is a little awkward, but could work this way.
>>
>> try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
>>     for (Map.Entry<K, V> entry : closeableEntrySet) {
>>     }
>> }
> What resources is the iterator actually holding? If that is just memory,
> could we do some autoclose magic with phantom references?
>
>>
>>      Now to your questions:
>>
>>      1)
>>      forEach() doesn't allow the consumer to modify the entries, so I think
>>      the most common use case would be doing something with a captured
>>      variable (or System.out).
>>
>>
>> This is actually something I didn't cover in the document.  But upon
>> thinking about this more I was thinking we probably want to allow for
>> CDI Injection of the cache for the consumer action before firing.  In
>> this way the user can change values as they want.  This would behave
>> almost identically to map/reduce, however it is much more
>> understandable imo.
>>
>>      So I would make forEach execute the consumer
>>      on the originator, and maybe add a distributedForEach method that
>>      executes its consumer on each owner (accepting that the consumer may
>>      be executed twice for some keys, or never, if the originator crashes).
>>      distributedForEach probably makes more sense once you start injecting
>>      the Cache (or other components) in the remote Consumers.
>>
>>
>> This was my conundrum before, however I believe I found a happy
>> medium.  I figured if we implement it distributed gives more
>> flexibility.  The user can still choose to run it locally as they desire.
>>
>> For example you can call
>> *.stream().iterator().forEachRemaining(consumer) if you wanted to do a
>> forEach locally in a single thread.  And if you wanted it parallelized
>> you can do
>> StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)
> *.stream().distributedForEach(serializable consumer) looks much more
> obvious than ^. Despite that it would be documented.
>
>
>> This would all be documented on the forEach method.
>>
>>
>>      peek()'s intended use case is probably logging progress, so it will
>>      definitely need to interact with an external component. However,
>>      executing it to the originator would potentially change the execution
>>      of the stream dramatically, and adding logging shouldn't have that
>>      kind of impact. So peek() should be executed on the remote nodes, even
>>      if we don't have remote injection yet.
>>
>>
>> This is how I ended up doing it was to have it done remotely.
>>
>>
>>      2)
>>      I would say implement sorting on the originator from the beginning,
>>      and limit() and skip() as well. It's true that users may me
>>      disappointed to see adding limit() doesn't improve the performance of
>>      their sorted() execution, but I would rather have a complete API
>>      available for applications who don't need to sort the entire cache.
>>
>>
>> This is how I did this as well :)  Basically if we find that there is
>> a sorted, distributed, limit or skip it performs all of the
>> intermediate operations up that point then uses an iterator to bring
>> the results back locally where it can be performed.  Limit and
>> distinct are also actually performed remotely first to reduce how many
>> results are returned.  I am not 100% sold on performing distinct
>> remotely first as it could actually be significantly slower, but it
>> should hopefully reduce some memory usage :P
> Probably not in the first implementation, but sort should be performed
> on each node remotely, and then the local node should do just n-way
> merge. That way, you can apply skip & limit on remote nodes and then
> again on the reduced merged set, dramatically reducing bandwith. Not
> sure about NoSQL use cases, but in classical DBs the top-N query is
> quite frequent operation, afaik.

Err, only limit can be applied remotely... For remote application of 
skip, we would need multi-phase arbitration of the n-th element using 
some pivot points... just limit, just limit :)

Radim

>
> My 2c
>
> Radim
>
>>      Cheers
>>      Dan
>>
>>
>>      [1]
>>      https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
>>
>>      On Wed, May 27, 2015 at 9:52 PM, William Burns
>>      <mudokonman at gmail.com <mailto:mudokonman at gmail.com>> wrote:
>>      > Hello everyone,
>>      >
>>      > I wanted to let you know I wrote up a design documenting the
>>      successor to
>>      > EntryRetriever, Distributed Streams [1] !
>>      >
>>      > Any comments or feedback would be much appreciated.
>>      >
>>      > I especially would like targeted feedback regarding:
>>      >
>>      > 1. The operators forEach and peek may want to be ran locally.
>>      Should we
>>      > have an overridden method so users can pick which they want?
>>      Personally I
>>      > feel that peek is too specific to matter and forEach can always
>>      be done by
>>      > the caller locally if desired.
>>      > 2. The intermediate operators limit and skip do not seem worth
>>      implementing
>>      > unless we have sorting support. (Sorting support will come
>>      later).  I am
>>      > thinking to not support these until sorting is added.
>>      >
>>      > Thanks,
>>      >
>>      >  - Will
>>      >
>>      > [1]
>>      https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
>>      >
>>      > _______________________________________________
>>      > infinispan-dev mailing list
>>      > infinispan-dev at lists.jboss.org
>>      <mailto:infinispan-dev at lists.jboss.org>
>>      > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>      _______________________________________________
>>      infinispan-dev mailing list
>>      infinispan-dev at lists.jboss.org <mailto:infinispan-dev at lists.jboss.org>
>>      https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>
>>
>>
>> _______________________________________________
>> infinispan-dev mailing list
>> infinispan-dev at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>


-- 
Radim Vansa <rvansa at redhat.com>
JBoss Performance Team



More information about the infinispan-dev mailing list