[
https://issues.redhat.com/browse/ISPN-10373?page=com.atlassian.jira.plugi...
]
Will Burns edited comment on ISPN-10373 at 5/13/20 10:33 AM:
-------------------------------------------------------------
Just so it is not lost, this is how we can implement RemoteStore bulk operations:
{code}
// @Override
// public CompletionStage<Void> bulkSegmentedWrite(int publisherCount,
//       Publisher<NonBlockingLoadWriteStore.SegmentedPublisher<MarshallableEntry<K, V>>> publisher) {
//    return Flowable.fromPublisher(publisher)
//          .flatMap(Functions.identity(), false, publisherCount)
//          .buffer(configuration.maxBatchSize())
//          .concatMapCompletable(mes -> {
//             CompletableFuture<Void> putAllFuture = remoteCache.putAllAsync(
//                   mes.stream().collect(Collectors.toMap(this::getKey, this::getValue)));
//             return RxJavaInterop.completionStageToCompletable(putAllFuture);
//          })
//          .doOnError(PersistenceException::new)
//          .to(RxJavaInterop.completableToCompletionStage());
// }

// @Override
// public CompletionStage<Void> bulkSegmentedWrite(int publisherCount,
//       Publisher<NonBlockingStore.SegmentedPublisher<MarshallableEntry<K, V>>> publisher) {
//    return Flowable.fromPublisher(publisher)
//          .flatMap(sp ->
//                Flowable.fromPublisher(sp)
//                      .buffer(configuration.maxBatchSize()),
//                false, publisherCount)
//          .concatMapCompletable(mes -> {
//             CompletableFuture<Void> putAllFuture = remoteCache.putAllAsync(
//                   mes.stream().collect(Collectors.toMap(this::getKey, this::getValue)));
//             return RxJavaInterop.completionStageToCompletable(putAllFuture);
//          })
//          .doOnError(PersistenceException::new)
//          .to(RxJavaInterop.completableToCompletionStage());
// }
{code}
Store/Loader Non blocking SPI
-----------------------------
Key: ISPN-10373
URL: https://issues.redhat.com/browse/ISPN-10373
Project: Infinispan
Issue Type: Feature Request
Components: Loaders and Stores
Reporter: Will Burns
Assignee: Will Burns
Priority: Major
We need to add and use a non-blocking SPI internally for our stores/loaders. ISPN-9722 was
a good step: it refactored all of our internal code to use "non-blocking" stores. However,
the stores themselves are all inherently synchronous, even when the underlying store could
be non-blocking. We would have to add a new SPI interface to allow for such non-blocking
operations. We would then remove all the explicit threading added in ISPN-9722 and move it
into a wrapper around the currently synchronous loaders instead. This way an invoking
thread does not need a context switch when it invokes only a non-blocking store operation.
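
The wrapper idea described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the proposed SPI: the names BlockingStore, AsyncStore, BlockingStoreAdapter, MapBlockingStore and InMemoryAsyncStore are hypothetical and do not exist in Infinispan. The point is that the thread hand-off lives only in the adapter around a synchronous store, while a store that is natively non-blocking can complete its stage on the invoking thread.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NonBlockingWrapperSketch {

    /** Hypothetical blocking contract, standing in for an existing sync store/loader. */
    interface BlockingStore<K, V> {
        V load(K key);
        void write(K key, V value);
    }

    /** Hypothetical non-blocking contract: every operation returns a CompletionStage. */
    interface AsyncStore<K, V> {
        CompletionStage<V> load(K key);
        CompletionStage<Void> write(K key, V value);
    }

    /** Adapter that owns the threading: each blocking call is offloaded to an executor. */
    static final class BlockingStoreAdapter<K, V> implements AsyncStore<K, V> {
        private final BlockingStore<K, V> delegate;
        private final Executor blockingExecutor;

        BlockingStoreAdapter(BlockingStore<K, V> delegate, Executor blockingExecutor) {
            this.delegate = delegate;
            this.blockingExecutor = blockingExecutor;
        }

        @Override
        public CompletionStage<V> load(K key) {
            return CompletableFuture.supplyAsync(() -> delegate.load(key), blockingExecutor);
        }

        @Override
        public CompletionStage<Void> write(K key, V value) {
            return CompletableFuture.runAsync(() -> delegate.write(key, value), blockingExecutor);
        }
    }

    /** Simple map-backed blocking store, used here only to exercise the adapter. */
    static final class MapBlockingStore<K, V> implements BlockingStore<K, V> {
        private final Map<K, V> data = new ConcurrentHashMap<>();

        @Override
        public V load(K key) {
            return data.get(key);
        }

        @Override
        public void write(K key, V value) {
            data.put(key, value);
        }
    }

    /** Natively non-blocking store: it completes on the invoking thread, with no hand-off. */
    static final class InMemoryAsyncStore<K, V> implements AsyncStore<K, V> {
        private final Map<K, V> data = new ConcurrentHashMap<>();

        @Override
        public CompletionStage<V> load(K key) {
            return CompletableFuture.completedFuture(data.get(key));
        }

        @Override
        public CompletionStage<Void> write(K key, V value) {
            data.put(key, value);
            return CompletableFuture.completedFuture(null);
        }
    }

    public static void main(String[] args) {
        ExecutorService blockingPool = Executors.newSingleThreadExecutor();
        try {
            AsyncStore<String, String> wrapped =
                    new BlockingStoreAdapter<>(new MapBlockingStore<>(), blockingPool);
            // Chain the load after the write without blocking in between.
            String value = wrapped.write("k1", "v1")
                    .thenCompose(ignored -> wrapped.load("k1"))
                    .toCompletableFuture()
                    .join();
            System.out.println(value);
        } finally {
            blockingPool.shutdown();
        }
    }
}
```

Callers depend only on the CompletionStage-returning interface, so in a design like this the choice between offloading and completing inline would be made per store rather than in the calling code.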