[infinispan-dev] Distributed Streams

Dan Berindei dan.berindei at gmail.com
Thu Jul 9 05:10:59 EDT 2015


Hi Will

After the discussion we started in the comments on Galder's PR [1], I
started thinking that we should really have a stream() method directly
in the Cache/AdvancedCache interface.

I feel entrySet(), keySet() or values() encourage users to use
external iteration with a for-each loop or iterator(), and we should
encourage users to use the Stream methods instead. I also really don't
like the idea of users having to close their iterators explicitly, and
lazy Iterators (or Spliterators) need to be properly closed or they
will leak resources.
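To make the leak concrete, here is a minimal Java sketch. It is not Infinispan code: `LazyRemoteIterator` and `IterationStyles` are hypothetical names, and the static `openSessions` counter stands in for whatever remote resource a lazy iterator would hold. A plain loop over the iterator never calls `close()`, while a `Stream` built with `onClose()` releases the resource when it is used in try-with-resources.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical lazy iterator that holds a resource until close() is called.
class LazyRemoteIterator<T> implements Iterator<T>, AutoCloseable {
    static int openSessions = 0; // counts resources acquired but not yet released
    private final Iterator<T> source;
    private boolean closed = false;

    LazyRemoteIterator(List<T> data) {
        this.source = data.iterator();
        openSessions++; // acquiring the simulated remote resource
    }

    @Override public boolean hasNext() { return !closed && source.hasNext(); }

    @Override public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        return source.next();
    }

    @Override public void close() {
        if (!closed) { closed = true; openSessions--; }
    }
}

class IterationStyles {
    // External iteration: nothing forces the caller to close the iterator.
    static int sumExternal(List<Integer> data) {
        int sum = 0;
        LazyRemoteIterator<Integer> it = new LazyRemoteIterator<>(data);
        while (it.hasNext()) sum += it.next();
        return sum; // it.close() was never called, so the resource leaks
    }

    // Stream wrapper: onClose ties the resource lifetime to the stream.
    static Stream<Integer> stream(List<Integer> data) {
        LazyRemoteIterator<Integer> it = new LazyRemoteIterator<>(data);
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false)
            .onClose(it::close);
    }

    static int sumWithStream(List<Integer> data) {
        try (Stream<Integer> s = stream(data)) {
            return s.mapToInt(Integer::intValue).sum();
        } // try-with-resources calls close(), which runs the onClose handler
    }
}
```

After `sumExternal` the counter stays incremented, while `sumWithStream` acquires and then releases its resource.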

My suggestion, then, is to make entrySet().iterator() and
entrySet().spliterator() eager, so that they don't need to implement
AutoCloseable. I would even go as far as to say that entrySet() should
be eager itself, but maybe keySet().stream() would make a better API
than adding a new keysStream() method.
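A minimal sketch of the eager alternative, again with hypothetical names and a simplified model: each map stands in for the entries one node is primary owner of (so no key appears twice), and `iterator()` copies everything into a local snapshot before returning. Nothing stays open, so nothing needs closing. The trade-off is that the whole entry set must fit in memory on the originator, and writes made after the snapshot are not visible through it.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical: each map holds the entries a single node is primary owner of.
class EagerEntrySet<K, V> {
    private final List<Map<K, V>> nodes;

    EagerEntrySet(List<Map<K, V>> nodes) { this.nodes = nodes; }

    // Eager: all entries are copied before the iterator is handed out,
    // so no remote resource stays open and the iterator needs no close().
    Iterator<Map.Entry<K, V>> iterator() {
        List<Map.Entry<K, V>> snapshot = new ArrayList<>();
        for (Map<K, V> node : nodes) {
            for (Map.Entry<K, V> e : node.entrySet()) {
                snapshot.add(new AbstractMap.SimpleImmutableEntry<>(e));
            }
        }
        // Read-only view: removing from a snapshot would not affect the nodes.
        return Collections.unmodifiableList(snapshot).iterator();
    }
}
```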


Now to your questions:

1)
forEach() doesn't allow the consumer to modify the entries, so I think
the most common use case would be doing something with a captured
variable (or System.out). So I would make forEach execute the consumer
on the originator, and maybe add a distributedForEach method that
executes its consumer on each owner (accepting that the consumer may
be executed twice for some keys, or never, if the originator crashes).
distributedForEach probably makes more sense once you start injecting
the Cache (or other components) in the remote Consumers.
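The forEach/distributedForEach split could look roughly like this in-process simulation. `ForEachSketch`, `SerializableConsumer`, and the list-of-maps node model are all hypothetical; `ship()` serializes and deserializes the consumer with plain Java serialization to mimic sending it to another JVM. Because the consumer passed to `distributedForEach` runs on a copy, updates to captured variables never reach the originator, which is why it becomes more useful once remote consumers can be given the Cache or other components. The sketch does not model the failure cases (a key processed twice, or not at all).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical: a consumer that can be shipped to remote nodes.
interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

class ForEachSketch {
    // forEach: values are fetched to the originator and the consumer runs
    // there, so it can update captured local state.
    static <K, V> void forEach(List<Map<K, V>> nodes, Consumer<? super V> action) {
        List<V> gathered = new ArrayList<>();
        for (Map<K, V> node : nodes) gathered.addAll(node.values()); // simulated fetch
        gathered.forEach(action);
    }

    // distributedForEach: the consumer is serialized and run on each node,
    // so it only ever sees its own copy of any captured state.
    static <K, V> void distributedForEach(List<Map<K, V>> nodes, SerializableConsumer<V> action) {
        for (Map<K, V> node : nodes) {
            SerializableConsumer<V> remoteCopy = ship(action);
            node.values().forEach(remoteCopy);
        }
    }

    // Simulates sending an object over the wire with Java serialization.
    @SuppressWarnings("unchecked")
    static <T> T ship(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```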

peek()'s intended use case is probably logging progress, so it will
definitely need to interact with an external component. However,
executing it on the originator would potentially change the execution
of the stream dramatically, and adding logging shouldn't have that
kind of impact. So peek() should be executed on the remote nodes, even
if we don't have remote injection yet.
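The effect of peek() placement on the execution plan can be sketched by counting how many values reach the originator. In this hypothetical model (`PeekPlacement` is not an Infinispan class), each inner list is one node's data and `transferred` counts shipped values. With peek() kept on the nodes, the filter that follows it still runs remotely; forcing peek() onto the originator means everything upstream of it has to be shipped first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical model: each inner list holds the values stored on one node,
// and "transferred" counts values sent back to the originator.
class PeekPlacement {
    static int transferred = 0;

    // peek() runs on each node, so the filter still runs remotely and only
    // matching values cross the network.
    static List<Integer> peekOnNodes(List<List<Integer>> nodes,
                                     Consumer<Integer> peek, Predicate<Integer> filter) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> node : nodes) {
            List<Integer> matches = node.stream().peek(peek).filter(filter)
                    .collect(Collectors.toList());
            transferred += matches.size();
            result.addAll(matches);
        }
        return result;
    }

    // peek() forced onto the originator: every value upstream of it must be
    // shipped first, and the filter can only run after that.
    static List<Integer> peekOnOriginator(List<List<Integer>> nodes,
                                          Consumer<Integer> peek, Predicate<Integer> filter) {
        List<Integer> shipped = new ArrayList<>();
        for (List<Integer> node : nodes) {
            transferred += node.size();
            shipped.addAll(node);
        }
        return shipped.stream().peek(peek).filter(filter).collect(Collectors.toList());
    }
}
```

Both plans produce the same result and peek() sees the same values; only the amount of data moved differs.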

2)
I would say implement sorting on the originator from the beginning,
and limit() and skip() as well. It's true that users may be
disappointed to see that adding limit() doesn't improve the performance
of their sorted() execution, but I would rather have a complete API
available for applications that don't need to sort the entire cache.
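A minimal sketch of sorting on the originator, with the same kind of hypothetical node model (`OriginatorSort` is an illustrative name): every node ships its values, and sorted(), skip() and limit() run locally afterwards. As described above, adding limit() in this scheme does not reduce how much data is transferred, and the sequential sorted() stage still sorts the full buffer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical: sorted()/skip()/limit() applied on the originator after
// every node has shipped its values.
class OriginatorSort {
    static int transferred = 0;

    static <T> List<T> sortedPage(List<List<T>> nodes, Comparator<? super T> order,
                                  long skip, long limit) {
        List<T> all = new ArrayList<>();
        for (List<T> node : nodes) {
            transferred += node.size(); // the whole data set crosses the network
            all.addAll(node);
        }
        // limit() only trims the output; it does not shrink the transfer above.
        return all.stream().sorted(order).skip(skip).limit(limit)
                .collect(Collectors.toList());
    }
}
```

For example, with nodes holding [5, 1, 9] and [3, 7], `sortedPage(nodes, Comparator.naturalOrder(), 1, 2)` returns [3, 5] after all five values have been transferred.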

Cheers
Dan


[1] https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22

On Wed, May 27, 2015 at 9:52 PM, William Burns <mudokonman at gmail.com> wrote:
> Hello everyone,
>
> I wanted to let you know I wrote up a design documenting the successor to
> EntryRetriever, Distributed Streams [1] !
>
> Any comments or feedback would be much appreciated.
>
> I especially would like targeted feedback regarding:
>
> 1. The operators forEach and peek may want to be run locally.  Should we
> have an overloaded method so users can pick which they want?  Personally I
> feel that peek is too specific to matter and forEach can always be done by
> the caller locally if desired.
> 2. The intermediate operators limit and skip do not seem worth implementing
> unless we have sorting support (sorting support will come later).  I am
> thinking of not supporting these until sorting is added.
>
> Thanks,
>
>  - Will
>
> [1] https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev

