Radim Vansa commented on ISPN-1541:
-----------------------------------
I believe that the synchronization between notifyDone() and setNetworkFuture() has to be
handled internally in NotifyingFutureImpl, and that the notifyDone() interface should be
extended to take the actual return value.
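One possible reading of this suggestion, as a minimal sketch: if notifyDone() carries the result and the future guards its own state, a listener can call get() even on the thread that produced the value. ResultCarryingFutureSketch and all of its methods are hypothetical stand-ins written for illustration; they are not Infinispan's NotifyingFutureImpl.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for NotifyingFutureImpl; an assumption, not the real class.
final class ResultCarryingFutureSketch<T> {
    private final Object lock = new Object();
    private final List<Consumer<ResultCarryingFutureSketch<T>>> listeners = new ArrayList<>();
    private Future<?> networkFuture; // kept only so cancel() has something to act on
    private T result;
    private boolean done;

    void setNetworkFuture(Future<?> future) {
        synchronized (lock) {
            networkFuture = future;
        }
    }

    void attachListener(Consumer<ResultCarryingFutureSketch<T>> listener) {
        boolean alreadyDone;
        synchronized (lock) {
            alreadyDone = done;
            if (!alreadyDone) {
                listeners.add(listener);
            }
        }
        if (alreadyDone) {
            listener.accept(this); // completed earlier, so notify immediately
        }
    }

    // The result is handed over here, so get() never has to ask the network future.
    void notifyDone(T value) {
        List<Consumer<ResultCarryingFutureSketch<T>>> toNotify;
        synchronized (lock) {
            result = value;
            done = true;
            lock.notifyAll();
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (Consumer<ResultCarryingFutureSketch<T>> listener : toNotify) {
            listener.accept(this); // invoked outside the lock
        }
    }

    T get() throws InterruptedException {
        synchronized (lock) {
            while (!done) {
                lock.wait();
            }
            return result;
        }
    }

    boolean cancel(boolean mayInterruptIfRunning) {
        synchronized (lock) {
            return networkFuture != null && networkFuture.cancel(mayInterruptIfRunning);
        }
    }

    // Runs the work on one executor thread; the listener calls get() on that same thread.
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ResultCarryingFutureSketch<String> future = new ResultCarryingFutureSketch<>();
        StringBuilder seen = new StringBuilder();
        future.attachListener(completed -> {
            try {
                seen.append(completed.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            Runnable work = () -> future.notifyDone("v");
            Future<?> task = executor.submit(work);
            future.setNetworkFuture(task);
            task.get(5, TimeUnit.SECONDS);
            return seen.toString();
        } catch (Exception e) {
            return "failed: " + e;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("listener observed: " + demo());
    }
}
```

In this sketch the ordering between setNetworkFuture() and notifyDone() no longer matters to callers, because get() depends only on state that notifyDone() sets under the lock.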
NotifyingNotifiableFuture's FutureListener can not invoke Future API on FutureListener#futureDone callback
----------------------------------------------------------------------------------------------------------
Key: ISPN-1541
URL: https://issues.jboss.org/browse/ISPN-1541
Project: Infinispan
Issue Type: Bug
Components: RPC
Affects Versions: 5.0.1.FINAL, 5.1.0.BETA4
Reporter: Vladimir Blagojevic
Assignee: Vladimir Blagojevic
Fix For: 5.1.0.CR1
Invoking Future#get from a FutureListener attached to a NotifyingNotifiableFuture that is
passed as a parameter to RpcManager#invokeRemotelyInFuture throws InterruptedException! It
sounds more complicated than it really is. In other words, the following code throws
InterruptedException:
{code}
NotifyingFutureImpl f = ...
f.attachListener(new FutureListener<Object>() {
   @Override
   public void futureDone(Future<Object> future) {
      try {
         future.get();
      } catch (Exception e) {
         e.printStackTrace();
      }
   }
});
CommandsFactory cf =
      cache1.getAdvancedCache().getComponentRegistry().getComponent(CommandsFactory.class);
cache1.getAdvancedCache().getRpcManager().invokeRemotelyInFuture(null,
      cf.buildPutKeyValueCommand("k", "v", -1, -1, null), f);
{code}
The reason we get InterruptedException is that we invoke notifyDone, which in turn
invokes futureDone, from the same thread that runs the future's callable. In effect, we
invoke Future#get on *the same thread* that is running the callable for the
associated future!
{code}
public final void invokeRemotelyInFuture(final Collection<Address> recipients,
      final ReplicableCommand rpc, final boolean usePriorityQueue,
      final NotifyingNotifiableFuture<Object> l, final long timeout) {
   if (trace)
      log.tracef("%s invoking in future call %s to recipient list %s",
            t.getAddress(), rpc, recipients);
   final CountDownLatch futureSet = new CountDownLatch(1);
   Callable<Object> c = new Callable<Object>() {
      public Object call() throws Exception {
         Object result = null;
         try {
            result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
         } finally {
            try {
               futureSet.await();
            } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
            } finally {
               l.notifyDone();
            }
         }
         return result;
      }
   };
   l.setNetworkFuture(asyncExecutor.submit(c));
   futureSet.countDown();
}
{code}
The solution is to wait for the callable to finish and invoke notifyDone from another
thread. We have two choices here:
a) invoke notifyDone from the same thread that calls invokeRemotelyInFuture. This in turn
makes invokeRemotelyInFuture less asynchronous than it should be
b) invoke the callback on another thread
{code}
public final void invokeRemotelyInFuture(final Collection<Address> recipients,
      final ReplicableCommand rpc, final boolean usePriorityQueue,
      final NotifyingNotifiableFuture<Object> l, final long timeout) {
   if (trace)
      log.tracef("%s invoking in future call %s to recipient list %s",
            t.getAddress(), rpc, recipients);
   final CountDownLatch callableCompleted = new CountDownLatch(1);
   Callable<Object> c = new Callable<Object>() {
      public Object call() throws Exception {
         Object result = null;
         try {
            result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
         } finally {
            callableCompleted.countDown();
         }
         return result;
      }
   };
   l.setNetworkFuture(asyncExecutor.submit(c));
   asyncExecutor.submit(new Runnable() {
      @Override
      public void run() {
         try {
            callableCompleted.await();
         } catch (Exception e) {
         } finally {
            l.notifyDone();
         }
      }
   });
}
{code}
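Option b) can be tried outside Infinispan with plain java.util.concurrent. The following is only a sketch under stated assumptions: CallbackOnOtherThreadSketch, submitWithCallback and demo are hypothetical names, and a Consumer stands in for FutureListener. The work is submitted first, and a second task waits on a latch before invoking the listener, so the listener's Future#get never runs on the thread that executes the work.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper illustrating option b); not part of Infinispan.
final class CallbackOnOtherThreadSketch {

    static <T> Future<T> submitWithCallback(ExecutorService executor,
                                            Callable<T> work,
                                            Consumer<Future<T>> listener) {
        final CountDownLatch workCompleted = new CountDownLatch(1);
        Callable<T> wrapped = () -> {
            try {
                return work.call();
            } finally {
                workCompleted.countDown(); // signal completion, do not call the listener here
            }
        };
        final Future<T> future = executor.submit(wrapped);
        Runnable notifier = () -> {
            try {
                workCompleted.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                listener.accept(future); // runs in a task separate from the work itself
            }
        };
        executor.submit(notifier);
        return future;
    }

    // Returns what the listener read through Future#get, or a failure description.
    static String demo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicReference<String> seen = new AtomicReference<>("not called");
        CountDownLatch listenerDone = new CountDownLatch(1);
        Callable<String> work = () -> "v";
        Consumer<Future<String>> listener = f -> {
            try {
                seen.set(f.get());
            } catch (Exception e) {
                seen.set("failed: " + e);
            } finally {
                listenerDone.countDown();
            }
        };
        try {
            submitWithCallback(executor, work, listener);
            listenerDone.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("listener observed: " + demo());
    }
}
```

One design point to keep in mind: the notifier task occupies an executor thread while it waits on the latch, so this approach costs an extra queued task per call.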
We can, of course, leave things as they are right now, but in that case we have to warn
implementers of FutureListener that they cannot call future.get() from a FutureListener!