[infinispan-dev] Stream operations under lock

Dan Berindei dan.berindei at gmail.com
Tue Mar 28 12:44:40 EDT 2017


On Tue, Mar 28, 2017 at 6:44 PM, William Burns <mudokonman at gmail.com> wrote:
>
>
> On Tue, Mar 28, 2017 at 11:24 AM Sanne Grinovero <sanne at infinispan.org>
> wrote:
>>
>> Hi Will,
>>
>> I'm confused about the premise; when you state
>>
>> " the Consumer is called while holding the lock for the given key and
>> subsequently released after the Consumer operation completes."
>>
>> What are the transaction boundaries?
>> I see two options, please correct me so I understand:
>>
>> A)
>>
>> transaction.begin();
>> cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
>> someOperation );
>> transaction.commit(); <-- release all locks
>>
>> B)
>>
>> cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
>>     transaction.begin();
>>     //to stuff on entry
>>     /transaction.commit(); <-- release this single entry's lock
>> );
>
>
> The user code doesn't really affect this, it is done internally by
> Infinispan. We would acquire the lock on the user's behalf in the stream
> operation then call the user's accept method. Then after they return we
> would unlock the lock. The way I have it implemented at the moment in my PoC
> (which could change) is I don't even start a transaction. It just locks the
> key and then if the user were to invoke an operation that requires a
> transaction it adds the lock owner to their tx context at that point so they
> have ownership of the key.
>
> Also to note that a commit or rollback in the Consumer currently doesn't
> release the lock on the key. Although this could be discussed to be possibly
> changed.
>

With a transactional cache I was assuming you manage the transaction
yourself... if the user has to call
transactionManager.begin()/commit()/rollback() anyway, why not use a
regular stream?

>>
>>
>>
>> I think it's important to clarify this as I suspect that #A is not
>> implementable within reasonable guarantees, while in the case of B# I
>> see no use for optimistic locking *from a user's perspective*.
>
>
> Exactly my thoughts regarding optimistic. I don't think #A is even feasible
> given constraints of having a distributed transaction like this.
>

Totally agree that #A can't work with any kind of transaction configuration.

As to optimistic locking, I would like having "feature parity" between
pessimistic and optimistic caches as much as possible, but I agree
locking eagerly and retrying the consumer on WriteSkewException are a
bit too different to fit under the same API.

>>
>>
>> Also: what kind of guarantees do you expect about having the operation
>> being applied on some entry which is strictly *still* a subset of the
>> keys as defined by the filter predicates?
>> I take it you'd want to acquire the locks during the filtering process?
>
>
> That is a good question. I hadn't thought about the details but what I had
> implemented was we have to first read the entry, lock the key, reread the
> entry (in case of concurrent update) and then finally call their Predicate.
> Another reason the filterKeys is much more efficient :) Note this read,
> locking and reread is done even without a Predicate supplied. And actually I
> can also optimize filterKeys to not do the initial read since we already
> have the key.
>

Would this be more efficient than the consumer reading the key with
FORCE_WRITE_LOCK and deciding what to do based on the value?

>>
>>
>> That would require the move the transaction boundary to the scenario
>> A# which seems undesirable.
>> Technically if I were to need something like this I guess I'd expect
>> to have a user experience akin to B# but have Infinispan essentially
>> use optimistic locking (and auto-skip) on entries which are mutated
>> and fall out of the filter predicate during the lock attempt.
>>
>> Essentially I suspect that we'd not want to implement different
>> versions of this depending on the transaction mode, but figure out the
>> use case and implement a one and only transaction mode which suites
>> such use cases. So for example we'd simply not offer a mode which
>> requires to copy the whole grid into the current TX context.
>
>
> This was never the intent and in my follow up emails I came to what seems
> like the same conclusion that basically this can't be done with the user
> controlling the transaction and it doesn't really make sense in an
> optimistic transaction (since you are already at that node, you are already
> doing everything pessimistically).
>

Even local caches can use optimistic locking :)

