On Tue, Mar 28, 2017 at 6:44 PM, William Burns <mudokonman(a)gmail.com> wrote:
On Tue, Mar 28, 2017 at 11:24 AM Sanne Grinovero <sanne(a)infinispan.org>
wrote:
>
> Hi Will,
>
> I'm confused about the premise; when you state
>
> " the Consumer is called while holding the lock for the given key and
> subsequently released after the Consumer operation completes."
>
> What are the transaction boundaries?
> I see two options, please correct me so I understand:
>
> A)
>
> transaction.begin();
> cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
> someOperation );
> transaction.commit(); <-- release all locks
>
> B)
>
> cache.lockedStream().filterKeys(keys).filter(predicate).forEach(
> transaction.begin();
> // do stuff on entry
> transaction.commit(); <-- release this single entry's lock
> );
The user code doesn't really affect this, it is done internally by
Infinispan. We would acquire the lock on the user's behalf in the stream
operation then call the user's accept method. Then after they return we
would unlock the lock. The way I have it implemented at the moment in my PoC
(which could change) is I don't even start a transaction. It just locks the
key and then if the user were to invoke an operation that requires a
transaction it adds the lock owner to their tx context at that point so they
have ownership of the key.
Also note that a commit or rollback in the Consumer currently doesn't
release the lock on the key, although this could be discussed and possibly
changed.
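
For illustration only, the lock / accept / unlock sequence described above
can be sketched with plain JDK classes. Nothing here is Infinispan code:
LockedForEachSketch, lockFor and lockedForEach are made-up names, a
ReentrantLock per key stands in for the real lock manager, and transactions
are left out entirely.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the proposed behaviour; not Infinispan code.
class LockedForEachSketch<K, V> {
    final Map<K, V> data = new ConcurrentHashMap<>();
    private final Map<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    // One lock per key, created lazily.
    ReentrantLock lockFor(K key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }

    // The lock is taken on the caller's behalf, the consumer runs while it
    // is held, and it is released once the consumer returns or throws.
    void lockedForEach(BiConsumer<Map<K, V>, Map.Entry<K, V>> consumer) {
        for (K key : data.keySet()) {
            ReentrantLock lock = lockFor(key);
            lock.lock();
            try {
                V value = data.get(key);
                if (value != null) {
                    consumer.accept(data, Map.entry(key, value));
                }
            } finally {
                lock.unlock();
            }
        }
    }
}
```

The consumer never touches the lock itself, which is the point of doing it
internally: user code only sees the map and the current entry.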
With a transactional cache I was assuming you manage the transaction
yourself... if the user has to call
transactionManager.begin()/commit()/rollback() anyway, why not use a
regular stream?
>
>
>
> I think it's important to clarify this as I suspect that #A is not
> implementable within reasonable guarantees, while in the case of #B I
> see no use for optimistic locking *from a user's perspective*.
Exactly my thoughts regarding optimistic. I don't think #A is even feasible
given constraints of having a distributed transaction like this.
Totally agree that #A can't work with any kind of transaction configuration.
As to optimistic locking, I would like having "feature parity" between
pessimistic and optimistic caches as much as possible, but I agree
locking eagerly and retrying the consumer on WriteSkewException are a
bit too different to fit under the same API.
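
To make the difference concrete, here is a rough JDK-only sketch of the
"retry the consumer" idea. It is an analogy, not Infinispan's write-skew
check: OptimisticRetrySketch and updateWithRetry are hypothetical names, and
ConcurrentMap.replace(key, expected, updated) plays the role of the version
check that would otherwise raise a WriteSkewException.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.function.UnaryOperator;

// Hypothetical analogy for "emulated" locking on an optimistic cache.
class OptimisticRetrySketch {
    // Applies 'update' until its result is installed with no concurrent
    // write in between. Returns how many times 'update' was attempted.
    static <K, V> int updateWithRetry(ConcurrentMap<K, V> map, K key,
                                      UnaryOperator<V> update) {
        int attempts = 0;
        while (true) {
            attempts++;
            V current = map.get(key);
            if (current == null) {
                return attempts; // entry is gone, nothing to update
            }
            V updated = update.apply(current);
            // Succeeds only if nobody changed the value since we read it.
            if (map.replace(key, current, updated)) {
                return attempts;
            }
            // Conflict detected (the write-skew analogue): run it again.
        }
    }
}
```

Unlike the eager-locking case, the user function can run more than once per
entry here, so it would have to be free of side effects outside the entry.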
>
>
> Also: what kind of guarantees do you expect about having the operation
> being applied on some entry which is strictly *still* a subset of the
> keys as defined by the filter predicates?
> I take it you'd want to acquire the locks during the filtering process?
That is a good question. I hadn't thought about the details but what I had
implemented was we have to first read the entry, lock the key, reread the
entry (in case of concurrent update) and then finally call their Predicate.
Another reason the filterKeys is much more efficient :) Note this read,
locking and reread is done even without a Predicate supplied. And actually I
can also optimize filterKeys to not do the initial read since we already
have the key.
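
A minimal sketch of that read, lock, reread, test sequence, using only JDK
classes. The names (RereadUnderLockSketch, lockRereadAndApply,
forEachMatching, forEachKey) are invented for this example and the real PoC
may be structured quite differently.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical illustration; not the PoC implementation.
class RereadUnderLockSketch<K, V> {
    final Map<K, V> data = new ConcurrentHashMap<>();
    private final Map<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Lock the key, reread the value, then test the predicate against the
    // value seen under the lock. Returns true if the consumer ran.
    boolean lockRereadAndApply(K key, Predicate<? super V> predicate,
                               BiConsumer<K, V> consumer) {
        ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock());
        lock.lock();
        try {
            V fresh = data.get(key); // reread in case of a concurrent update
            if (fresh == null || !predicate.test(fresh)) {
                return false;
            }
            consumer.accept(key, fresh);
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Predicate-only case: every entry is read once without the lock just
    // to find candidates, then handled under the lock.
    int forEachMatching(Predicate<? super V> predicate, BiConsumer<K, V> consumer) {
        int applied = 0;
        for (K key : data.keySet()) {
            if (data.get(key) == null) {
                continue; // initial unlocked read: entry already removed
            }
            if (lockRereadAndApply(key, predicate, consumer)) {
                applied++;
            }
        }
        return applied;
    }

    // filterKeys-style case: the keys are known, so the initial read is
    // skipped and only the read under the lock remains.
    int forEachKey(Set<K> keys, Predicate<? super V> predicate,
                   BiConsumer<K, V> consumer) {
        int applied = 0;
        for (K key : keys) {
            if (lockRereadAndApply(key, predicate, consumer)) {
                applied++;
            }
        }
        return applied;
    }
}
```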
Would this be more efficient than the consumer reading the key with
FORCE_WRITE_LOCK and deciding what to do based on the value?
>
>
> That would require moving the transaction boundary to scenario #A,
> which seems undesirable.
> Technically if I were to need something like this I guess I'd expect
> to have a user experience akin to #B but have Infinispan essentially
> use optimistic locking (and auto-skip) on entries which are mutated
> and fall out of the filter predicate during the lock attempt.
>
> Essentially I suspect that we'd not want to implement different
> versions of this depending on the transaction mode, but figure out the
> use case and implement a one and only transaction mode which suits
> such use cases. So for example we'd simply not offer a mode which
> requires copying the whole grid into the current TX context.
This was never the intent and in my follow up emails I came to what seems
like the same conclusion that basically this can't be done with the user
controlling the transaction and it doesn't really make sense in an
optimistic transaction (since you are already at that node, you are already
doing everything pessimistically).
Even local caches can use optimistic locking :)
>
>
> Thanks,
> Sanne
>
>
>
> On 28 March 2017 at 14:49, William Burns <mudokonman(a)gmail.com> wrote:
> >
> >
> > On Mon, Mar 27, 2017 at 9:02 PM Galder Zamarreño <galder(a)redhat.com>
> > wrote:
> >>
> >>
> >> --
> >> Galder Zamarreño
> >> Infinispan, Red Hat
> >>
> >> > On 21 Mar 2017, at 17:16, Dan Berindei <dan.berindei(a)gmail.com>
> >> > wrote:
> >> >
> >> > I'm leaning towards option 1.
> >> >
> >> > Are you thinking about also allowing the consumer to modify the
> >> > entry, like JCache's EntryProcessors? For a consumer that can only
> >> > modify the current entry, we could even "emulate" locking in an
> >> > optimistic cache by catching the WriteSkewException and running the
> >> > consumer again.
> >> >
> >> > I wouldn't allow this to be mixed with other operations in a stream,
> >> > because then you may have to run filters/mappers/sorting while
> >> > holding the lock as well.
> >>
> >> ^ Would forEach w/ lock still run for all entries in originator? If so,
> >> not being able to filter could be a pain. IOW, you'd be forcing all
> >> entries to be shipped to a node and user to do its own filtering. Not
> >> ideal :\
> >
> >
> > No, the primary owner would run the operation per entry. I was thinking
> > we would have 2 levels of filtering in my proposal above.
> >
> > We would have the first one which is using filterKeys on the CacheStream
> > method. This requires serializing keys sent to each node (only primary
> > owned keys are sent). While it has the cost of serialization, it makes up
> > for it by having constant time lookups (no iterating memory/stores) for
> > the keys, as it creates a stream using Cache.get to populate it.
> >
> > The second was to support the filter method on the Stream API which
> > would allow for a Predicate so you don't have to serialize keys. In this
> > case you wouldn't want to include keys in this Predicate as all keys
> > would be serialized to all nodes and then you still have to iterate and
> > check the entire data container/store.
> >
> > You could actually do both as well. So if you only want a subset of
> > known keys where their values match a Predicate this can be done too.
> >
> > cache.lockedStream().filterKeys(keys).filter(predicate).forEach();
> >
> >>
> >>
> >>
> >> >
> >> > Cheers
> >> > Dan
> >> >
> >> >
> >> > On Tue, Mar 21, 2017 at 5:37 PM, William Burns <mudokonman(a)gmail.com>
> >> > wrote:
> >> >> Some users have expressed the need to have some sort of forEach
> >> >> operation that is performed where the Consumer is called while
> >> >> holding the lock for the given key and subsequently released after
> >> >> the Consumer operation completes.
> >> >>
> >> >> Due to the nature of how streams work with retries and performing
> >> >> the operation on the primary owner, this works out quite well with
> >> >> forEach to be done in an efficient way.
> >> >>
> >> >> The problem is that this only really works well with non tx and
> >> >> pessimistic tx. This obviously leaves out optimistic tx, which at
> >> >> first I was a little worried about. But after thinking about it
> >> >> more, this prelocking and optimistic tx don't really fit that well
> >> >> together anyways. So I am thinking whenever this operation is
> >> >> performed it would throw an exception not letting the user use this
> >> >> feature in optimistic transactions.
> >> >>
> >> >> Another question is what does the API for this look like. I was
> >> >> debating between 3 options myself:
> >> >>
> >> >> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> >> >> consumer)
> >> >>
> >> >> This requires the least amount of changes, however the user can't
> >> >> customize certain parameters that CacheStream currently provides
> >> >> (listed below - big one being filterKeys).
> >> >>
> >> >> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> >> >> consumer)
> >> >>
> >> >> This method would only be allowed to be invoked on the Stream if no
> >> >> other intermediate operations were invoked, otherwise an exception
> >> >> would be thrown. This still gives us access to all of the
> >> >> CacheStream methods that aren't on the Stream interface (ie.
> >> >> sequentialDistribution, parallelDistribution, parallel, sequential,
> >> >> filterKeys, filterKeySegments, distributedBatchSize,
> >> >> disableRehashAware, timeout).
> >> >>
> >> >> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
> >> >>
> >> >> This requires the most changes, however the API would be the most
> >> >> explicit. In this case the LockedStream would only have the methods
> >> >> on it that are able to be invoked as noted above and forEach.
> >> >>
> >> >> I personally feel that #3 might be the cleanest, but obviously
> >> >> requires adding more classes. Let me know what you guys think and if
> >> >> you think the optimistic exclusion is acceptable.
> >> >>
> >> >> Thanks,
> >> >>
> >> >> - Will
> >> >>
> >> >> _______________________________________________
> >> >> infinispan-dev mailing list
> >> >> infinispan-dev(a)lists.jboss.org
> >> >> https://lists.jboss.org/mailman/listinfo/infinispan-dev