[infinispan-dev] Stream operations under lock

Dan Berindei dan.berindei at gmail.com
Tue Mar 21 12:16:21 EDT 2017


I'm leaning towards option 1.

Are you thinking about also allowing the consumer to modify the entry,
like JCache's EntryProcessors? For a consumer that can only modify the
current entry, we could even "emulate" locking in an optimistic cache
by catching the WriteSkewException and running the consumer again.
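
A minimal sketch of that retry idea, using a plain ConcurrentMap with
versioned values rather than Infinispan itself. WriteSkewDetected,
Versioned, commit and updateWithRetry are made-up names for illustration;
the real WriteSkewException and the transaction commit path are not
modeled here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.UnaryOperator;

class OptimisticRetrySketch {
    /** Hypothetical stand-in for a write skew failure (not the Infinispan class). */
    static class WriteSkewDetected extends RuntimeException {}

    /** Immutable value paired with the version it was written at. */
    static final class Versioned<V> {
        final V value;
        final long version;
        Versioned(V value, long version) { this.value = value; this.version = version; }
    }

    /** Commits only if the entry is still the exact instance we read earlier. */
    static <K, V> void commit(ConcurrentMap<K, Versioned<V>> store, K key,
                              Versioned<V> read, V newValue) {
        Versioned<V> next = new Versioned<>(newValue, read.version + 1);
        // Versioned does not override equals, so this is an identity comparison.
        if (!store.replace(key, read, next)) {
            throw new WriteSkewDetected();
        }
    }

    /**
     * Runs the consumer against the current value and retries on write skew.
     * Returns the number of attempts used, or 0 if the key is absent.
     */
    static <K, V> int updateWithRetry(ConcurrentMap<K, Versioned<V>> store, K key,
                                      UnaryOperator<V> consumer, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Versioned<V> read = store.get(key);
            if (read == null) {
                return 0;
            }
            try {
                commit(store, key, read, consumer.apply(read.value));
                return attempt;
            } catch (WriteSkewDetected e) {
                // Someone else changed the entry after our read: run the consumer again.
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Versioned<Integer>> store = new ConcurrentHashMap<>();
        store.put("k", new Versioned<>(1, 0));
        int attempts = updateWithRetry(store, "k", v -> v + 1, 3);
        System.out.println(attempts + " attempt(s), value = " + store.get("k").value);
    }
}
```

Note that the consumer may run more than once, so this only works if it
has no side effects beyond the entry it is modifying.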

I wouldn't allow this to be mixed with other operations in a stream,
because then you may have to run filters/mappers/sorting while holding
the lock as well.
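
To make option 1 concrete, here is a rough single-JVM sketch of a
forEachWithLock that does nothing but lock, call the consumer, and unlock
for each key. LockedForEachSketch and its per-key ReentrantLock map are
assumptions for illustration only; they are not the AdvancedCache API and
they ignore distribution, primary owners, and retries.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;

/** Hypothetical in-memory stand-in for a cache with per-key locks. */
class LockedForEachSketch<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();
    private final Map<K, ReentrantLock> locks = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(K key) {
        return locks.computeIfAbsent(key, k -> new ReentrantLock());
    }

    /** Writes take the same per-key lock, so they wait for a running consumer. */
    void put(K key, V value) {
        ReentrantLock lock = lockFor(key);
        lock.lock();
        try {
            data.put(key, value);
        } finally {
            lock.unlock();
        }
    }

    V get(K key) {
        return data.get(key);
    }

    boolean isLockedByCurrentThread(K key) {
        ReentrantLock lock = locks.get(key);
        return lock != null && lock.isHeldByCurrentThread();
    }

    /**
     * Calls the consumer once per entry while holding that entry's lock.
     * No filters, mappers, or sorting run here, so the lock is held only
     * for the duration of the consumer itself.
     */
    void forEachWithLock(BiConsumer<LockedForEachSketch<K, V>, Map.Entry<K, V>> consumer) {
        for (K key : data.keySet()) {
            ReentrantLock lock = lockFor(key);
            lock.lock();
            try {
                V current = data.get(key); // re-read under the lock
                if (current != null) {
                    consumer.accept(this, new AbstractMap.SimpleImmutableEntry<>(key, current));
                }
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        LockedForEachSketch<String, Integer> cache = new LockedForEachSketch<>();
        cache.put("a", 1);
        cache.put("b", 2);
        // The lock is reentrant, so the consumer may write back to its own key.
        cache.forEachWithLock((c, e) -> c.put(e.getKey(), e.getValue() * 10));
        System.out.println(cache.get("a") + ", " + cache.get("b"));
    }
}
```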

Cheers
Dan


On Tue, Mar 21, 2017 at 5:37 PM, William Burns <mudokonman at gmail.com> wrote:
> Some users have expressed the need to have some sort of forEach operation
> that is performed where the Consumer is called while holding the lock for
> the given key and subsequently released after the Consumer operation
> completes.
>
> Because streams already handle retries and perform the operation on the
> primary owner, forEach can do this efficiently.
>
> The problem is that this only really works well with non-tx and pessimistic
> tx. This obviously leaves out optimistic tx, which worried me a little at
> first. But after thinking about it more, this prelocking and optimistic tx
> don't really fit together that well anyway. So I am thinking that invoking
> this operation in an optimistic transaction would throw an exception, so
> that the feature cannot be used there.
>
> Another question is what the API for this should look like. I was debating
> between 3 options myself:
>
> 1. AdvancedCache.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>>
> consumer)
>
> This requires the fewest changes, however the user can't customize
> certain parameters that CacheStream currently provides (listed below - big
> one being filterKeys).
>
> 2. CacheStream.forEachWithLock(BiConsumer<Cache, CacheEntry<K, V>> consumer)
>
> This method would only be allowed to be invoked on the Stream if no other
> intermediate operations were invoked, otherwise an exception would be
> thrown. This still gives us access to all of the CacheStream methods that
> aren't on the Stream interface (i.e. sequentialDistribution,
> parallelDistribution, parallel, sequential, filterKeys, filterKeySegments,
> distributedBatchSize, disableRehashAware, timeout).
>
> 3. LockedStream<CacheEntry<K, V>> AdvancedCache.lockedStream()
>
> This requires the most changes, however the API would be the most explicit.
> In this case the LockedStream would only have the methods on it that are
> able to be invoked as noted above and forEach.
>
> I personally feel that #3 might be the cleanest, but obviously requires
> adding more classes. Let me know what you guys think and if you think the
> optimistic exclusion is acceptable.
>
> Thanks,
>
>  - Will

