<div dir="ltr"><br><br><div class="gmail_quote"><div dir="ltr">On Thu, Jul 9, 2015 at 5:11 AM Dan Berindei &lt;<a href="mailto:dan.berindei@gmail.com">dan.berindei@gmail.com</a>&gt; wrote:<br></div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">Hi Will<br>
<br>
After the discussion we started in Galder&#39;s PR&#39;s comments [1], I<br>
started thinking that we should really have a stream() method directly<br>
in the Cache/AdvancedCache interface.<br>
<br>
I feel entrySet(), keySet() or values() encourage users to use<br>
external iteration with a for-each loop or iterator(), and we should<br>
encourage users to use the Stream methods instead. I also really don&#39;t<br>
like the idea of users having to close their iterators explicitly, and<br>
lazy Iterators (or Spliterators) need to be properly closed or they<br>
will leak resources.<br></blockquote><div><br></div><div>The iterator and spliterator are automatically closed if it was fully iterated upon.</div><div><br></div><div>I don&#39;t think pulling all entries from the cluster (and loader) in for entrySet, keySet or values is a good idea.  Unless you are suggesting that we only pull local entries only?  In which case we have reverted these changes back to ISPN 6 and older.</div><div><br></div><div>The entrySet, keySet and values as of ISPN 7.0 are actually completely backing collections and methods are evaluated per invocation.  This means any updates to them or the cache it was created from are seen by each other.</div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<br>
My suggestion, then, is to make entrySet().iterator() and<br>
entrySet().spliterator() eager, so that they don&#39;t need to implement<br>
AutoCloseable. I would even go as far as to say that entrySet() should<br>
be eager itself, but maybe keySet().stream() would make a better API<br>
than adding a new keysStream() method.<br></blockquote><div><br></div><div>Just so I understand you are more saying that we leave the entrySet, keySet and values the way they are so they are backing collections, however invocation of the iterator or spliterator would pull in all entries from the entire cache into memory at once?  It seems throwing UnsupportedOperationException with a message stating to use stream().iterator() and closing the stream would be better imo (however that would preclude the usage of foreach).  Note the foreach loop is only an issue when iterating over that collection and you break out of the loop early.</div><div><br></div><div>try (Stream&lt;Map.Entry&lt;K, V&gt; stream = entrySet.stream()) {</div><div>   Iterator&lt;Map.Entry&lt;K, V&gt;&gt; iterator = stream.iterator();</div><div>}</div><div><br></div><div>Actually I think the issue here is that our CacheCollections don&#39;t currently implement CloseableIterable like the EntryIterable does.  In that case you can do a simple foreach loop with a break in a try with resource.  We could then document that close closes any iterators or spliterators that were created from this instance of the collection.</div><div><br></div><div>It is a little awkward, but could work this way.</div><div><br></div><div><div>try (CacheSet&lt;Map.Entry&lt;K, V&gt;&gt; closeableEntrySet = entrySet) {</div><div>   for (Map.Entry&lt;K, V&gt; entry : closeableEntrySet) {</div><div>   }</div><div>}</div></div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<br>
<br>
Now to your questions:<br>
<br>
1)<br>
forEach() doesn&#39;t allow the consumer to modify the entries, so I think<br>
the most common use case would be doing something with a captured<br>
variable (or System.out). </blockquote><div><br></div><div>This is actually something I didn&#39;t cover in the document.  But upon thinking about this more I was thinking we probably want to allow for CDI Injection of the cache for the consumer action before firing.  In this way the user can change values as they want.  This would behave almost identically to map/reduce, however it is much more understandable imo.</div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">So I would make forEach execute the consumer<br>
on the originator, and maybe add a distributedForEach method that<br>
executes its consumer on each owner (accepting that the consumer may<br>
be executed twice for some keys, or never, if the originator crashes).<br>
distributedForEach probably makes more sense once you start injecting<br>
the Cache (or other components) in the remote Consumers.<br></blockquote><div><br></div><div>This was my conundrum before, however I believe I found a happy medium.  I figured if we implement it distributed gives more flexibility.  The user can still choose to run it locally as they desire.</div><div><br></div><div>For example you can call *.stream().iterator().forEachRemaining(consumer) if you wanted to do a forEach locally in a single thread.  And if you wanted it parallelized you can do</div><div>StreamSupport.stream(*.stream().spliterator(), true).forEach(consumer)</div><div><br></div><div>This would all be documented on the forEach method.</div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<br>
peek()&#39;s intended use case is probably logging progress, so it will<br>
definitely need to interact with an external component. However,<br>
executing it to the originator would potentially change the execution<br>
of the stream dramatically, and adding logging shouldn&#39;t have that<br>
kind of impact. So peek() should be executed on the remote nodes, even<br>
if we don&#39;t have remote injection yet.<br></blockquote><div><br></div><div>This is how I ended up doing it was to have it done remotely.</div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<br>
2)<br>
I would say implement sorting on the originator from the beginning,<br>
and limit() and skip() as well. It&#39;s true that users may me<br>
disappointed to see adding limit() doesn&#39;t improve the performance of<br>
their sorted() execution, but I would rather have a complete API<br>
available for applications who don&#39;t need to sort the entire cache.<br></blockquote><div><br></div><div>This is how I did this as well :)  Basically if we find that there is a sorted, distributed, limit or skip it performs all of the intermediate operations up that point then uses an iterator to bring the results back locally where it can be performed.  Limit and distinct are also actually performed remotely first to reduce how many results are returned.  I am not 100% sold on performing distinct remotely first as it could actually be significantly slower, but it should hopefully reduce some memory usage :P</div><div> </div><blockquote class="gmail_quote" style="margin:0 0 0 .8ex;border-left:1px #ccc solid;padding-left:1ex">
<br>
Cheers<br>
Dan<br>
<br>
<br>
[1] <a href="https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22" rel="noreferrer" target="_blank">https://github.com/infinispan/infinispan/pull/3571#discussion-diff-34033399R22</a><br>
<br>
On Wed, May 27, 2015 at 9:52 PM, William Burns &lt;<a href="mailto:mudokonman@gmail.com" target="_blank">mudokonman@gmail.com</a>&gt; wrote:<br>
&gt; Hello everyone,<br>
&gt;<br>
&gt; I wanted to let you know I wrote up a design documenting the successor to<br>
&gt; EntryRetriever, Distributed Streams [1] !<br>
&gt;<br>
&gt; Any comments or feedback would be much appreciated.<br>
&gt;<br>
&gt; I especially would like targeted feedback regarding:<br>
&gt;<br>
&gt; 1. The operators forEach and peek may want to be ran locally.  Should we<br>
&gt; have an overridden method so users can pick which they want?  Personally I<br>
&gt; feel that peek is too specific to matter and forEach can always be done by<br>
&gt; the caller locally if desired.<br>
&gt; 2. The intermediate operators limit and skip do not seem worth implementing<br>
&gt; unless we have sorting support. (Sorting support will come later).  I am<br>
&gt; thinking to not support these until sorting is added.<br>
&gt;<br>
&gt; Thanks,<br>
&gt;<br>
&gt;  - Will<br>
&gt;<br>
&gt; [1] <a href="https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support" rel="noreferrer" target="_blank">https://github.com/infinispan/infinispan/wiki/Distributed-Stream-Support</a><br>
&gt;<br>
&gt; _______________________________________________<br>
&gt; infinispan-dev mailing list<br>
&gt; <a href="mailto:infinispan-dev@lists.jboss.org" target="_blank">infinispan-dev@lists.jboss.org</a><br>
&gt; <a href="https://lists.jboss.org/mailman/listinfo/infinispan-dev" rel="noreferrer" target="_blank">https://lists.jboss.org/mailman/listinfo/infinispan-dev</a><br>
_______________________________________________<br>
infinispan-dev mailing list<br>
<a href="mailto:infinispan-dev@lists.jboss.org" target="_blank">infinispan-dev@lists.jboss.org</a><br>
<a href="https://lists.jboss.org/mailman/listinfo/infinispan-dev" rel="noreferrer" target="_blank">https://lists.jboss.org/mailman/listinfo/infinispan-dev</a><br>
</blockquote></div></div>