[infinispan-dev] Distributed Streams

William Burns mudokonman at gmail.com
Thu Jul 9 08:49:28 EDT 2015


On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei at gmail.com> wrote:

> Hi Will
>
> After the discussion we started in Galder's PR's comments [1], I
> started thinking that we should really have a stream() method directly
> in the Cache/AdvancedCache interface.
>
> I feel entrySet(), keySet() or values() encourage users to use
> external iteration with a for-each loop or iterator(), and we should
> encourage users to use the Stream methods instead. I also really don't
> like the idea of users having to close their iterators explicitly, and
> lazy Iterators (or Spliterators) need to be properly closed or they
> will leak resources.
>

The iterator and spliterator are closed automatically once they have been
fully iterated.

I don't think pulling all entries from the cluster (and loader) into memory
for entrySet, keySet or values is a good idea.  Unless you are suggesting
that we pull local entries only?  In which case we would be reverting these
changes back to the behavior of ISPN 6 and older.

As of ISPN 7.0, entrySet, keySet and values are fully backing collections,
and their methods are evaluated per invocation.  This means any update to a
collection or to the cache it was created from is visible to the other.
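
As an illustration of what "backing collection" means, here is a minimal
sketch using a plain java.util.HashMap rather than an Infinispan cache (the
class and method names are made up for the example).  The key set is a view,
so writes made through the map or through the view are seen by both.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustration only: HashMap stands in for the cache.
class BackingViewSketch {
    // The view is obtained before the second put, yet it reports both keys.
    static int viewSizeAfterPut() {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        Set<String> keys = map.keySet();   // a view, not a copy
        map.put("b", 2);                   // write through the map
        return keys.size();
    }

    // Removing a key through the view removes the entry from the map.
    static boolean mapSeesViewRemoval() {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        Set<String> keys = map.keySet();
        keys.remove("a");                  // write through the view
        return map.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(viewSizeAfterPut());
        System.out.println(mapSeesViewRemoval());
    }
}
```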


>
> My suggestion, then, is to make entrySet().iterator() and
> entrySet().spliterator() eager, so that they don't need to implement
> AutoCloseable. I would even go as far as to say that entrySet() should
> be eager itself, but maybe keySet().stream() would make a better API
> than adding a new keysStream() method.
>

Just so I understand: you are saying that we leave entrySet, keySet and
values the way they are, as backing collections, but invoking iterator or
spliterator would pull all entries from the entire cache into memory at
once?  I think throwing UnsupportedOperationException with a message saying
to use stream().iterator() and close the stream would be better (however
that would preclude the usage of foreach).  Note the foreach loop is only
an issue when you iterate over the collection and break out of the loop
early.

try (Stream<Map.Entry<K, V>> stream = entrySet.stream()) {
   Iterator<Map.Entry<K, V>> iterator = stream.iterator();
}
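
To show why the try-with-resources matters, here is a small sketch on a
plain JDK stream (no Infinispan types; the class name is made up).  The
onClose handler stands in for releasing whatever remote resources a cache
stream would hold, and it still runs when iteration stops early.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Illustration only: a list-backed stream stands in for a cache stream.
class StreamCloseSketch {
    // Reads one element, leaves the try block, and reports whether the
    // close handler ran.
    static boolean closedAfterEarlyExit() {
        AtomicBoolean closed = new AtomicBoolean(false);
        List<String> data = Arrays.asList("a", "b", "c");
        try (Stream<String> stream = data.stream().onClose(() -> closed.set(true))) {
            Iterator<String> iterator = stream.iterator();
            if (iterator.hasNext()) {
                iterator.next();   // stop after the first element
            }
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(closedAfterEarlyExit());
    }
}
```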

Actually I think the issue here is that our CacheCollections don't
currently implement CloseableIterable like the EntryIterable does.  If they
did, you could do a simple foreach loop with a break inside a
try-with-resources.  We could then document that close closes any iterators
or spliterators that were created from this instance of the collection.

It is a little awkward, but could work this way.

try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
   for (Map.Entry<K, V> entry : closeableEntrySet) {
   }
}
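
Here is a rough sketch of that contract, assuming a hypothetical
TrackedIterable type (not an Infinispan class) backed by a plain list.  An
iterator releases itself when fully iterated, and close() on the collection
releases any iterator that was abandoned by a break.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a closeable cache collection.
class TrackedIterable<T> implements Iterable<T>, AutoCloseable {
    private final List<T> source;
    private final List<TrackedIterator> created = new ArrayList<>();
    private int openIterators = 0;

    TrackedIterable(List<T> source) { this.source = source; }

    int openIterators() { return openIterators; }

    @Override
    public Iterator<T> iterator() {
        TrackedIterator it = new TrackedIterator(source.iterator());
        created.add(it);
        openIterators++;
        return it;
    }

    @Override
    public void close() {
        // Release every iterator created from this instance that is still open.
        for (TrackedIterator it : created) {
            it.release();
        }
        created.clear();
    }

    private final class TrackedIterator implements Iterator<T> {
        private final Iterator<T> delegate;
        private boolean open = true;

        TrackedIterator(Iterator<T> delegate) { this.delegate = delegate; }

        void release() {
            if (open) {
                open = false;
                openIterators--;
            }
        }

        @Override
        public boolean hasNext() {
            boolean more = delegate.hasNext();
            if (!more) {
                release();   // fully iterated: released automatically
            }
            return more;
        }

        @Override
        public T next() { return delegate.next(); }
    }

    // Returns open-iterator counts: after a full loop, after a break
    // (still inside the try block), and after close().
    static int[] demo() {
        TrackedIterable<String> full = new TrackedIterable<>(Arrays.asList("a", "b"));
        for (String s : full) { /* visit every element */ }
        int afterFull = full.openIterators();

        TrackedIterable<String> partial = new TrackedIterable<>(Arrays.asList("a", "b"));
        int afterBreak = -1;
        try (TrackedIterable<String> closeable = partial) {
            for (String s : closeable) {
                break;       // abandon the iterator early
            }
            afterBreak = closeable.openIterators();
        }
        int afterClose = partial.openIterators();
        return new int[] {afterFull, afterBreak, afterClose};
    }
}
```

A real implementation would also have to track spliterators and deal with
concurrent access, which this sketch ignores.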


>
>
> Now to your questions:
>
> 1)
> forEach() doesn't allow the consumer to modify the entries, so I think
> the most common use case would be doing something with a captured
> variable (or System.out).


This is actually something I didn't cover in the document.  After thinking
about it more, we probably want to allow CDI injection of the cache into
the consumer action before it fires.  That way the user can change values
as they want.  This would behave almost identically to map/reduce, however
it is much more understandable imo.


> So I would make forEach execute the consumer
> on the originator, and maybe add a distributedForEach method that
> executes its consumer on each owner (accepting that the consumer may
> be executed twice for some keys, or never, if the originator crashes).
> distributedForEach probably makes more sense once you start injecting
> the Cache (or other components) in the remote Consumers.
>

This was my conundrum before, however I believe I found a happy medium.  I
figured implementing it as distributed gives more flexibility, since the
user can still choose to run it locally if they desire.

For example you can call *.stream().iterator().forEachRemaining(consumer)
if you wanted to do a forEach locally in a single thread.  And if you
wanted it parallelized you can do
StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)

This would all be documented on the forEach method.
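
For reference, the two local variants look like this on a plain JDK
collection (a list stands in for the cache collection; the class and method
names are made up).  Both run the consumer in the calling JVM, and only the
second one may use several threads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// Illustration only: a list stands in for entrySet/keySet/values.
class LocalForEachSketch {
    // Single-threaded: pull elements through the iterator on the caller.
    static int sumSequential(List<Integer> data) {
        AtomicInteger sum = new AtomicInteger();
        Consumer<Integer> consumer = v -> sum.addAndGet(v);
        data.stream().iterator().forEachRemaining(consumer);
        return sum.get();
    }

    // Parallel: wrap the spliterator in a new parallel stream on the caller.
    // The consumer must be thread-safe, hence the AtomicInteger.
    static int sumParallel(List<Integer> data) {
        AtomicInteger sum = new AtomicInteger();
        Consumer<Integer> consumer = v -> sum.addAndGet(v);
        StreamSupport.stream(data.stream().spliterator(), true).forEach(consumer);
        return sum.get();
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        System.out.println(sumSequential(data));
        System.out.println(sumParallel(data));
    }
}
```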


>
> peek()'s intended use case is probably logging progress, so it will
> definitely need to interact with an external component. However,
> executing it to the originator would potentially change the execution
> of the stream dramatically, and adding logging shouldn't have that
> kind of impact. So peek() should be executed on the remote nodes, even
> if we don't have remote injection yet.
>

This is how I ended up doing it: peek is executed remotely.


>
> 2)
> I would say implement sorting on the originator from the beginning,
> and limit() and skip() as well. It's true that users may be
> disappointed to see adding limit() doesn't improve the performance of
> their sorted() execution, but I would rather have a complete API
> available for applications who don't need to sort the entire cache.
>

This is how I did this as well :)  Basically if we find that there is a
sorted, distinct, limit or skip, it performs all of the intermediate
operations up to that point and then uses an iterator to bring the results
back locally, where the remaining operations can be performed.  Limit and
distinct are also actually performed remotely first to reduce how many
results are returned.  I am not 100% sold on performing distinct remotely
first as it could actually be significantly slower, but it should hopefully
reduce some memory usage :P
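
A simplified sketch of the idea, with each inner list standing in for the
entries owned by one node (this is not the actual implementation, and the
names are made up).  distinct and limit are applied per node to trim what is
shipped, then applied again on the merged results; sorted cannot be trimmed
that way, so everything is brought back first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustration only: lists simulate per-node data, no networking involved.
class RemoteThenLocalSketch {
    static List<Integer> distinctLimit(List<List<Integer>> nodes, int limit) {
        List<Integer> shipped = new ArrayList<>();
        for (List<Integer> node : nodes) {
            // "Remote" pass: trim each node's results before sending them back.
            shipped.addAll(node.stream().distinct().limit(limit)
                    .collect(Collectors.toList()));
        }
        // "Originator" pass: repeat the operations over the merged results.
        return shipped.stream().distinct().limit(limit)
                .collect(Collectors.toList());
    }

    static List<Integer> sortedLimit(List<List<Integer>> nodes, int limit) {
        List<Integer> shipped = new ArrayList<>();
        for (List<Integer> node : nodes) {
            shipped.addAll(node);   // every result comes back to the originator
        }
        // sorted() and the limit() after it both run on the originator.
        return shipped.stream().sorted().limit(limit)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> nodes =
                Arrays.asList(Arrays.asList(1, 1, 2), Arrays.asList(2, 3, 3));
        System.out.println(distinctLimit(nodes, 2));
        System.out.println(sortedLimit(nodes, 3));
    }
}
```

In the sorted case the limit only shortens the final list; all entries are
still transferred and sorted.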


>
> Cheers
> Dan
>
>
> [1]
> https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22
>
> On Wed, May 27, 2015 at 9:52 PM, William Burns <mudokonman at gmail.com>
> wrote:
> > Hello everyone,
> >
> > I wanted to let you know I wrote up a design documenting the successor to
> > EntryRetriever, Distributed Streams [1] !
> >
> > Any comments or feedback would be much appreciated.
> >
> > I especially would like targeted feedback regarding:
> >
> > 1. The operators forEach and peek may want to be run locally.  Should we
> > have an overridden method so users can pick which they want?  Personally I
> > feel that peek is too specific to matter and forEach can always be done by
> > the caller locally if desired.
> > 2. The intermediate operators limit and skip do not seem worth implementing
> > unless we have sorting support. (Sorting support will come later).  I am
> > thinking to not support these until sorting is added.
> >
> > Thanks,
> >
> >  - Will
> >
> > [1] https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
> >
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev