[infinispan-issues] [JBoss JIRA] (ISPN-1541) NotifyingNotifiableFuture's FutureListener can not invoke Future API on FutureListener#futureDone callback
Vladimir Blagojevic (Updated) (JIRA)
jira-events at lists.jboss.org
Thu Nov 24 12:45:41 EST 2011
[ https://issues.jboss.org/browse/ISPN-1541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vladimir Blagojevic updated ISPN-1541:
--------------------------------------
Git Pull Request: https://github.com/infinispan/infinispan/pull/660 (was: https://github.com/infinispan/infinispan/pull/660)
Description:
Invoking Future#get from a FutureListener attached to a NotifyingNotifiableFuture passed as a parameter to RpcManager#invokeRemotelyInFuture throws InterruptedException! Sounds more complicated than it really is! In other words, the following code throws InterruptedException:
{code}
NotifyingFutureImpl f = ...
f.attachListener(new FutureListener<Object>() {
   @Override
   public void futureDone(Future<Object> future) {
      try {
         future.get();
      } catch (Exception e) {
         e.printStackTrace();
      }
   }
});
CommandsFactory cf = cache1.getAdvancedCache().getComponentRegistry().getComponent(CommandsFactory.class);
cache1.getAdvancedCache().getRpcManager().invokeRemotelyInFuture(null, cf.buildPutKeyValueCommand("k","v", -1, -1, null), f);
{code}
The reason we get InterruptedException is that notifyDone, which in turn invokes futureDone, is called from the same thread that runs the future's callable. In effect, we invoke future#get followed by future#get on the same future instance using *the same thread*!
{code}
public final void invokeRemotelyInFuture(final Collection<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> l, final long timeout) {
   if (trace) log.tracef("%s invoking in future call %s to recipient list %s", t.getAddress(), rpc, recipients);
   final CountDownLatch futureSet = new CountDownLatch(1);
   Callable<Object> c = new Callable<Object>() {
      public Object call() throws Exception {
         Object result = null;
         try {
            result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
         } finally {
            try {
               futureSet.await();
            } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
            } finally {
               l.notifyDone();
            }
         }
         return result;
      }
   };
   l.setNetworkFuture(asyncExecutor.submit(c));
   futureSet.countDown();
}
{code}
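The underlying hazard can be illustrated outside Infinispan with plain java.util.concurrent. The following is a minimal sketch, not the Infinispan code: the class SelfGetDemo and the method selfGetOutcome are hypothetical names, and a bounded get is used so that the demo terminates. It shows that a future cannot be completed while the thread running its callable is itself waiting on that future. In this stand-alone setting the blocked call surfaces as a TimeoutException; the InterruptedException reported above is specific to the Infinispan code path and is not reproduced here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; none of these names come from Infinispan.
public class SelfGetDemo {

   /** Reports what happens when a callable calls get() on its own future. */
   public static String selfGetOutcome() {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      final AtomicReference<Future<String>> self = new AtomicReference<Future<String>>();
      final CountDownLatch futureSet = new CountDownLatch(1);
      final AtomicReference<String> outcome = new AtomicReference<String>("completed");
      try {
         Future<String> f = executor.submit(new Callable<String>() {
            public String call() throws Exception {
               futureSet.await(); // wait until the future reference is published
               try {
                  // This thread is still running the callable, so the future
                  // cannot be done yet and the bounded get times out.
                  self.get().get(200, TimeUnit.MILLISECONDS);
               } catch (TimeoutException e) {
                  outcome.set("TimeoutException");
               }
               return "v";
            }
         });
         self.set(f);
         futureSet.countDown();
         f.get(); // from the caller's thread the same get() returns normally
         return outcome.get();
      } catch (Exception e) {
         throw new IllegalStateException(e);
      } finally {
         executor.shutdown();
      }
   }

   public static void main(String[] args) {
      System.out.println(selfGetOutcome());
   }
}
```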
The solution is to wait for the callable to finish and invoke notifyDone from another thread. We have two choices here:
a) invoke notifyDone from the same thread that invokes invokeRemotelyInFuture. This, in turn, makes invokeRemotelyInFuture less async than it should be
b) invoke the callback on another thread
{code}
public final void invokeRemotelyInFuture(final Collection<Address> recipients,
      final ReplicableCommand rpc, final boolean usePriorityQueue,
      final NotifyingNotifiableFuture<Object> l, final long timeout) {
   if (trace)
      log.tracef("%s invoking in future call %s to recipient list %s", t.getAddress(), rpc,
            recipients);
   final CountDownLatch callableCompleted = new CountDownLatch(1);
   Callable<Object> c = new Callable<Object>() {
      public Object call() throws Exception {
         Object result = null;
         try {
            result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
         } finally {
            callableCompleted.countDown();
         }
         return result;
      }
   };
   l.setNetworkFuture(asyncExecutor.submit(c));
   asyncExecutor.submit(new Runnable() {
      @Override
      public void run() {
         try {
            callableCompleted.await();
         } catch (Exception e) {
         } finally {
            l.notifyDone();
         }
      }
   });
}
{code}
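The shape of option b) can be tried in isolation with the following sketch. It assumes nothing from Infinispan: NotifyOnOtherThreadDemo and notifyFromSecondTask are hypothetical names, the string "v" stands in for the result of invokeRemotely, and the second task calls get() directly where the real code would call notifyDone and let the listener do so.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; none of these names come from Infinispan.
public class NotifyOnOtherThreadDemo {

   /** Returns the value a listener-like task sees when it calls get(). */
   public static String notifyFromSecondTask() {
      final ExecutorService executor = Executors.newFixedThreadPool(2);
      final CountDownLatch callableCompleted = new CountDownLatch(1);
      final AtomicReference<String> seenByListener = new AtomicReference<String>();
      try {
         final Future<String> networkFuture = executor.submit(new Callable<String>() {
            public String call() {
               try {
                  return "v"; // stand-in for the remote invocation
               } finally {
                  callableCompleted.countDown(); // signal only, no callback here
               }
            }
         });
         Future<?> notifier = executor.submit(new Runnable() {
            public void run() {
               try {
                  callableCompleted.await();
                  // A different task from the callable, so get() can wait for
                  // the callable to return and the future to complete.
                  seenByListener.set(networkFuture.get(5, TimeUnit.SECONDS));
               } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
               } catch (Exception e) {
                  seenByListener.set("failed: " + e);
               }
            }
         });
         notifier.get(10, TimeUnit.SECONDS);
         return seenByListener.get();
      } catch (Exception e) {
         throw new IllegalStateException(e);
      } finally {
         executor.shutdown();
      }
   }

   public static void main(String[] args) {
      System.out.println(notifyFromSecondTask());
   }
}
```

One cost of this arrangement is that the waiting task holds a pool thread for the duration of the remote call.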
We can, of course, leave things as they are right now, but in that case we have to warn implementers of FutureListener that they cannot call future.get() from a FutureListener!
> NotifyingNotifiableFuture's FutureListener can not invoke Future API on FutureListener#futureDone callback
> ----------------------------------------------------------------------------------------------------------
>
> Key: ISPN-1541
> URL: https://issues.jboss.org/browse/ISPN-1541
> Project: Infinispan
> Issue Type: Bug
> Components: RPC
> Affects Versions: 5.0.1.FINAL, 5.1.0.BETA4
> Reporter: Vladimir Blagojevic
> Assignee: Vladimir Blagojevic
> Fix For: 5.1.0.FINAL