[infinispan-issues] [JBoss JIRA] (ISPN-1541) NotifyingNotifiableFuture's FutureListener can not invoke Future API on FutureListener#futureDone callback

Radim Vansa (JIRA) issues at jboss.org
Thu Jan 9 10:39:33 EST 2014


    [ https://issues.jboss.org/browse/ISPN-1541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12934726#comment-12934726 ] 

Radim Vansa commented on ISPN-1541:
-----------------------------------

Hi, I was looking at this in relation to ISPN-3868, but I don't really understand your solution. The listener can now wait on future.get(), but the returned value is of no use: as in replication.InvokeRemotelyInFutureTest, the returned value has to be known beforehand, which is not really handy for the listener. There is the DeferredReturnFuture, but it does not look usable to me, as it will always return null.
                
> NotifyingNotifiableFuture's FutureListener can not invoke Future API on FutureListener#futureDone callback
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: ISPN-1541
>                 URL: https://issues.jboss.org/browse/ISPN-1541
>             Project: Infinispan
>          Issue Type: Bug
>          Components: RPC
>    Affects Versions: 5.0.1.FINAL, 5.1.0.BETA4
>            Reporter: Vladimir Blagojevic
>            Assignee: Vladimir Blagojevic
>             Fix For: 5.1.0.CR1
>
>
> Invoking Future#get from a FutureListener attached to a NotifyingNotifiableFuture passed as a parameter to RpcManager#invokeRemotelyInFuture throws InterruptedException! Sounds more complicated than it really is! In other words, the following code throws InterruptedException:
> {code}NotifyingFutureImpl f = ...
>       f.attachListener(new FutureListener<Object>() {
>          
>          @Override
>          public void futureDone(Future<Object> future) {
>             try {      
>                future.get();
>             } catch (Exception e) {   
>                e.printStackTrace();
>             }             
>          }
>       });
>       CommandsFactory cf = cache1.getAdvancedCache().getComponentRegistry().getComponent(CommandsFactory.class);
>       cache1.getAdvancedCache().getRpcManager().invokeRemotelyInFuture(null, cf.buildPutKeyValueCommand("k","v", -1, -1, null), f);
> {code}
> The reason we get InterruptedException is that we invoke notifyDone, which in turn invokes futureDone, from the same thread that is invoking the future's callable. In effect, we invoke future#get on *the same thread* that is running the callable for the associated future!
> {code}
>  public final void invokeRemotelyInFuture(final Collection<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> l, final long timeout) {
>       if (trace) log.tracef("%s invoking in future call %s to recipient list %s", t.getAddress(), rpc, recipients);
>       final CountDownLatch futureSet = new CountDownLatch(1);
>       Callable<Object> c = new Callable<Object>() {
>          public Object call() throws Exception {
>             Object result = null;
>             try {
>                result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
>             } finally {
>                try {
>                   futureSet.await();
>                } catch (InterruptedException e) {
>                   Thread.currentThread().interrupt();
>                } finally {
>                   l.notifyDone();
>                }
>             }
>             return result;
>          }
>       };
>       l.setNetworkFuture(asyncExecutor.submit(c));
>       futureSet.countDown();      
>    }
> {code}
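
The underlying constraint can be illustrated outside Infinispan with plain java.util.concurrent. The following is only a minimal sketch, not the Infinispan code: the class name SelfWaitDemo and the method runSelfWait are made up for illustration, and it does not reproduce the InterruptedException reported above. It only shows that a future is not yet complete while its own callable is still running, so a get() issued from that thread cannot return the result (a timeout is used here so the demo terminates).

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

public class SelfWaitDemo {
    /** Hypothetical helper: reports what a task observes when it calls get() on its own Future. */
    static String runSelfWait() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicReference<Future<String>> self = new AtomicReference<>();
        CountDownLatch futureSet = new CountDownLatch(1);
        AtomicReference<String> observed = new AtomicReference<>("not run");
        try {
            Future<String> f = executor.submit(() -> {
                // Wait until the submitting thread has published the Future,
                // mirroring the futureSet latch in the code above.
                futureSet.await();
                try {
                    // The task asks for its own result while it is still running.
                    self.get().get(200, TimeUnit.MILLISECONDS);
                    observed.set("completed");
                } catch (TimeoutException e) {
                    observed.set("timed out");
                }
                return "result";
            });
            self.set(f);
            futureSet.countDown();
            f.get(); // from another thread, waiting for the result works fine
            return observed.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runSelfWait()); // prints "timed out"
    }
}
```

Without the timeout, the inner get() would wait for a result that can only be set after the callable returns.
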
> The solution is to wait for the callable to finish and invoke notifyDone from another thread. We have two choices here:
> a) invoke notifyDone from the same thread that invokes invokeRemotelyInFuture. This in turn makes invokeRemotelyInFuture less async than it should be
> b) invoke the callback on another thread
> {code}
> public final void invokeRemotelyInFuture(final Collection<Address> recipients,
>             final ReplicableCommand rpc, final boolean usePriorityQueue,
>             final NotifyingNotifiableFuture<Object> l, final long timeout) {
>       if (trace)
>          log.tracef("%s invoking in future call %s to recipient list %s", t.getAddress(), rpc,
>                   recipients);
>       final CountDownLatch callableCompleted = new CountDownLatch(1);
>       Callable<Object> c = new Callable<Object>() {
>          public Object call() throws Exception {
>             Object result = null;
>             try {
>                result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
>             } finally {
>                callableCompleted.countDown();
>             }
>             return result;
>          }
>       };
>       l.setNetworkFuture(asyncExecutor.submit(c));
>       asyncExecutor.submit(new Runnable() {
>          
>          @Override
>          public void run() {
>             try {
>                callableCompleted.await();
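>             } catch (InterruptedException e) {
>                // Restore the interrupt status instead of silently swallowing it
>                Thread.currentThread().interrupt();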
>             } finally {
>                l.notifyDone();
>             }       
>          }
>       });     
>    }
> {code}
> We can, of course, leave things as they are right now, but in that case we have to warn implementers of FutureListener that they cannot call future.get() from a FutureListener!
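
Option (b) can be sketched in isolation with plain java.util.concurrent. This is a hedged illustration under assumptions, not the Infinispan implementation: NotifyOnOtherThread, submitWithListener and demo are hypothetical names, and a Consumer stands in for FutureListener. The work runs in one task, and a second task waits on a latch and then notifies the listener, so the listener can call future.get() safely.

```java
import java.util.concurrent.*;
import java.util.function.Consumer;

public class NotifyOnOtherThread {
    /** Hypothetical helper: runs the work, then notifies the listener from a separate task. */
    static <T> Future<T> submitWithListener(ExecutorService executor,
                                            Callable<T> work,
                                            Consumer<Future<T>> listener) {
        CountDownLatch callableCompleted = new CountDownLatch(1);
        Future<T> f = executor.submit(() -> {
            try {
                return work.call();
            } finally {
                callableCompleted.countDown(); // signal the notifier task
            }
        });
        executor.submit(() -> {
            try {
                callableCompleted.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Runs on a different task than the work itself,
                // so the listener may block on f.get().
                listener.accept(f);
            }
        });
        return f;
    }

    /** Returns what the listener observed, or null if it was never notified in time. */
    static String demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        BlockingQueue<String> seen = new LinkedBlockingQueue<>();
        Callable<String> work = () -> "v";
        try {
            submitWithListener(executor, work, future -> {
                try {
                    seen.add("listener got " + future.get());
                } catch (InterruptedException | ExecutionException e) {
                    seen.add("listener failed: " + e);
                }
            });
            return seen.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "listener got v"
    }
}
```

One trade-off of this approach is that the notifier task occupies a pool thread while it waits, so a small or bounded executor needs enough threads for both the work and the notification.
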
