[infinispan-dev] Stream operations under lock

Sanne Grinovero sanne at infinispan.org
Tue Mar 28 10:55:29 EDT 2017


Hi Will,

I'm confused about the premise; when you state

" the Consumer is called while holding the lock for the given key and
subsequently released after the Consumer operation completes."

What are the transaction boundaries?
I see two options, please correct me so I understand:

A)

transaction.begin();
cache.lockedStream().filterKeys(keys).filter(predicate).forEach(someOperation);
transaction.commit(); // <-- releases all locks

B)

cache.lockedStream().filterKeys(keys).filter(predicate).forEach(entry -> {
    transaction.begin();
    // do stuff on entry
    transaction.commit(); // <-- releases this single entry's lock
});


I think it's important to clarify this, as I suspect that A is not
implementable within reasonable guarantees, while in the case of B I
see no use for optimistic locking *from a user's perspective*.
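
To make the difference in lock lifetime between A and B concrete, here is a
minimal sketch that uses plain JDK locks as a stand-in for the cache's per-key
locks. It is not Infinispan code: the class name LockScopeSketch and its
methods are hypothetical, and the "transaction" is reduced to the point where
locks are released. It only counts how many locks one thread holds at once
under each option.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

public class LockScopeSketch {
    // One lock per key: a stand-in for per-key cache locks (assumption, not Infinispan's API).
    private final Map<String, ReentrantLock> locks = new TreeMap<>();

    public LockScopeSketch(List<String> keys) {
        for (String key : keys) {
            locks.put(key, new ReentrantLock());
        }
    }

    // Number of key locks currently held by the calling thread.
    private int heldCount() {
        int held = 0;
        for (ReentrantLock lock : locks.values()) {
            if (lock.isHeldByCurrentThread()) {
                held++;
            }
        }
        return held;
    }

    // Option A: one outer transaction, so every lock stays held until "commit".
    public int maxHeldOptionA() {
        int max = 0;
        List<ReentrantLock> acquired = new ArrayList<>();
        try {
            for (ReentrantLock lock : locks.values()) {
                lock.lock();
                acquired.add(lock);
                max = Math.max(max, heldCount()); // the per-entry operation would run here
            }
        } finally {
            for (ReentrantLock lock : acquired) {
                lock.unlock(); // "commit": release all locks together
            }
        }
        return max;
    }

    // Option B: one transaction per entry, so each lock is released before the next is taken.
    public int maxHeldOptionB() {
        int max = 0;
        for (ReentrantLock lock : locks.values()) {
            lock.lock();
            try {
                max = Math.max(max, heldCount()); // the per-entry operation would run here
            } finally {
                lock.unlock(); // "commit": release this entry's lock only
            }
        }
        return max;
    }

    public static void main(String[] args) {
        LockScopeSketch sketch = new LockScopeSketch(Arrays.asList("a", "b", "c"));
        System.out.println("A holds up to " + sketch.maxHeldOptionA() + " locks"); // 3
        System.out.println("B holds up to " + sketch.maxHeldOptionB() + " locks"); // 1
    }
}
```

With A the number of locks held grows with the number of matching entries,
while with B it stays at one regardless of how many entries match.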

Also: what guarantees do you expect that, at the time the operation is
applied, each entry is strictly *still* within the subset of keys
selected by the filter predicates?
I take it you'd want to acquire the locks during the filtering process?

That would require moving the transaction boundary to scenario A,
which seems undesirable.
Technically, if I were to need something like this, I guess I'd expect
to have a user experience akin to B but have Infinispan essentially
use optimistic locking (and auto-skip) on entries which are mutated
and fall out of the filter predicate during the lock attempt.
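
As an illustration of the auto-skip idea, here is a minimal sketch using a
plain ConcurrentHashMap in place of the cache. Everything in it is an
assumption made for illustration: the class RecheckUnderLockSketch, the method
forEachMatching and the betweenFilterAndLock hook (which simulates a concurrent
writer) are hypothetical and are not part of any Infinispan API. The filter
runs first without locks, then each candidate is re-tested inside
ConcurrentHashMap.computeIfPresent, which runs atomically for that key, and
entries that no longer match are skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class RecheckUnderLockSketch {

    // Applies 'operation' to every value matching 'filter' and returns the keys
    // that were actually processed. 'operation' should not return null, because
    // computeIfPresent would then remove the entry.
    public static <K, V> List<K> forEachMatching(ConcurrentHashMap<K, V> data,
                                                 Predicate<V> filter,
                                                 UnaryOperator<V> operation,
                                                 Runnable betweenFilterAndLock) {
        // Phase 1: select candidates without holding any per-key lock.
        List<K> candidates = new ArrayList<>();
        data.forEach((key, value) -> {
            if (filter.test(value)) {
                candidates.add(key);
            }
        });

        // Hypothetical hook: stands in for writers that mutate entries
        // between the filtering pass and the lock attempt.
        betweenFilterAndLock.run();

        // Phase 2: per key, re-check the predicate while the entry is locked.
        List<K> processed = new ArrayList<>();
        for (K candidate : candidates) {
            data.computeIfPresent(candidate, (key, current) -> {
                if (!filter.test(current)) {
                    return current; // fell out of the filter: skip, leave unchanged
                }
                processed.add(key);
                return operation.apply(current);
            });
        }
        return processed;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> data = new ConcurrentHashMap<>();
        data.put("a", 1);
        data.put("b", 5);
        data.put("c", 10);

        // "b" matches the filter at first, then is changed before it gets locked.
        List<String> processed = forEachMatching(
                data, v -> v >= 5, v -> v * 2, () -> data.put("b", 0));

        System.out.println("processed = " + processed);
        System.out.println("b = " + data.get("b") + ", c = " + data.get("c"));
    }
}
```

In this sketch only one key is locked at a time, as in B, and an entry that
was mutated out of the filter is left untouched instead of failing the whole
operation.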

Essentially, I suspect that we'd not want to implement different
versions of this depending on the transaction mode, but rather figure
out the use case and implement a single transaction mode which suits
such use cases. So, for example, we'd simply not offer a mode which
requires copying the whole grid into the current TX context.

Thanks,
Sanne



On 28 March 2017 at 14:49, William Burns <mudokonman at gmail.com> wrote:
>
>
> On Mon, Mar 27, 2017 at 9:02 PM Galder Zamarreño <galder at redhat.com> wrote:
>>
>>
>> --
>> Galder Zamarreño
>> Infinispan, Red Hat
>>
>> > On 21 Mar 2017, at 17:16, Dan Berindei <dan.berindei at gmail.com> wrote:
>> >
>> > I'm leaning towards option 1.
>> >
>> > Are you thinking about also allowing the consumer to modify the entry,
>> > like JCache's EntryProcessors? For a consumer that can only modify the
>> > current entry, we could even "emulate" locking in an optimistic cache
>> > by catching the WriteSkewException and running the consumer again.
>> >
>> > I wouldn't allow this to be mixed with other operations in a stream,
>> > because then you may have to run filters/mappers/sorting while holding
>> > the lock as well.
>>
>> ^ Would forEach w/ lock still run for all entries in originator? If so,
>> not being able to filter could be a pain. IOW, you'd be forcing all entries
>> to be shipped to a node and user to do its own filtering. Not ideal :\
>
>
> No, the primary owner would run the operation per entry. I was thinking we
> would have 2 levels of filtering in my proposal above.
>
> We would have the first one which is using filterKeys on the CacheStream
> method. This requires serializing keys sent to each node (only primary owned
> keys are sent). While it has the cost of serialization, it makes up for it by
> having constant time lookups (no iterating memory/stores) for the keys as it
> creates a stream using Cache.get to populate it.
>
> The second was to support the filter method on the Stream API which would
> allow for a Predicate so you don't have to serialize keys. In this case you
> wouldn't want to include keys in this Predicate as all keys would be
> serialized to all nodes and then you still have to iterate and check the
> entire data container/store.
>
> You could actually do both as well. So if you only want a subset of known
> keys where their values match a Predicate this can be done too.
>
> cache.lockedStream().filterKeys(keys).filter(predicate).forEach();
>
>>
>>
>>
>> >
>> > Cheers
>> > Dan
>> >
>> >
>> > On Tue, Mar 21, 2017 at 5:37 PM, William Burns <mudokonman at gmail.com>
>> > wrote:
>> >> Some users have expressed the need to have some sort of forEach
>> >> operation
>> >> that is performed where the Consumer is called while holding the lock
>> >> for
>> >> the given key and subsequently released after the Consumer operation
>> >> completes.
>> >>
>> >> Due to the nature of how streams work with retries and performing the
>> >> operation on the primary owner, this works out quite well with forEach
>> >> to be
>> >> done in an efficient way.
>> >>
>> >> The problem is that this only really works well with non tx and
>> >> pessimistic
>> >> tx. This obviously leaves out optimistic tx, which at first I was a
>> >> little
>> >> worried about. But after thinking about it more, this prelocking and
>> >> optimistic tx don't really fit that well together anyways. So I am
>> >> thinking
>> >> whenever this operation is performed it would throw an exception not
>> >> letting
>> >> the user use this feature in optimistic transactions.
>> >>
>> >> Another question is what does the API for this look like. I was
>> >> debating
>> >> between 3 options myself:
>> >>
>> >> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
>> >> consumer)
>> >>
>> >> This requires the least amount of changes, however the user can't
>> >> customize
>> >> certain parameters that CacheStream currently provides (listed below -
>> >> big
>> >> one being filterKeys).
>> >>
>> >> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
>> >> consumer)
>> >>
>> >> This method would only be allowed to be invoked on the Stream if no
>> >> other
>> >> intermediate operations were invoked, otherwise an exception would be
>> >> thrown. This still gives us access to all of the CacheStream methods
>> >> that
>> >> aren't on the Stream interface (ie. sequentialDistribution,
>> >> parallelDistribution, parallel, sequential, filterKeys,
>> >> filterKeySegments,
>> >> distributedBatchSize, disableRehashAware, timeout).
>> >>
>> >> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>> >>
>> >> This requires the most changes, however the API would be the most
>> >> explicit.
>> >> In this case the LockedStream would only have the methods on it that
>> >> are
>> >> able to be invoked as noted above and forEach.
>> >>
>> >> I personally feel that #3 might be the cleanest, but obviously requires
>> >> adding more classes. Let me know what you guys think and if you think
>> >> the
>> >> optimistic exclusion is acceptable.
>> >>
>> >> Thanks,
>> >>
>> >> - Will
>> >>
>> >> _______________________________________________
>> >> infinispan-dev mailing list
>> >> infinispan-dev at lists.jboss.org
>> >> https://lists.jboss.org/mailman/listinfo/infinispan-dev


