[cdi-dev] Building EJB-like @Asynchronous via interceptor in CDI

Jozef Hartinger jharting at redhat.com
Mon Jan 19 03:21:59 EST 2015


On 01/17/2015 12:32 AM, arjan tijms wrote:
> Hi,
>
> On Fri, Jan 16, 2015 at 10:41 PM, Jozef Hartinger
> <jharting at redhat.com> wrote:
>
>     Hi Arjan,
>
>     I did some changes recently in Weld interceptors and this usecase
>     now works smoothly. The code is not part of a release yet. See
>     this test for a simple implementation of an @Async interceptor
>     (basically the same as your initial attempt). Note that the chain
>     is repeatable but at the same time it is not reset after dispatch
>     to a different thread so you no longer need the ThreadLocal nor
>     any other workaround.
>
>
> That's quite a coincidence, it's indeed rather similar ;)
>
> I wonder how it now works though, as the InvocationContext "ctx" does 
> not seem to be made aware that it's been dispatched to a different 
> thread from within the code. Does it use an internal thread local to 
> keep state or so?
The most straightforward implementations use a mutable integer index to 
keep track of which interceptor in the chain is to be invoked next. The 
index is incremented after each interceptor invocation and, once it 
reaches the size of the interceptor chain, the intercepted (target) 
method is called. In addition, the spec requires that calls to 
InvocationContext.proceed() be repeatable, i.e. you should be able to 
implement a retry such as:

try {
     return invocationContext.proceed();
} catch (Exception ignored) {
     return invocationContext.proceed(); // retry once again
}

To implement this requirement, the InvocationContext's internal mutable 
index has to be reset to its initial position when an interceptor 
returns, so that a retry works as expected. This clashes with the 
AsyncInterceptor, which returns from the interceptor method before the 
chain has finished: the index is reset prematurely and things get stuck 
in an infinite loop.
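
To make the clash concrete, here is a minimal sketch of such a 
mutable-index chain. All names (MutableIndexChain, Interceptor, the demo 
methods) are made up for illustration and this is not Weld's actual 
code. The second demo runs the deferred proceed() on the same thread 
after the interceptor has returned, which stands in for the executor 
thread, and stops on re-entry instead of looping forever.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Illustrative only: hypothetical names, not Weld's implementation.
class MutableIndexChain {

    /** Stand-in for an interceptor's @AroundInvoke method. */
    interface Interceptor {
        Object intercept(MutableIndexChain ctx) throws Exception;
    }

    private final List<Interceptor> interceptors;
    private final Callable<Object> target; // the intercepted method
    private int position = 0;              // shared, mutable index

    MutableIndexChain(List<Interceptor> interceptors, Callable<Object> target) {
        this.interceptors = interceptors;
        this.target = target;
    }

    Object proceed() throws Exception {
        if (position == interceptors.size()) {
            return target.call(); // end of chain: call the intercepted method
        }
        int current = position++;
        try {
            return interceptors.get(current).intercept(this);
        } finally {
            position = current; // reset so that proceed() stays repeatable
        }
    }

    /** Retry works: the interceptor calls proceed() twice, so the target runs twice. */
    static int demoRetry() throws Exception {
        int[] targetCalls = {0};
        Interceptor retrying = ctx -> {
            ctx.proceed();
            return ctx.proceed();
        };
        Callable<Object> counted = () -> {
            targetCalls[0]++;
            return "done";
        };
        new MutableIndexChain(List.of(retrying), counted).proceed();
        return targetCalls[0];
    }

    /** Deferred proceed() fails: the index was reset when the interceptor returned. */
    static String demoDeferredProceed() throws Exception {
        int[] entries = {0};
        Interceptor async = ctx -> {
            if (++entries[0] > 1) {
                return "interceptor re-entered"; // without this guard: infinite loop
            }
            Callable<Object> deferred = ctx::proceed; // what would go to the executor
            return deferred;
        };
        MutableIndexChain chain =
                new MutableIndexChain(List.of(async), () -> "target reached");
        Object returned = chain.proceed(); // interceptor returns, index goes back to 0
        return (String) ((Callable<?>) returned).call();
    }
}
```

In this sketch demoRetry() sees the target invoked twice, while 
demoDeferredProceed() ends up back in the same interceptor and never 
reaches the target.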

In order to support AsyncInterceptor we switched to an implementation 
where we use multiple InvocationContext implementations with immutable 
indexes. That way we do not need to reset the index and AsyncInterceptor 
works.
Note that we do not guard access to the state of parameters and context 
data and these are therefore expected to be published safely between 
threads (as your implementation does).
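
The idea can be sketched roughly as follows. Again, the names 
(ImmutableIndexChain, Interceptor, the demo methods) are hypothetical 
and the code only illustrates the immutable-index approach under 
simplifying assumptions (no parameters, no context data); it is not the 
Weld implementation. A plain ExecutorService is used here in place of a 
ManagedExecutorService so that the example runs outside a container.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only: hypothetical names, not Weld's implementation.
class ImmutableIndexChain {

    /** Stand-in for an interceptor's @AroundInvoke method. */
    interface Interceptor {
        Object intercept(ImmutableIndexChain ctx) throws Exception;
    }

    private final List<Interceptor> interceptors;
    private final Callable<Object> target; // the intercepted method
    private final int position;            // fixed for this context instance

    ImmutableIndexChain(List<Interceptor> interceptors, Callable<Object> target) {
        this(interceptors, target, 0);
    }

    private ImmutableIndexChain(List<Interceptor> interceptors,
                                Callable<Object> target, int position) {
        this.interceptors = interceptors;
        this.target = target;
        this.position = position;
    }

    Object proceed() throws Exception {
        if (position == interceptors.size()) {
            return target.call(); // end of chain: call the intercepted method
        }
        // Each interceptor gets its own context pinned at the next position,
        // so nothing has to be reset when the interceptor returns.
        ImmutableIndexChain next =
                new ImmutableIndexChain(interceptors, target, position + 1);
        return interceptors.get(position).intercept(next);
    }

    /** proceed() is still repeatable: the target runs twice. */
    static int demoRetry() throws Exception {
        int[] targetCalls = {0};
        Interceptor retrying = ctx -> {
            ctx.proceed();
            return ctx.proceed();
        };
        Callable<Object> counted = () -> {
            targetCalls[0]++;
            return "done";
        };
        new ImmutableIndexChain(List.of(retrying), counted).proceed();
        return targetCalls[0];
    }

    /** proceed() on another thread, after the interceptor returned, reaches the target. */
    static String demoAsync() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Interceptor async = ctx -> {
                Callable<Object> task = ctx::proceed;
                return executor.submit(task); // submit() safely publishes ctx
            };
            ImmutableIndexChain chain =
                    new ImmutableIndexChain(List.of(async), () -> "target reached");
            Future<?> future = (Future<?>) chain.proceed();
            return (String) future.get();
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the context handed to the interceptor never changes its 
position, it does not matter when or on which thread proceed() is 
called, and retry keeps working without any reset.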


>
> I'll also try to see what this does on OWB. Do you think this is 
> something that should work, or just something that Weld happens to 
> support regardless of the spec?
It's something that is currently not required by the spec but doable. 
Therefore, it is a good idea to implement it this way.
>
> Kind regards,
> Arjan
>
>
>
>     https://github.com/weld/core/blob/master/tests-arquillian/src/test/java/org/jboss/weld/tests/interceptors/thread/async/AsyncInterceptor.java
>
>     Jozef
>
>
>     On 01/16/2015 06:17 PM, arjan tijms wrote:
>>     Hi,
>>
>>     I'm attempting to emulate EJB's @Asynchronous in CDI using
>>     interceptors.
>>
>>     Originally I had defined my interceptor as follows:
>>
>>     @Interceptor
>>     @Asynchronous
>>     @Priority(APPLICATION)
>>     public class AsynchronousInterceptor implements Serializable {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         @Resource
>>         private ManagedExecutorService managedExecutorService;
>>
>>         @AroundInvoke
>>         public Object submitAsync(InvocationContext ctx) throws Exception {
>>             return new FutureDelegator(
>>                 managedExecutorService.submit(() -> { return ctx.proceed(); }));
>>         }
>>
>>     }
>>
>>     With FutureDelegator as follows:
>>
>>     public class FutureDelegator implements Future<Object> {
>>
>>         private Future<?> future;
>>
>>         public FutureDelegator(Future<?> future) {
>>             this.future = future;
>>         }
>>
>>         @Override
>>         public Object get() throws InterruptedException, ExecutionException {
>>             AsyncResult<?> asyncResult = (AsyncResult<?>) future.get();
>>             if (asyncResult == null) {
>>                 return null;
>>             }
>>
>>             return asyncResult.get();
>>         }
>>
>>         @Override
>>         public Object get(long timeout, TimeUnit unit)
>>                 throws InterruptedException, ExecutionException, TimeoutException {
>>             AsyncResult<?> asyncResult = (AsyncResult<?>) future.get(timeout, unit);
>>             if (asyncResult == null) {
>>                 return null;
>>             }
>>
>>             return asyncResult.get();
>>         }
>>
>>         @Override
>>         public boolean cancel(boolean mayInterruptIfRunning) {
>>             return future.cancel(mayInterruptIfRunning);
>>         }
>>
>>         @Override
>>         public boolean isCancelled() {
>>             return future.isCancelled();
>>         }
>>         @Override
>>         public boolean isDone() {
>>             return future.isDone();
>>         }
>>
>>     }
>>
>>     This of course didn't quite work, as the InvocationContext will
>>     be reset after the @AroundInvoke method returns, and an infinite
>>     intercept loop results (on Weld).
>>
>>     I got it to work though on Weld by using a thread local check to
>>     break that loop:
>>
>>     @Interceptor
>>     @Asynchronous
>>     @Priority(PLATFORM_BEFORE)
>>     public class AsynchronousInterceptor implements Serializable {
>>
>>         private static final long serialVersionUID = 1L;
>>
>>         @Resource
>>         private ManagedExecutorService managedExecutorService;
>>
>>         private static final ThreadLocal<Boolean> asyncInvocation = new ThreadLocal<Boolean>();
>>
>>         @AroundInvoke
>>         public synchronized Object submitAsync(InvocationContext ctx) throws Exception {
>>
>>             if (TRUE.equals(asyncInvocation.get())) {
>>                 return ctx.proceed();
>>             }
>>
>>             return new FutureDelegator(managedExecutorService.submit(() -> {
>>                 try {
>>                     asyncInvocation.set(TRUE);
>>                     return ctx.proceed();
>>                 } finally {
>>                      asyncInvocation.remove();
>>                 }
>>             }));
>>         }
>>
>>     }
>>
>>     But I've got a feeling this works just by chance and not because
>>     the workaround is so clever.
>>
>>     What do you guys think, what would be the best way to support
>>     this with the current CDI version? Or would CDI/Interceptors need
>>     something like Servlet's async support, where the
>>     InvocationContext is put into async mode whereafter it "simply"
>>     allows another thread to continue processing on it?
>>
>>     Kind regards,
>>     Arjan Tijms
>>
>>
>>
>>
>>
>>
>>
>>     _______________________________________________
>>     cdi-dev mailing list
>>     cdi-dev at lists.jboss.org
>>     https://lists.jboss.org/mailman/listinfo/cdi-dev
>>
>>     Note that for all code provided on this list, the provider licenses the code under the Apache License, Version 2 (http://www.apache.org/licenses/LICENSE-2.0.html). For all other ideas provided on this list, the provider waives all patent and other intellectual property rights inherent in such information.
>
>
