Author: david.lloyd(a)jboss.com
Date: 2010-03-05 01:40:29 -0500 (Fri, 05 Mar 2010)
New Revision: 5803
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundClient.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
Log:
Async client close support (from the server side)
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundClient.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundClient.java 2010-03-05 06:22:57 UTC (rev 5802)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundClient.java 2010-03-05 06:40:29 UTC (rev 5803)
@@ -22,15 +22,25 @@
package org.jboss.remoting3.remote;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.remoting3.CloseHandler;
import org.jboss.remoting3.spi.RequestHandler;
final class InboundClient {
private final RequestHandler handler;
- private RemoteConnectionHandler remoteConnectionHandler;
+ private final RemoteConnectionHandler remoteConnectionHandler;
+ private final int id;
- InboundClient(final RemoteConnectionHandler remoteConnectionHandler, final RequestHandler handler) {
+ InboundClient(final RemoteConnectionHandler remoteConnectionHandler, final RequestHandler handler, final int id) {
this.remoteConnectionHandler = remoteConnectionHandler;
this.handler = handler;
+ this.id = id;
+ handler.addCloseHandler(new CloseHandler<RequestHandler>() {
+ public void handleClose(final RequestHandler closed) {
+ close();
+ }
+ });
}
RequestHandler getHandler() {
@@ -40,4 +50,22 @@
RemoteConnectionHandler getRemoteConnectionHandler() {
return remoteConnectionHandler;
}
+
+ void close() {
+ final RemoteConnection remoteConnection = remoteConnectionHandler.getRemoteConnection();
+ final ByteBuffer buffer = remoteConnection.allocate();
+ try {
+ buffer.position(4);
+ buffer.put(RemoteProtocol.CLIENT_ASYNC_CLOSE);
+ buffer.putInt(id);
+ buffer.flip();
+ try {
+ remoteConnection.sendBlocking(buffer, true);
+ } catch (IOException e) {
+ // irrelevant
+ }
+ } finally {
+ remoteConnection.free(buffer);
+ }
+ }
}
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-03-05 06:22:57 UTC (rev 5802)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java 2010-03-05 06:40:29 UTC (rev 5803)
@@ -112,10 +112,10 @@
}
public RequestHandlerConnector createConnector(final RequestHandler localHandler) {
- final InboundClient inboundClient = new InboundClient(this, localHandler);
int id;
synchronized (inboundClients) {
while (inboundClients.containsKey(id = random.nextInt() & ~1));
+ final InboundClient inboundClient = new InboundClient(this, localHandler, id);
inboundClients.put(id, inboundClient);
}
return new UnsentRequestHandlerConnector(id, this);
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-03-05 06:22:57 UTC (rev 5802)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteMessageHandler.java 2010-03-05 06:40:29 UTC (rev 5803)
@@ -71,7 +71,7 @@
outBuf.put(RemoteProtocol.SERVICE_NOT_FOUND);
} else {
// service opened locally, now register the success
- final InboundClient inboundClient = new InboundClient(connectionHandler, handler);
+ final InboundClient inboundClient = new InboundClient(connectionHandler, handler, id);
final IntKeyMap<InboundClient> inboundClients = connectionHandler.getInboundClients();
synchronized (inboundClients) {
inboundClients.put(id, inboundClient);
@@ -145,6 +145,23 @@
}
return;
}
+ case RemoteProtocol.CLIENT_ASYNC_CLOSE: {
+ final int id = buffer.getInt();
+
+ final OutboundClient client;
+ final IntKeyMap<OutboundClient> outboundClients = connectionHandler.getOutboundClients();
+ synchronized (outboundClients) {
+ client = outboundClients.remove(id);
+ }
+ if (client == null) {
+ log.trace("Received client-closed for unknown client %d", Integer.valueOf(id));
+ return;
+ }
+ synchronized (client) {
+ IoUtils.safeClose(client.getRequestHandler());
+ }
+ return;
+ }
case RemoteProtocol.REQUEST: {
final int rid = buffer.getInt();
final byte flags = buffer.get();
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-05 06:22:57 UTC (rev 5802)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java 2010-03-05 06:40:29 UTC (rev 5803)
@@ -58,7 +58,7 @@
static final byte SERVICE_NOT_FOUND = 17;
static final byte SERVICE_CLIENT_OPENED = 18;
static final byte CLIENT_CLOSE = 19;
- // todo CLIENT_ASYNC_CLOSE!
+ static final byte CLIENT_ASYNC_CLOSE = 20; // close from the server side
static final byte STREAM_DATA = 32; // from source -> sink side
static final byte STREAM_EXCEPTION = 33; // from source -> sink side