https://hibernate.atlassian.net/browse/HSEARCH-4700 introduced customization of what should happen when an operation is submitted to a background processor and the queue of operations is full, giving the choice between:
- Blocking until the operation can be added to the queue
- Throwing RejectedExecutionException
We could introduce a third option, matching what Infinispan does at the moment: offload the submission of the operation to another thread pool that is allowed to block. That’s a middle ground between blocking and failing: we won’t fail, but we won’t block in the current thread either. This might be useful for operations that are not time-critical and not expected to be executed in high volume, such as getting the size of the index. E.g. we’d change the OperationSubmitter API and also add something like this:
    static OperationSubmitter offload(Consumer<Runnable> executor) {
        return new OperationSubmitter() {
            @Override
            public <T> void submitToQueue(BlockingQueue<T> queue, T element, Function<T, Runnable> blockingRetryProducer) throws InterruptedException {
                if ( !queue.offer( element ) ) {
                    executor.accept( blockingRetryProducer.apply( element ) );
                }
            }
        };
    }
Users (Infinispan) would do something like this:
OperationSubmitter offloadSubmitter = OperationSubmitter.offload(myExecutor::submit);
While our batching executor would do something like this:
operationSubmitter.submitToQueue(workQueue, work, blockingRetry);
We’d have to be careful how we implement the blockingRetryProducer, however: it should not just re-submit the element to the queue, but should also re-acquire all relevant locks. In AbstractWorkOrchestrator, for example, it would need to re-acquire the lifecycleLock. Basically, blockingRetryProducer = work -> orchestrator.submit(work, OperationSubmitter.BLOCKING). That means blockingRetryProducer would need to be a parameter of BatchingExecutor#submit, whose value is set in the (calling) orchestrator.
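To make the intended behavior concrete, here is a minimal, self-contained sketch of the offloading idea. It is not the actual Hibernate Search code: SketchSubmitter and OffloadSketch are hypothetical stand-ins for OperationSubmitter and the orchestrator/batching executor, and the blocking retry is simplified to a plain BlockingQueue#put. A real blockingRetryProducer would go back through the orchestrator so that locks such as lifecycleLock are re-acquired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified stand-in for OperationSubmitter.
interface SketchSubmitter {
    <T> void submitToQueue(BlockingQueue<T> queue, T element,
            Function<T, Runnable> blockingRetryProducer) throws InterruptedException;

    // Try a non-blocking offer first; if the queue is full, hand a blocking
    // retry to another executor instead of blocking the calling thread.
    static SketchSubmitter offload(Consumer<Runnable> executor) {
        return new SketchSubmitter() {
            @Override
            public <T> void submitToQueue(BlockingQueue<T> queue, T element,
                    Function<T, Runnable> blockingRetryProducer) {
                if ( !queue.offer( element ) ) {
                    executor.accept( blockingRetryProducer.apply( element ) );
                }
            }
        };
    }
}

class OffloadSketch {
    static List<String> run() {
        // Capacity 1, so the second submission finds the queue full.
        BlockingQueue<String> workQueue = new ArrayBlockingQueue<>( 1 );
        ExecutorService offloadPool = Executors.newSingleThreadExecutor();
        try {
            SketchSubmitter submitter = SketchSubmitter.offload( offloadPool::execute );

            // Simplification (assumption): the retry just blocks on put().
            // The real one would call back into the orchestrator with a
            // blocking submitter so that the relevant locks are re-acquired.
            Function<String, Runnable> blockingRetry = work -> () -> {
                try {
                    workQueue.put( work );
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };

            submitter.submitToQueue( workQueue, "first", blockingRetry );  // fits in the queue
            submitter.submitToQueue( workQueue, "second", blockingRetry ); // full: offloaded, returns at once

            List<String> processed = new ArrayList<>();
            processed.add( workQueue.take() ); // frees a slot for the offloaded put()
            processed.add( workQueue.take() ); // waits until the offloaded put() completes
            return processed;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException( e );
        }
        finally {
            offloadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println( run() );
    }
}
```

The submitting thread never blocks here: the second call returns as soon as the retry has been handed to the offload pool, and only the pool's thread waits for space in the queue.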