>>
>>
>> Thanks,
>> Sanne
>>
>>
>>
>> On 28 March 2017 at 14:49, William Burns <mudokonman at gmail.com> wrote:
>> >
>> >
>> > On Mon, Mar 27, 2017 at 9:02 PM Galder Zamarreño <galder at redhat.com>
>> > wrote:
>> >>
>> >>
>> >> --
>> >> Galder Zamarreño
>> >> Infinispan, Red Hat
>> >>
>> >> > On 21 Mar 2017, at 17:16, Dan Berindei <dan.berindei at gmail.com>
>> >> > wrote:
>> >> >
>> >> > I'm leaning towards option 1.
>> >> >
>> >> > Are you thinking about also allowing the consumer to modify the
>> >> > entry,
>> >> > like JCache's EntryProcessors? For a consumer that can only modify
>> >> > the
>> >> > current entry, we could even "emulate" locking in an optimistic cache
>> >> > by catching the WriteSkewException and running the consumer again.
>> >> >
>> >> > I wouldn't allow this to be mixed with other operations in a stream,
>> >> > because then you may have to run filters/mappers/sorting while
>> >> > holding
>> >> > the lock as well.
>> >>
>> >> ^ Would forEach w/ lock still run for all entries in originator? If so,
>> >> not being able to filter could be a pain. IOW, you'd be forcing all
>> >> entries
>> >> to be shipped to a node and user to do its own filtering. Not ideal :\
>> >
>> >
>> > No the primary owner would run the operation per entry. I was thinking
>> > we
>> > would have 2 levels of filtering in my proposal above.
>> >
>> > We would have the first one which is using filterKeys on the CacheStream
>> > method. This requires serializing keys sent to each node (only primary
>> > owned
>> > keys are sent). While it has the cost of serialization it makes up for
>> > by
>> > having constant time lookups (no iterating memory/stores) for the keys
>> > as it
>> > creates a stream using Cache.get to populate it.
>> >
>> > The second was to support the filter method on the Stream API which
>> > would
>> > allow for a Predicate so you don't have to serialize keys. In this case
>> > you
>> > wouldn't want to include keys in this Predicate as all keys would be
>> > serialized to all nodes and then you still have to iterate and check the
>> > entire data container/store.
>> >
>> > You could actually do both as well. So if you only want a subset of
>> > known
>> > keys where their values match a Predicate this can be done too.
>> >
>> > cache.lockedStream().filterKeys(keys).filter(predicate).forEach();
>> >
>> >>
>> >>
>> >>
>> >> >
>> >> > Cheers
>> >> > Dan
>> >> >
>> >> >
>> >> > On Tue, Mar 21, 2017 at 5:37 PM, William Burns <mudokonman at gmail.com>
>> >> > wrote:
>> >> >> Some users have expressed the need to have some sort of forEach
>> >> >> operation
>> >> >> that is performed where the Consumer is called while holding the
>> >> >> lock
>> >> >> for
>> >> >> the given key and subsequently released after the Consumer operation
>> >> >> completes.
>> >> >>
>> >> >> Due to the nature of how streams work with retries and performing
>> >> >> the
>> >> >> operation on the primary owner, this works out quite well with
>> >> >> forEach
>> >> >> to be
>> >> >> done in an efficient way.
>> >> >>
>> >> >> The problem is that this only really works well with non tx and
>> >> >> pessimistic
>> >> >> tx. This obviously leaves out optimistic tx, which at first I was a
>> >> >> little
>> >> >> worried about. But after thinking about it more, this prelocking and
>> >> >> optimistic tx don't really fit that well together anyways. So I am
>> >> >> thinking
>> >> >> whenever this operation is performed it would throw an exception not
>> >> >> letting
>> >> >> the user use this feature in optimistic transactions.
>> >> >>
>> >> >> Another question is what does the API for this look like. I was
>> >> >> debating
>> >> >> between 3 options myself:
>> >> >>
>> >> >> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
>> >> >> consumer)
>> >> >>
>> >> >> This require the least amount of changes, however the user can't
>> >> >> customize
>> >> >> certain parameters that CacheStream currently provides (listed below
>> >> >> -
>> >> >> big
>> >> >> one being filterKeys).
>> >> >>
>> >> >> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
>> >> >> consumer)
>> >> >>
>> >> >> This method would only be allowed to be invoked on the Stream if no
>> >> >> other
>> >> >> intermediate operations were invoked, otherwise an exception would
>> >> >> be
>> >> >> thrown. This still gives us access to all of the CacheStream methods
>> >> >> that
>> >> >> aren't on the Stream interface (ie. sequentialDistribution,
>> >> >> parallelDistribution, parallel, sequential, filterKeys,
>> >> >> filterKeySegments,
>> >> >> distributedBatchSize, disableRehashAware, timeout).
>> >> >>
>> >> >> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>> >> >>
>> >> >> This requires the most changes, however the API would be the most
>> >> >> explicit.
>> >> >> In this case the LockedStream would only have the methods on it that
>> >> >> are
>> >> >> able to be invoked as noted above and forEach.
>> >> >>
>> >> >> I personally feel that #3 might be the cleanest, but obviously
>> >> >> requires
>> >> >> adding more classes. Let me know what you guys think and if you
>> >> >> think
>> >> >> the
>> >> >> optimistic exclusion is acceptable.
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> - Will
>> >> >>
>> >> >> _______________________________________________
>> >> >> infinispan-dev mailing list
>> >> >> infinispan-dev at lists.jboss.org
>> >> >> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>> >> > _______________________________________________
>> >> > infinispan-dev mailing list
>> >> > infinispan-dev at lists.jboss.org
>> >> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>> >>
>> >>
>> >> _______________________________________________
>> >> infinispan-dev mailing list
>> >> infinispan-dev at lists.jboss.org
>> >> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>> >
>> >
>> > _______________________________________________
>> > infinispan-dev mailing list
>> > infinispan-dev at lists.jboss.org
>> > https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>
>> _______________________________________________
>> infinispan-dev mailing list
>> infinispan-dev at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev



More information about the infinispan-dev mailing list