On 07/09/2015 02:49 PM, William Burns wrote:
On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei <dan.berindei@gmail.com> wrote:
Hi Will
After the discussion we started in Galder's PR's comments [1], I
started thinking that we should really have a stream() method directly
in the Cache/AdvancedCache interface.
I feel entrySet(), keySet() or values() encourage users to use
external iteration with a for-each loop or iterator(), and we should
encourage users to use the Stream methods instead. I also really don't
like the idea of users having to close their iterators explicitly, and
lazy Iterators (or Spliterators) need to be properly closed or they
will leak resources.
The iterator and spliterator are automatically closed if they are fully
iterated.
I don't think pulling all entries from the cluster (and loader) in for
entrySet, keySet or values is a good idea. Unless you are suggesting
that we pull local entries only? In that case we would have reverted
these changes back to ISPN 6 and older.
As of ISPN 7.0, entrySet, keySet and values are fully backing
collections whose methods are evaluated per invocation. This means any
updates to them, or to the cache they were created from, are visible
to each other.
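The backing-collection semantics described above mirror the view contract of java.util.Map. A minimal sketch, with a plain ConcurrentHashMap standing in for the cache (an assumption of this illustration, not the Infinispan API), shows the two-way visibility:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// ConcurrentHashMap stands in for the cache: its keySet() is a backing
// view, so updates on either side are visible to the other, just like
// the ISPN 7.0 collections described above.
Map<String, Integer> cache = new ConcurrentHashMap<>();
Set<String> keys = cache.keySet();

cache.put("a", 1);   // a write through the map is seen by the view
boolean viewSeesPut = keys.contains("a");

keys.remove("a");    // a write through the view is seen by the map
boolean mapSeesRemove = !cache.containsKey("a");
```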
My suggestion, then, is to make entrySet().iterator() and
entrySet().spliterator() eager, so that they don't need to implement
AutoCloseable. I would even go as far as to say that entrySet() should
be eager itself, but maybe keySet().stream() would make a better API
than adding a new keysStream() method.
Just so I understand: you are saying that we leave entrySet, keySet
and values the way they are, as backing collections, but an invocation
of iterator or spliterator would pull all entries from the entire
cache into memory at once? It seems that throwing
UnsupportedOperationException with a message telling the user to use
stream().iterator() and close the stream would be better imo (however,
that would preclude the use of foreach). Note the foreach loop is only
an issue when you iterate over the collection and break out of the
loop early.
try (Stream<Map.Entry<K, V>> stream = entrySet.stream()) {
   Iterator<Map.Entry<K, V>> iterator = stream.iterator();
}
Actually I think the issue here is that our CacheCollections don't
currently implement CloseableIterable like EntryIterable does. In that
case you could do a simple foreach loop with a break inside a
try-with-resources. We could then document that close() closes any
iterators or spliterators that were created from that instance of the
collection. It is a little awkward, but it could work this way.
try (CacheSet<Map.Entry<K, V>> closeableEntrySet = entrySet) {
   for (Map.Entry<K, V> entry : closeableEntrySet) {
      // breaking out early is safe; close() releases any open iterators
   }
}
What resources is the iterator actually holding? If that is just memory,
could we do some autoclose magic with phantom references?
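For what it's worth, the phantom-reference idea could be sketched with java.lang.ref.Cleaner, which is built on phantom references. IterState and RemoteResources below are hypothetical names for illustration only, not anything in Infinispan:

```java
import java.lang.ref.Cleaner;

// Hedged sketch of the "autoclose magic" idea: Cleaner runs the cleanup
// action once the tracked object becomes phantom reachable, so an
// iterator's resources could be released even if close() is forgotten.
class RemoteResources implements Runnable {
    volatile boolean released;
    @Override public void run() { released = true; } // release handles here
}

class IterState {
    final RemoteResources resources = new RemoteResources();
    final Cleaner.Cleanable cleanable;

    IterState(Cleaner cleaner) {
        // The state object must not reference 'this', or it would keep
        // the iterator reachable and the cleanup would never run.
        cleanable = cleaner.register(this, resources);
    }

    void close() { cleanable.clean(); } // explicit close runs it at most once
}

Cleaner cleaner = Cleaner.create();
IterState state = new IterState(cleaner);
state.close(); // without this, the Cleaner runs the action after GC
```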
Now to your questions:
1)
forEach() doesn't allow the consumer to modify the entries, so I think
the most common use case would be doing something with a captured
variable (or System.out).
This is actually something I didn't cover in the document. But
thinking about it more, I believe we want to allow CDI injection of
the cache into the consumer before firing it. That way the user can
change values as they want. This would behave almost identically to
Map/Reduce, but it is much more understandable imo.
So I would make forEach execute the consumer
on the originator, and maybe add a distributedForEach method that
executes its consumer on each owner (accepting that the consumer may
be executed twice for some keys, or never, if the originator crashes).
distributedForEach probably makes more sense once you start injecting
the Cache (or other components) in the remote Consumers.
This was my conundrum before; however, I believe I found a happy
medium. I figured implementing it distributed gives more flexibility:
the user can still choose to run it locally if they desire.
For example you can call
*.stream().iterator().forEachRemaining(consumer) if you want to do a
forEach locally in a single thread. And if you want it parallelized
you can do
StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)
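The two call shapes being compared can be sketched with a plain java.util.stream source standing in for the cache stream (the List here is an assumption for illustration):

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.StreamSupport;

List<String> entries = List.of("a", "b", "c"); // stand-in for *.stream()

// Local, single-threaded forEach on the caller:
ConcurrentLinkedQueue<String> sequential = new ConcurrentLinkedQueue<>();
entries.stream().iterator().forEachRemaining(sequential::add);

// Parallelized: wrap the spliterator; the 'true' flag requests a
// parallel stream, so the sink must be thread-safe.
ConcurrentLinkedQueue<String> parallel = new ConcurrentLinkedQueue<>();
StreamSupport.stream(entries.spliterator(), true).forEach(parallel::add);
```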
*.stream().distributedForEach(serializable consumer) looks much more
obvious than the above, even though the latter would be documented.
This would all be documented on the forEach method.
peek()'s intended use case is probably logging progress, so it will
definitely need to interact with an external component. However,
executing it on the originator would potentially change the execution
of the stream dramatically, and adding logging shouldn't have that
kind of impact. So peek() should be executed on the remote nodes, even
if we don't have remote injection yet.
This is how I ended up doing it: peek is executed remotely.
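A plain-stream sketch of why peek() is safe to run remotely: it only observes elements (e.g. for logging) and does not change the pipeline's result. The AtomicInteger is a stand-in for a logger, an assumption of this illustration:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

AtomicInteger observed = new AtomicInteger(); // stand-in for a logger
List<Integer> doubled = List.of(1, 2, 3).stream()
        .peek(x -> observed.incrementAndGet()) // would log on each owner
        .map(x -> x * 2)
        .collect(Collectors.toList());
// the result is unaffected by peek; every element was observed once
```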
2)
I would say implement sorting on the originator from the beginning,
and limit() and skip() as well. It's true that users may be
disappointed to see that adding limit() doesn't improve the
performance of their sorted() execution, but I would rather have a
complete API available for applications that don't need to sort the
entire cache.
This is how I did this as well :) Basically, if we find that there is
a sorted, distinct, limit or skip, it performs all of the intermediate
operations up to that point, then uses an iterator to bring the
results back locally where the rest can be performed. Limit and
distinct are also performed remotely first to reduce how many results
are returned. I am not 100% sold on performing distinct remotely
first, as it could actually be significantly slower, but it should
hopefully reduce some memory usage :P
Probably not in the first implementation, but sort should be performed
on each node remotely, and then the local node should just do an n-way
merge. That way, you can apply skip & limit on the remote nodes and
then again on the reduced merged set, dramatically reducing bandwidth.
Not sure about NoSQL use cases, but in classical DBs the top-N query
is quite a frequent operation, afaik.
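Radim's suggestion can be sketched as follows. The per-node slices are faked with plain lists (an assumption of this sketch), each already sorted and limited remotely; the originator then does the n-way merge with the limit re-applied:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Each inner list plays the role of one node's sorted().limit(3) result.
List<List<Integer>> perNode = List.of(
        List.of(1, 4, 8),   // node A
        List.of(2, 3, 9),   // node B
        List.of(5, 6, 7));  // node C
int limit = 3;

// Min-heap of cursors {nodeIndex, position}, ordered by the value each
// cursor currently points at.
PriorityQueue<int[]> heap = new PriorityQueue<>(
        (a, b) -> Integer.compare(perNode.get(a[0]).get(a[1]),
                                  perNode.get(b[0]).get(b[1])));
for (int i = 0; i < perNode.size(); i++) {
    if (!perNode.get(i).isEmpty()) heap.add(new int[]{i, 0});
}

// Pop the global minimum, advance that node's cursor, stop at the limit.
List<Integer> topN = new ArrayList<>();
while (topN.size() < limit && !heap.isEmpty()) {
    int[] c = heap.poll();
    topN.add(perNode.get(c[0]).get(c[1]));
    if (c[1] + 1 < perNode.get(c[0]).size()) heap.add(new int[]{c[0], c[1] + 1});
}
```

Because each node already applied the limit, the originator merges at most nodes × limit elements instead of the whole cache.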
My 2c
Radim
Cheers
Dan
[1]
https://github.com/infinispan/infinispan/pull/3571#discussion-diff-340333...
On Wed, May 27, 2015 at 9:52 PM, William Burns
<mudokonman@gmail.com> wrote:
> Hello everyone,
>
> I wanted to let you know I wrote up a design documenting the successor
> to EntryRetriever, Distributed Streams [1]!
>
> Any comments or feedback would be much appreciated.
>
> I especially would like targeted feedback regarding:
>
> 1. The operators forEach and peek may want to be run locally. Should we
> have an overridden method so users can pick which they want? Personally
> I feel that peek is too specific to matter and forEach can always be
> done by the caller locally if desired.
> 2. The intermediate operators limit and skip do not seem worth
> implementing unless we have sorting support. (Sorting support will come
> later.) I am thinking to not support these until sorting is added.
>
> Thanks,
>
> - Will
>
> [1]
https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev@lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev
--
Radim Vansa <rvansa@redhat.com>
JBoss Performance Team