[
https://issues.jboss.org/browse/ISPN-1541?page=com.atlassian.jira.plugin....
]
Vladimir Blagojevic updated ISPN-1541:
--------------------------------------
Git Pull Request:
https://github.com/infinispan/infinispan/pull/660 (was:
https://github.com/infinispan/infinispan/pull/660)
Description:
Invoking Future#get from a FutureListener attached to a NotifyingNotifiableFuture passed
as a parameter to RpcManager#invokeRemotelyInFuture throws InterruptedException! It sounds
more complicated than it really is. In other words, the following code throws
InterruptedException:
{code}
NotifyingFutureImpl f = ...
f.attachListener(new FutureListener<Object>() {
   @Override
   public void futureDone(Future<Object> future) {
      try {
         future.get();
      } catch (Exception e) {
         e.printStackTrace();
      }
   }
});
CommandsFactory cf =
   cache1.getAdvancedCache().getComponentRegistry().getComponent(CommandsFactory.class);
cache1.getAdvancedCache().getRpcManager().invokeRemotelyInFuture(null,
   cf.buildPutKeyValueCommand("k","v", -1, -1, null), f);
{code}
The reason we get InterruptedException is that notifyDone, which in turn invokes
futureDone, is called from the same thread that is executing the future's callable. In
effect, we invoke future#get followed by future#get on the same future instance using
*the same thread*!
{code}
public final void invokeRemotelyInFuture(final Collection<Address> recipients,
      final ReplicableCommand rpc, final boolean usePriorityQueue,
      final NotifyingNotifiableFuture<Object> l, final long timeout) {
   if (trace)
      log.tracef("%s invoking in future call %s to recipient list %s",
            t.getAddress(), rpc, recipients);
   final CountDownLatch futureSet = new CountDownLatch(1);
   Callable<Object> c = new Callable<Object>() {
      public Object call() throws Exception {
         Object result = null;
         try {
            result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
         } finally {
            try {
               futureSet.await();
            } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
            } finally {
               l.notifyDone();
            }
         }
         return result;
      }
   };
   l.setNetworkFuture(asyncExecutor.submit(c));
   futureSet.countDown();
}
{code}
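The underlying hazard can be illustrated without Infinispan. The following is a minimal sketch using only java.util.concurrent; the class name SelfWaitDemo and the method observe() are made up for illustration. It does not reproduce the exact InterruptedException path described above. It only shows that a future is not yet done while its own callable is still running, so a blocking get() issued from that same thread cannot complete normally (a timed get() is used here so the demo terminates).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Infinispan.
class SelfWaitDemo {

    // Reports what a callback sees when it runs on the future's own worker thread.
    static String observe() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        final AtomicReference<Future<String>> self = new AtomicReference<Future<String>>();
        final CountDownLatch futureSet = new CountDownLatch(1);
        final AtomicReference<String> observed = new AtomicReference<String>();
        try {
            Future<String> f = executor.submit(new Callable<String>() {
                public String call() throws Exception {
                    // Wait until the submitting thread has published the future.
                    futureSet.await();
                    Future<String> me = self.get();
                    try {
                        // Same thread that must finish call() now waits for call() to finish.
                        me.get(200, TimeUnit.MILLISECONDS);
                        observed.set("completed");
                    } catch (TimeoutException e) {
                        observed.set("timed out, isDone=" + me.isDone());
                    }
                    return "v";
                }
            });
            self.set(f);
            futureSet.countDown();
            f.get(); // the submitting thread can wait safely
            return observed.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(observe());
    }
}
```

With an untimed get() in place of the timed one, the worker thread in this sketch would wait on itself indefinitely.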
The solution is to wait for the callable to finish and invoke notifyDone from another
thread. We have two choices here:
a) invoke notifyDone from the thread that called invokeRemotelyInFuture. This makes
invokeRemotelyInFuture less asynchronous than it should be
b) invoke the callback on another thread
{code}
public final void invokeRemotelyInFuture(final Collection<Address> recipients,
      final ReplicableCommand rpc, final boolean usePriorityQueue,
      final NotifyingNotifiableFuture<Object> l, final long timeout) {
   if (trace)
      log.tracef("%s invoking in future call %s to recipient list %s",
            t.getAddress(), rpc, recipients);
   final CountDownLatch callableCompleted = new CountDownLatch(1);
   Callable<Object> c = new Callable<Object>() {
      public Object call() throws Exception {
         Object result = null;
         try {
            result = invokeRemotely(recipients, rpc, true, usePriorityQueue, timeout);
         } finally {
            // signal the notifier task that the remote call has finished
            callableCompleted.countDown();
         }
         return result;
      }
   };
   l.setNetworkFuture(asyncExecutor.submit(c));
   // notify listeners from a separate task, not from the callable's own thread
   asyncExecutor.submit(new Runnable() {
      @Override
      public void run() {
         try {
            callableCompleted.await();
         } catch (InterruptedException e) {
            // restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
         } finally {
            l.notifyDone();
         }
      }
   });
}
{code}
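Option b) can also be tried outside Infinispan. The sketch below is an assumption-laden stand-in, not the actual fix: the Listener interface, the class NotifyFromOtherThreadDemo, and the method demo() are hypothetical names, and a plain fixed thread pool plays the role of asyncExecutor. It shows that a listener invoked from a second task can call future.get() and receive the result.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Infinispan.
class NotifyFromOtherThreadDemo {

    // Hypothetical stand-in for Infinispan's FutureListener.
    interface Listener {
        void futureDone(Future<String> future);
    }

    static String demo() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        final CountDownLatch callableCompleted = new CountDownLatch(1);
        final AtomicReference<String> seen = new AtomicReference<String>();
        final Listener listener = new Listener() {
            public void futureDone(Future<String> future) {
                try {
                    // Safe here: this runs in the notifier task, not in the callable.
                    seen.set(future.get());
                } catch (Exception e) {
                    seen.set("failed: " + e);
                }
            }
        };
        try {
            final Future<String> network = executor.submit(new Callable<String>() {
                public String call() {
                    try {
                        return "v"; // stands in for the remote invocation
                    } finally {
                        callableCompleted.countDown();
                    }
                }
            });
            Future<?> notifier = executor.submit(new Runnable() {
                public void run() {
                    try {
                        callableCompleted.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    listener.futureDone(network);
                }
            });
            notifier.get(); // wait until the listener has run
            return seen.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

One trade-off of this design is that the notifier task occupies a pool thread while it waits on the latch, so each asynchronous call briefly holds two executor slots instead of one.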
We can, of course, leave things as they are, but in that case we have to warn
implementers of FutureListener that they cannot call future.get() from within a
FutureListener!
NotifyingNotifiableFuture's FutureListener cannot invoke Future API on FutureListener#futureDone callback
----------------------------------------------------------------------------------------------------------
Key: ISPN-1541
URL:
https://issues.jboss.org/browse/ISPN-1541
Project: Infinispan
Issue Type: Bug
Components: RPC
Affects Versions: 5.0.1.FINAL, 5.1.0.BETA4
Reporter: Vladimir Blagojevic
Assignee: Vladimir Blagojevic
Fix For: 5.1.0.FINAL