[infinispan-dev] Stream operations under lock

Dan Berindei dan.berindei at gmail.com
Wed Mar 29 11:37:35 EDT 2017


On Wed, Mar 29, 2017 at 4:24 PM, William Burns <mudokonman at gmail.com> wrote:
>
>
> On Wed, Mar 29, 2017 at 4:05 AM Dan Berindei <dan.berindei at gmail.com> wrote:
>>
>> On Tue, Mar 28, 2017 at 8:54 PM, William Burns <mudokonman at gmail.com> wrote:
>> >
>> >
>> > On Tue, Mar 28, 2017 at 12:52 PM Dan Berindei <dan.berindei at gmail.com> wrote:
>> >>
>> >> On Tue, Mar 28, 2017 at 6:44 PM, William Burns <mudokonman at gmail.com> wrote:
>> >> >
>> >> >
>> >> > On Tue, Mar 28, 2017 at 11:24 AM Sanne Grinovero <sanne at infinispan.org> wrote:
>> >> >>
>> >> >> Hi Will,
>> >> >>
>> >> >> I'm confused about the premise; when you state
>> >> >>
>> >> >> "the Consumer is called while holding the lock for the given key and
>> >> >> subsequently released after the Consumer operation completes."
>> >> >>
>> >> >> What are the transaction boundaries?
>> >> >> I see two options, please correct me so I understand:
>> >> >>
>> >> >> A)
>> >> >>
>> >> >> transaction.begin();
>> >> >> cache.lockedStream().filterKeys(keys).filter(predicate)
>> >> >>     .forEach(someOperation);
>> >> >> transaction.commit(); // release all locks
>> >> >>
>> >> >> B)
>> >> >>
>> >> >> cache.lockedStream().filterKeys(keys).filter(predicate).forEach((c, entry) -> {
>> >> >>     transaction.begin();
>> >> >>     // do stuff on entry
>> >> >>     transaction.commit(); // release this single entry's lock
>> >> >> });
>> >> >
>> >> >
>> >> > The user code doesn't really affect this; it is done internally by
>> >> > Infinispan. We would acquire the lock on the user's behalf in the
>> >> > stream operation, then call the user's accept method, and after it
>> >> > returns we would release the lock. The way I have it implemented at the
>> >> > moment in my PoC (which could change), I don't even start a
>> >> > transaction. It just locks the key, and then if the user invokes an
>> >> > operation that requires a transaction, it adds the lock owner to their
>> >> > tx context at that point so they have ownership of the key.
>> >> >
>> >> > Also note that a commit or rollback in the Consumer currently doesn't
>> >> > release the lock on the key, although this behaviour is open to
>> >> > discussion.
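The sequence Will describes (the lock is acquired on the user's behalf, the consumer runs, and the lock is released only after it returns, with no transaction started) can be modelled on a single node with plain `java.util.concurrent` locks. This is a minimal sketch under stated assumptions, not Infinispan's implementation: `LockingForEach`, `forEachWithLock` and `isLocked` are hypothetical names, a `ConcurrentHashMap` stands in for the data container, and there is no distribution or transaction integration.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

// Hypothetical single-node model of "forEach under lock"; not Infinispan code.
// The framework, not the user, acquires and releases each key's lock.
class LockingForEach<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();
    private final Map<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    void put(K key, V value) { data.put(key, value); }

    V get(K key) { return data.get(key); }

    boolean isLocked(K key) {
        ReentrantLock lock = locks.get(key);
        return lock != null && lock.isLocked();
    }

    // Invokes the consumer once per entry while holding that entry's lock.
    // No transaction is started; the lock is released in finally, so nothing
    // the consumer does (including throwing) can leave the key locked.
    void forEachWithLock(BiConsumer<LockingForEach<K, V>, Map.Entry<K, V>> consumer) {
        for (K key : data.keySet()) {
            ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
            lock.lock();                      // acquired on the user's behalf
            try {
                V value = data.get(key);      // read while holding the lock
                if (value != null) {
                    consumer.accept(this, new AbstractMap.SimpleImmutableEntry<>(key, value));
                }
            } finally {
                lock.unlock();                // released after the consumer returns
            }
        }
    }
}
```

Because the lock here lives outside any transaction, a consumer that commits or rolls back its own work would not release it, which matches the PoC behaviour Will describes.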
>> >> >
>> >>
>> >> With a transactional cache I was assuming you manage the transaction
>> >> yourself... if the user has to call
>> >> transactionManager.begin()/commit()/rollback() anyway, why not use a
>> >> regular stream?
>> >
>> >
>> > The user would be using either an implicit or an explicit transaction.
>> > Like I said, this is up for discussion. The main reason I am staying away
>> > from managing the transaction is that the user could also tamper with the
>> > transaction, which could release the lock. It is much simpler if all I am
>> > doing is managing the lock. And if the user doesn't need a transaction in
>> > Infinispan, we don't waste time starting and finishing one.
>> >
>>
>> Every write to a transactional cache will start an implicit
>> transaction, so I don't think we really have the option of not
>> starting a transaction.
>
>
> Yes, a write will, but the stream operation itself doesn't write to the
> cache; it just locks. So if the user doesn't actually need to write to the
> cache, they may just want to update a third-party system, for example.
>

Yeah, my main use case was "processing" all/most of the entries in the
cache in some way. I must confess I didn't even think about using
Infinispan locks to synchronize access to external systems; I was only
thinking about what would happen when the Consumer accessed multiple
caches :)

OTOH a read-only transaction doesn't have to be very expensive either.
We already have an optimization in PessimisticLockingInterceptor to
skip sending the LockControlCommand to backup owners [1]. This applies
only for LockControlCommands generated implicitly by writes, but we
could modify TxDistributionInterceptor.visitLockControlCommand() to
skip the RPC for explicit AdvancedCache.lock() calls as well. If there
are no writes and no remote locks, the prepare and commit should be
local as well.

There will still be some overhead from interacting with the
transaction manager, and I can understand wanting to avoid that. I'm
not yet convinced it's worth writing a new API and potentially
confusing lock release behaviour because of it...

[1]: https://github.com/infinispan/infinispan/blob/9fb8752d847a85e168d785237f196967ace313f3/core/src/main/java/org/infinispan/interceptors/locking/PessimisticLockingInterceptor.java#L364-L376

>>
>>
>> >>
>> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> I think it's important to clarify this, as I suspect that #A is not
>> >> >> implementable within reasonable guarantees, while in the case of #B I
>> >> >> see no use for optimistic locking *from a user's perspective*.
>> >> >
>> >> >
>> >> > Exactly my thoughts regarding optimistic. I don't think #A is even
>> >> > feasible given the constraints of a distributed transaction like this.
>> >> >
>> >>
>> >> Totally agree that #A can't work with any kind of transaction
>> >> configuration.
>> >>
>> >> As to optimistic locking, I would like to have "feature parity" between
>> >> pessimistic and optimistic caches as much as possible, but I agree that
>> >> locking eagerly and retrying the consumer on WriteSkewException are a
>> >> bit too different to fit under the same API.
>> >
>> >
>> > Yeah, I would definitely like to have it as well, but I just can't see
>> > how it fits in. And that is before the implementation detail that it is
>> > currently quite difficult to get working :D
>> >
>> >>
>> >>
>> >> >>
>> >> >>
>> >> >> Also: what kind of guarantees do you expect that the operation is
>> >> >> applied only to entries that are *still* within the subset of keys
>> >> >> defined by the filter predicates?
>> >> >> I take it you'd want to acquire the locks during the filtering
>> >> >> process?
>> >> >
>> >> >
>> >> > That is a good question. I hadn't thought about the details, but what
>> >> > I implemented is: first read the entry, lock the key, reread the entry
>> >> > (in case of a concurrent update), and then finally call the user's
>> >> > Predicate. Another reason filterKeys is much more efficient :) Note that
>> >> > this read, lock and reread is done even when no Predicate is supplied.
>> >> > And actually I can also optimize filterKeys to skip the initial read,
>> >> > since we already have the key.
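The read, lock, reread, test sequence, and why `filterKeys` can skip the initial read, might be sketched as below. This assumes a single node with a `ConcurrentHashMap` as the container; `RereadUnderLock`, `forEachMatching` and `forEachKey` are hypothetical names, and the `initialReads` counter exists only to show the read that the key-based variant avoids.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical model of "read, lock, reread, then test"; not Infinispan code.
class RereadUnderLock<K, V> {
    final Map<K, V> data = new ConcurrentHashMap<>();
    private final Map<K, ReentrantLock> locks = new ConcurrentHashMap<>();
    int initialReads = 0; // reads made before locking (what filterKeys can skip)

    // Scan variant: iterating the container is the initial read.
    void forEachMatching(Predicate<? super V> predicate, BiConsumer<K, V> consumer) {
        for (Map.Entry<K, V> entry : data.entrySet()) {
            initialReads++;
            lockRereadAndApply(entry.getKey(), predicate, consumer);
        }
    }

    // filterKeys variant: the keys are already known, so no initial read.
    void forEachKey(Set<K> keys, Predicate<? super V> predicate, BiConsumer<K, V> consumer) {
        for (K key : keys) {
            lockRereadAndApply(key, predicate, consumer);
        }
    }

    private void lockRereadAndApply(K key, Predicate<? super V> predicate, BiConsumer<K, V> consumer) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            // Reread under the lock: a concurrent update or removal may have
            // happened between the initial read and lock acquisition.
            V current = data.get(key);
            if (current != null && predicate.test(current)) {
                consumer.accept(key, current);
            }
        } finally {
            lock.unlock();
        }
    }
}
```

Testing the predicate against the reread value means an entry that was changed so that it no longer matches is skipped rather than handed to the consumer, which is one possible answer to Sanne's question about entries falling out of the filter.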
>> >> >
>> >>
>> >> Would this be more efficient than the consumer reading the key with
>> >> FORCE_WRITE_LOCK and deciding what to do based on the value?
>> >>
>> >
>> > The problem with this is that you have to go remote to lock the key,
>> > return the value, then do something for every key (not to mention
>> > pulling those keys using an iterator). Very costly! The main benefit of
>> > the stream with lock is that you perform everything on the primary owner
>> > of the data with the lock already acquired. The only things sent
>> > remotely are the consumer and some internal classes, which are very
>> > lightweight, and you get all the benefits of data locality.
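The cost difference Will points out can be made concrete by counting round trips. The sketch below is a rough model under explicit assumptions: primary ownership is a simple `hashCode` modulo the node count (Infinispan's real consistent hash uses segments and is more involved), the per-key path is charged one round trip per key (a lower bound, since locking and fetching may each need more), and `OwnerRouting` with its methods is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical cost model; ownership here is NOT Infinispan's consistent hash.
class OwnerRouting {
    static int primaryOwner(Object key, int numNodes) {
        return Math.floorMod(key.hashCode(), numNodes);
    }

    // Groups keys by the node that would run the consumer for them locally.
    static Map<Integer, List<Object>> groupByOwner(Collection<?> keys, int numNodes) {
        Map<Integer, List<Object>> byOwner = new TreeMap<>();
        for (Object key : keys) {
            byOwner.computeIfAbsent(primaryOwner(key, numNodes), n -> new ArrayList<>()).add(key);
        }
        return byOwner;
    }

    // Originator locks and fetches every key itself: at least one round trip per key.
    static int roundTripsPerKey(Collection<?> keys) {
        return keys.size();
    }

    // The consumer is shipped once to each primary owner, which then works
    // on its own keys locally with the lock already acquired.
    static int roundTripsShippingConsumer(Collection<?> keys, int numNodes) {
        return groupByOwner(keys, numNodes).size();
    }
}
```

In this model, 1,000 integer keys spread over 4 nodes cost at least 1,000 round trips on the per-key path but only 4 when the consumer is shipped to the owners.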
>> >
>>
>> I meant using cache.withFlags(FORCE_WRITE_LOCK).get(key) in the
>> Consumer itself, and assuming it runs on the primary owner.
>
>
> Ah yes, you could for pessimistic, but not for optimistic or non-tx :( In
> that case it would be very similar.
>
>>
>>
>> >>
>> >> >>
>> >> >>
>> >> >> That would require moving the transaction boundary to scenario #A,
>> >> >> which seems undesirable.
>> >> >> Technically, if I needed something like this, I guess I'd expect a
>> >> >> user experience akin to #B, but with Infinispan essentially using
>> >> >> optimistic locking (and auto-skipping) on entries which are mutated
>> >> >> and fall out of the filter predicate during the lock attempt.
>> >> >>
>> >> >> Essentially I suspect that we'd not want to implement different
>> >> >> versions of this depending on the transaction mode, but rather figure
>> >> >> out the use case and implement one single transaction mode which
>> >> >> suits such use cases. So, for example, we'd simply not offer a mode
>> >> >> which requires copying the whole grid into the current TX context.
>> >> >
>> >> >
>> >> > This was never the intent, and in my follow-up emails I came to what
>> >> > seems like the same conclusion: basically this can't be done with the
>> >> > user controlling the transaction, and it doesn't really make sense in
>> >> > an optimistic transaction (since you are already at that node, you are
>> >> > already doing everything pessimistically).
>> >> >
>> >>
>> >> Even local caches can use optimistic locking :)
>> >
>> >
>> > Yes, I know :) I was just framing it in terms of remote execution. If
>> > anyone can think of a nice way of using this with optimistic
>> > transactions, I would be all for it. But optimistic transactions just
>> > don't make any sense to me when you are locking a key eagerly for
>> > someone to do something with it (the definition of a pessimistic
>> > transaction).
>> >
>>
>> Maybe I didn't explain properly...
>>
>> I meant in optimistic caches we wouldn't acquire the lock at all. The
>> stream would just start an optimistic transaction, run the consumer,
>> and try to commit. If the prepare fails because of a
>> WriteSkewException, start from the beginning.
>
>
> This assumes the user will be updating the cache; I am not sure we can
> always assume that (with regard to starting a transaction).
>
> But this is a good point (I misunderstood earlier): it should be pretty
> easy to just catch WriteSkewException and retry the given Consumer with the
> new entry (if it still exists).
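Dan's optimistic alternative (take no lock, run the consumer, and re-run it if the commit hits a write skew) can be sketched with per-entry versions and a compare-and-set commit. This is a single-node model under stated assumptions: `OptimisticRetry`, its nested `WriteSkewException` (a stand-in, not Infinispan's class of the same name) and the `beforeCommit` hook used to simulate a concurrent writer are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Hypothetical model of optimistic "lock emulation" by retrying on write skew.
class OptimisticRetry {
    // Stand-in for a write-skew failure; not Infinispan's WriteSkewException.
    static final class WriteSkewException extends RuntimeException {
        WriteSkewException(String key) { super("write skew on " + key); }
    }

    // Immutable value plus version; identity is used for the commit check.
    static final class Versioned {
        final long version;
        final String value;
        Versioned(long version, String value) { this.version = version; this.value = value; }
    }

    final Map<String, Versioned> data = new ConcurrentHashMap<>();

    void put(String key, String value) {
        data.compute(key, (k, old) -> new Versioned(old == null ? 0 : old.version + 1, value));
    }

    // Commit succeeds only if the entry is still the exact snapshot that was read.
    private void commit(String key, Versioned snapshot, String newValue) {
        if (!data.replace(key, snapshot, new Versioned(snapshot.version + 1, newValue))) {
            throw new WriteSkewException(key);
        }
    }

    // Runs the update without locking, re-running it on write skew.
    // Returns how many attempts were needed.
    int updateWithRetry(String key, UnaryOperator<String> update, Runnable beforeCommit) {
        int attempts = 0;
        while (true) {
            attempts++;
            Versioned snapshot = data.get(key);
            if (snapshot == null) {
                return attempts;              // entry no longer exists: nothing to do
            }
            String newValue = update.apply(snapshot.value);
            beforeCommit.run();               // hook to simulate a concurrent writer
            try {
                commit(key, snapshot, newValue);
                return attempts;
            } catch (WriteSkewException e) {
                // Someone else committed first: retry with the new entry.
            }
        }
    }
}
```

As Will notes, this only fits consumers that write to the cache; a consumer that only updates an external system has nothing to commit, so there is no write skew to detect.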
>
>>
>>
>> >>
>> >>
>> >> >>
>> >> >>
>> >> >> Thanks,
>> >> >> Sanne
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 28 March 2017 at 14:49, William Burns <mudokonman at gmail.com> wrote:
>> >> >> >
>> >> >> >
>> >> >> > On Mon, Mar 27, 2017 at 9:02 PM Galder Zamarreño <galder at redhat.com> wrote:
>> >> >> >>
>> >> >> >>
>> >> >> >> --
>> >> >> >> Galder Zamarreño
>> >> >> >> Infinispan, Red Hat
>> >> >> >>
>> >> >> >> > On 21 Mar 2017, at 17:16, Dan Berindei <dan.berindei at gmail.com> wrote:
>> >> >> >> >
>> >> >> >> > I'm leaning towards option 1.
>> >> >> >> >
>> >> >> >> > Are you thinking about also allowing the consumer to modify the
>> >> >> >> > entry, like JCache's EntryProcessors? For a consumer that can only
>> >> >> >> > modify the current entry, we could even "emulate" locking in an
>> >> >> >> > optimistic cache by catching the WriteSkewException and running the
>> >> >> >> > consumer again.
>> >> >> >> >
>> >> >> >> > I wouldn't allow this to be mixed with other operations in a
>> >> >> >> > stream, because then you may have to run filters/mappers/sorting
>> >> >> >> > while holding the lock as well.
>> >> >> >>
>> >> >> >> ^ Would forEach w/ lock still run for all entries on the originator?
>> >> >> >> If so, not being able to filter could be a pain. IOW, you'd be
>> >> >> >> forcing all entries to be shipped to one node and the user to do
>> >> >> >> their own filtering. Not ideal :\
>> >> >> >
>> >> >> >
>> >> >> > No, the primary owner would run the operation per entry. I was
>> >> >> > thinking we would have 2 levels of filtering in my proposal above.
>> >> >> >
>> >> >> > The first one uses the filterKeys method on CacheStream. This
>> >> >> > requires serializing the keys sent to each node (only primary-owned
>> >> >> > keys are sent). While it has the cost of serialization, it makes up
>> >> >> > for it with constant-time lookups (no iterating over memory/stores)
>> >> >> > for the keys, as it creates a stream using Cache.get to populate it.
>> >> >> >
>> >> >> > The second is to support the filter method on the Stream API, which
>> >> >> > would allow for a Predicate so you don't have to serialize keys. In
>> >> >> > this case you wouldn't want to include keys in the Predicate, as all
>> >> >> > keys would be serialized to all nodes and you would still have to
>> >> >> > iterate over and check the entire data container/store.
>> >> >> >
>> >> >> > You could actually do both as well. So if you only want a subset of
>> >> >> > known keys whose values match a Predicate, this can be done too:
>> >> >> >
>> >> >> > cache.lockedStream().filterKeys(keys).filter(predicate).forEach(consumer);
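The two filtering levels differ mainly in how much of a node's container has to be touched. The following sketch of one node's side makes that visible; `TwoLevelFilter`, `byKeys` and `byPredicate` are hypothetical names, a `HashMap` stands in for the data container and stores, and serialization cost (the price `filterKeys` pays for shipping keys) is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical single-node model of the two filtering levels; not Infinispan code.
class TwoLevelFilter<K, V> {
    private final Map<K, V> container;
    private int lastVisited; // entries touched by the most recent call

    TwoLevelFilter(Map<K, V> container) { this.container = container; }

    int lastVisited() { return lastVisited; }

    // filterKeys (+ optional predicate): direct lookups for the shipped keys only.
    List<V> byKeys(Set<K> keys, Predicate<? super V> predicate) {
        lastVisited = 0;
        List<V> out = new ArrayList<>();
        for (K key : keys) {
            V value = container.get(key);      // constant-time lookup, no iteration
            lastVisited++;
            if (value != null && predicate.test(value)) {
                out.add(value);
            }
        }
        return out;
    }

    // filter(predicate) alone: no keys shipped, but every entry must be checked.
    List<V> byPredicate(Predicate<? super V> predicate) {
        lastVisited = 0;
        List<V> out = new ArrayList<>();
        for (V value : container.values()) {   // full scan of the container
            lastVisited++;
            if (predicate.test(value)) {
                out.add(value);
            }
        }
        return out;
    }
}
```

Calling `byKeys` with an always-true predicate corresponds to `filterKeys` alone; passing a real predicate corresponds to combining both levels, as in Will's `lockedStream()` chain.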
>> >> >> >
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> >
>> >> >> >> > Cheers
>> >> >> >> > Dan
>> >> >> >> >
>> >> >> >> >
>> >> >> >> > On Tue, Mar 21, 2017 at 5:37 PM, William Burns <mudokonman at gmail.com> wrote:
>> >> >> >> >> Some users have expressed the need for some sort of forEach
>> >> >> >> >> operation where the Consumer is called while holding the lock for
>> >> >> >> >> the given key, and the lock is subsequently released after the
>> >> >> >> >> Consumer operation completes.
>> >> >> >> >>
>> >> >> >> >> Due to the nature of how streams work, with retries and with the
>> >> >> >> >> operation performed on the primary owner, this works out quite
>> >> >> >> >> well for doing forEach in an efficient way.
>> >> >> >> >>
>> >> >> >> >> The problem is that this only really works well with non-tx and
>> >> >> >> >> pessimistic tx. This obviously leaves out optimistic tx, which at
>> >> >> >> >> first I was a little worried about. But after thinking about it
>> >> >> >> >> more, this prelocking and optimistic tx don't really fit that well
>> >> >> >> >> together anyway. So I am thinking that this operation would throw
>> >> >> >> >> an exception whenever it is performed on an optimistic
>> >> >> >> >> transactional cache, so the feature cannot be used with optimistic
>> >> >> >> >> transactions.
>> >> >> >> >>
>> >> >> >> >> Another question is what the API for this should look like. I was
>> >> >> >> >> debating between 3 options myself:
>> >> >> >> >>
>> >> >> >> >> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>> >> >> >> >>
>> >> >> >> >> This requires the fewest changes; however, the user can't
>> >> >> >> >> customize certain parameters that CacheStream currently provides
>> >> >> >> >> (listed below, the big one being filterKeys).
>> >> >> >> >>
>> >> >> >> >> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>> >> >> >> >>
>> >> >> >> >> This method would only be allowed to be invoked on the Stream if
>> >> >> >> >> no other intermediate operations were invoked; otherwise an
>> >> >> >> >> exception would be thrown. This still gives us access to all of
>> >> >> >> >> the CacheStream methods that aren't on the Stream interface (i.e.
>> >> >> >> >> sequentialDistribution, parallelDistribution, parallel,
>> >> >> >> >> sequential, filterKeys, filterKeySegments, distributedBatchSize,
>> >> >> >> >> disableRehashAware, timeout).
>> >> >> >> >>
>> >> >> >> >> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>> >> >> >> >>
>> >> >> >> >> This requires the most changes; however, the API would be the
>> >> >> >> >> most explicit. In this case the LockedStream would only have the
>> >> >> >> >> methods on it that can be invoked as noted above, plus forEach.
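The practical difference between options 2 and 3 is when misuse is caught: option 2 has to reject intermediate operations at runtime, while option 3 simply doesn't declare them. The sketch below contrasts the two under stated assumptions: `GeneralStream`, `LockedStreamSketch`, `TxMode` and `of` are hypothetical names rather than Infinispan's API, locking itself is omitted, the option 3 type includes a `filter` method as discussed elsewhere in this thread, and `of` rejects optimistic mode to mirror the proposed optimistic exclusion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical transaction modes for the sketch.
enum TxMode { NON_TX, PESSIMISTIC, OPTIMISTIC }

// Option 2: forEachWithLock sits on a general stream, so misuse is caught at runtime.
class GeneralStream<T> {
    private final List<T> source;
    private final boolean intermediateInvoked;

    GeneralStream(List<T> source) { this(source, false); }

    private GeneralStream(List<T> source, boolean intermediateInvoked) {
        this.source = source;
        this.intermediateInvoked = intermediateInvoked;
    }

    GeneralStream<T> filter(Predicate<? super T> predicate) {
        List<T> out = new ArrayList<>();
        for (T t : source) {
            if (predicate.test(t)) out.add(t);
        }
        return new GeneralStream<>(out, true);
    }

    void forEachWithLock(Consumer<? super T> consumer) {
        if (intermediateInvoked) {
            throw new IllegalStateException("forEachWithLock cannot follow intermediate operations");
        }
        source.forEach(consumer); // per-key locking omitted in this sketch
    }
}

// Option 3: a dedicated type declares only the allowed methods, so map(),
// sorted() and the like cannot be called on it at all.
final class LockedStreamSketch<K, V> {
    private final Map<K, V> data;
    private Set<K> keys;                     // null means "all keys"
    private Predicate<V> predicate = v -> true;

    private LockedStreamSketch(Map<K, V> data) { this.data = data; }

    static <K, V> LockedStreamSketch<K, V> of(Map<K, V> data, TxMode mode) {
        if (mode == TxMode.OPTIMISTIC) {
            throw new UnsupportedOperationException("locked streams are not supported with optimistic transactions");
        }
        return new LockedStreamSketch<>(data);
    }

    LockedStreamSketch<K, V> filterKeys(Set<K> keys) {
        this.keys = keys;
        return this;
    }

    LockedStreamSketch<K, V> filter(Predicate<? super V> p) {
        Predicate<V> previous = predicate;
        predicate = v -> previous.test(v) && p.test(v);
        return this;
    }

    void forEach(BiConsumer<K, V> consumer) {
        Iterable<K> candidates = keys != null ? keys : data.keySet();
        for (K key : candidates) {
            V value = data.get(key);         // per-key locking omitted in this sketch
            if (value != null && predicate.test(value)) {
                consumer.accept(key, value);
            }
        }
    }
}
```

With option 3 a call such as `map` on the locked stream fails to compile, which is the "most explicit" property of that design; option 2 can only report the same mistake when `forEachWithLock` actually runs.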
>> >> >> >> >>
>> >> >> >> >> I personally feel that #3 might be the cleanest, but it obviously
>> >> >> >> >> requires adding more classes. Let me know what you guys think and
>> >> >> >> >> whether you think the optimistic exclusion is acceptable.
>> >> >> >> >>
>> >> >> >> >> Thanks,
>> >> >> >> >>
>> >> >> >> >> - Will
>> >> >> >> >>
>> >> >> >> >> _______________________________________________
>> >> >> >> >> infinispan-dev mailing list
>> >> >> >> >> infinispan-dev at lists.jboss.org
>> >> >> >> >> https://lists.jboss.org/mailman/listinfo/infinispan-dev