[infinispan-dev] Stream operations under lock

William Burns mudokonman at gmail.com
Tue Mar 28 09:49:46 EDT 2017


On Mon, Mar 27, 2017 at 9:02 PM Galder Zamarreño <galder at redhat.com> wrote:

>
> --
> Galder Zamarreño
> Infinispan, Red Hat
>
> > On 21 Mar 2017, at 17:16, Dan Berindei <dan.berindei at gmail.com> wrote:
> >
> > I'm leaning towards option 1.
> >
> > Are you thinking about also allowing the consumer to modify the entry,
> > like JCache's EntryProcessors? For a consumer that can only modify the
> > current entry, we could even "emulate" locking in an optimistic cache
> > by catching the WriteSkewException and running the consumer again.
> >
> > I wouldn't allow this to be mixed with other operations in a stream,
> > because then you may have to run filters/mappers/sorting while holding
> > the lock as well.
>
> ^ Would forEach w/ lock still run for all entries in originator? If so,
> not being able to filter could be a pain. IOW, you'd be forcing all entries
> to be shipped to a node and user to do its own filtering. Not ideal :\
>

No, the primary owner would run the operation for each entry it owns, not the originator. I was
thinking we would have two levels of filtering in my proposal above.

The first one is the filterKeys method on CacheStream. This requires serializing the
keys sent to each node (only keys primary-owned by that node are sent). While it has
the cost of serialization, it makes up for it with constant-time lookups for those keys
(no iterating the in-memory container or stores), since it builds the stream by using
Cache.get to populate it.

The second was to support the filter method of the Stream API, which takes a Predicate
so you don't have to serialize keys. In this case you wouldn't want to reference keys in
this Predicate, as all of those keys would be serialized to all nodes and you would still
have to iterate over and check the entire data container/store.
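
For illustration only (lockedStream() is the proposed API from option 3 in the quoted
proposal below, and keys, consumer and the value lookup are hypothetical placeholders),
a sketch of the difference:

// Keys referenced only inside a Predicate: the whole key set is serialized to every
// node, and each node still iterates its entire data container/store to evaluate it.
cache.lockedStream().filter(entry -> keys.contains(entry.getKey())).forEach(consumer);

// Same selection via filterKeys: only primary-owned keys are sent to each node and
// they are looked up directly with Cache.get, with no container iteration.
cache.lockedStream().filterKeys(keys).forEach(consumer);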

You could actually do both as well, so if you only want the subset of known keys whose
values match a Predicate, that can be done too:

cache.lockedStream().filterKeys(keys).filter(predicate).forEach(consumer);
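
Fleshed out with a consumer, a hypothetical end-to-end usage might look like the
following. This is only a sketch of the proposed API: lockedStream()/filterKeys follow
option 3 from the quoted proposal below, the BiConsumer signature is the one from
options 1 and 2, and the key set, the "pending"/"processed" values, the class name and
the assumption that the supplied Cache may be used to write back under the lock are all
made up for illustration:

import java.util.Set;
import org.infinispan.AdvancedCache;

public class LockedStreamSketch {
   // Updates every entry among 'keys' whose value is "pending"; the consumer runs on
   // the primary owner of each key while that key's lock is held.
   public static void process(AdvancedCache<String, String> cache, Set<String> keys) {
      cache.lockedStream()                                       // proposed API (option 3)
           .filterKeys(keys)                                     // level 1: direct Cache.get lookups of primary-owned keys
           .filter(entry -> "pending".equals(entry.getValue()))  // level 2: value Predicate, no keys captured
           .forEach((c, entry) ->
                // assumption: the Cache handed to the consumer can update the entry under the lock
                c.put(entry.getKey(), "processed"));
   }
}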


>
>
> >
> > Cheers
> > Dan
> >
> >
> > On Tue, Mar 21, 2017 at 5:37 PM, William Burns <mudokonman at gmail.com>
> wrote:
> >> Some users have expressed the need to have some sort of forEach
> operation
> >> that is performed where the Consumer is called while holding the lock
> for
> >> the given key and subsequently released after the Consumer operation
> >> completes.
> >>
> >> Due to the nature of how streams work with retries and performing the
> >> operation on the primary owner, this works out quite well with forEach
> to be
> >> done in an efficient way.
> >>
> >> The problem is that this only really works well with non tx and
> pessimistic
> >> tx. This obviously leaves out optimistic tx, which at first I was a
> little
> >> worried about. But after thinking about it more, this prelocking and
> >> optimistic tx don't really fit that well together anyways. So I am
> thinking
> >> whenever this operation is performed it would throw an exception not
> letting
> >> the user use this feature in optimistic transactions.
> >>
> >> Another question is what does the API for this look like. I was debating
> >> between 3 options myself:
> >>
> >> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> >> consumer)
> >>
> >> This require the least amount of changes, however the user can't
> customize
> >> certain parameters that CacheStream currently provides (listed below -
> big
> >> one being filterKeys).
> >>
> >> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
> >>
> >> This method would only be allowed to be invoked on the Stream if no
> other
> >> intermediate operations were invoked, otherwise an exception would be
> >> thrown. This still gives us access to all of the CacheStream methods
> that
> >> aren't on the Stream interface (ie. sequentialDistribution,
> >> parallelDistribution, parallel, sequential, filterKeys,
> filterKeySegments,
> >> distributedBatchSize, disableRehashAware, timeout).
> >>
> >> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
> >>
> >> This requires the most changes, however the API would be the most
> explicit.
> >> In this case the LockedStream would only have the methods on it that are
> >> able to be invoked as noted above and forEach.
> >>
> >> I personally feel that #3 might be the cleanest, but obviously requires
> >> adding more classes. Let me know what you guys think and if you think
> the
> >> optimistic exclusion is acceptable.
> >>
> >> Thanks,
> >>
> >> - Will
> >>
> >> _______________________________________________
> >> infinispan-dev mailing list
> >> infinispan-dev at lists.jboss.org
> >> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> > _______________________________________________
> > infinispan-dev mailing list
> > infinispan-dev at lists.jboss.org
> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev