On 01/17/2015 12:32 AM, arjan tijms wrote:
Hi,

On Fri, Jan 16, 2015 at 10:41 PM, Jozef Hartinger <jharting@redhat.com> wrote:
Hi Arjan,

I did some changes recently in Weld interceptors and this usecase now works smoothly. The code is not part of a release yet. See this test for a simple implementation of an @Async interceptor (basically the same as your initial attempt). Note that the chain is repeatable but at the same time it is not reset after dispatch to a different thread so you no longer need the ThreadLocal nor any other workaround.

That's quite a coincidence, it's indeed rather similar ;)

I wonder how it now works though, as the InvocationContext "ctx" does not seem to be made aware that it's been dispatched to a different thread from within the code. Does it use an internal thread local to keep state or so?
The most straightforward implementations use a mutable integer index to keep track of which interceptor of the chain is the one to be invoked next. This index is incremented after each interceptor invocation and once it reaches the size of the interceptor chain, the interceptor method is called. In addition, there is a spec requirement that says that calls to InvocationContext.proceed() should be repeatable i.e. you should be able implement retry such as:

try {
    return invocationContext.proceed();
} catch (Exception ignored) {
    return invocationContext.proceed(); // retry once again
}

In order to implement this requirement, InvocationContext's internal mutable index needs to be reset to the initial position for retry to work as expected. This clashes with the AsyncInterceptor as it returns from the interceptor method before the chain is finished. This causes the index to be reset prematurely and things get stuck in an infinite loop.

In order to support AsyncInterceptor we switched to an implementation where we use multiple InvocationContext implementations with immutable indexes. That way we do not need to reset the index and AsyncInterceptor works.
Note that we do not guard access to the state of parameters and context data and these are therefore expected to be published safely between threads (as your implementation does).



I'll also try to see what this does on OWB. Do you think this is something that should work, or just something that Weld happens to support regardless of the spec?
It's something that is currently not required by the spec but doable. Therefore, it is a good idea to implement it this way.

Kind regards,
Arjan


 

https://github.com/weld/core/blob/master/tests-arquillian/src/test/java/org/jboss/weld/tests/interceptors/thread/async/AsyncInterceptor.java

Jozef


On 01/16/2015 06:17 PM, arjan tijms wrote:
Hi,

I'm attempting to emulate EJB's @Asynchronous in CDI using interceptors.

Originally I had defined my interceptor as follows;

@Interceptor
@Asynchronous
@Priority(APPLICATION)
public class AsynchronousInterceptor implements Serializable {

    private static final long serialVersionUID = 1L;
   
    @Resource
    private ManagedExecutorService managedExecutorService;

    @AroundInvoke
    public Object submitAsync(InvocationContext ctx) throws Exception {
        return new FutureDelegator(managedExecutorService.submit( ()-> { return ctx.proceed(); } ));
    }

}

With FutureDelegator as follows:

public class FutureDelegator implements Future<Object> {
   
    private Future<?> future;
   
    public FutureDelegator(Future<?> future) {
        this.future = future;
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        AsyncResult<?> asyncResult = (AsyncResult<?>) future.get();
        if (asyncResult == null) {
            return null;
        }
       
        return asyncResult.get();
    }
   
    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException,    ExecutionException, TimeoutException {
        AsyncResult<?> asyncResult = (AsyncResult<?>) future.get(timeout, unit);
        if (asyncResult == null) {
            return null;
        }
       
        return asyncResult.get();
    }
   
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return future.cancel(mayInterruptIfRunning);
    }
   
    @Override
    public boolean isCancelled() {
        return future.isCancelled();
    }
    @Override
    public boolean isDone() {
        return future.isDone();
    }
   
}

This of course didn't quite work, as the InvocationContext will be reset after the @AroundInvoke method returns, and an infinite intercept loop results (on Weld).

I got it to work though on Weld by using a thread local check to break that loop:

@Interceptor
@Asynchronous
@Priority(PLATFORM_BEFORE)
public class AsynchronousInterceptor implements Serializable {

    private static final long serialVersionUID = 1L;
   
    @Resource
    private ManagedExecutorService managedExecutorService;
   
    private static final ThreadLocal<Boolean> asyncInvocation = new ThreadLocal<Boolean>();

    @AroundInvoke
    public synchronized Object submitAsync(InvocationContext ctx) throws Exception {
       
        if (TRUE.equals(asyncInvocation.get())) {
            return ctx.proceed();
        }
       
        return new FutureDelegator(managedExecutorService.submit( ()-> {
            try {
                asyncInvocation.set(TRUE);
                return ctx.proceed();
            } finally {
                 asyncInvocation.remove();
            }
        }));
    }

}

But I've got a feeling this works just by chance and not because the workaround is so clever.

What do you guys think, what would be the best way to support this with the current CDI version? Or would CDI/Interceptors need something like Servlet's async support, where the InvocationContext is put into async mode whereafter it "simply" allows an other thread to continue processing on it?

Kind regards,
Arjan Tijms







_______________________________________________
cdi-dev mailing list
cdi-dev@lists.jboss.org
https://lists.jboss.org/mailman/listinfo/cdi-dev

Note that for all code provided on this list, the provider licenses the code under the Apache License, Version 2 (http://www.apache.org/licenses/LICENSE-2.0.html). For all other ideas provided on this list, the provider waives all patent and other intellectual property rights inherent in such information.