Author: david.lloyd(a)jboss.com
Date: 2008-07-17 12:21:03 -0400 (Thu, 17 Jul 2008)
New Revision: 4379
Added:
remoting3/trunk/protocol/
remoting3/trunk/protocol/basic/
remoting3/trunk/protocol/basic/src/
remoting3/trunk/protocol/basic/src/main/
remoting3/trunk/protocol/basic/src/main/java/
remoting3/trunk/protocol/basic/src/main/java/org/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java
remoting3/trunk/protocol/basic/src/test/
remoting3/trunk/protocol/basic/src/test/java/
remoting3/trunk/protocol/basic/src/test/java/org/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
Log:
Basic protocol support. Needs more work though
Property changes on: remoting3/trunk/protocol/basic
___________________________________________________________________
Name: svn:ignore
+ target
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-17
16:21:03 UTC (rev 4379)
@@ -0,0 +1,618 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import static org.jboss.xnio.Buffers.*;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.ReplyHandler;
+import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.IdentityResolver;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import org.jboss.cx.remoting.spi.AbstractAutoCloseable;
+import static org.jboss.cx.remoting.util.CollectionUtil.concurrentMap;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_ONEWAY;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REPLY;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_CLOSE;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_OPEN;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_CLOSE;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
+import org.jboss.cx.remoting.RemotingException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
+
+ private static final Logger log = Logger.getLogger(BasicHandler.class);
+
+ // clients whose requests get forwarded to the remote side
+ private final ConcurrentMap<Integer, RemoteClientEndpoint<?, ?>>
remoteClients = concurrentMap();
+ // running on remote node
+ private final ConcurrentMap<Integer, ReplyHandler<?>> outstandingRequests
= concurrentMap();
+ // forwarded to remote side
+ private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint<?,
?>>> forwardedClients = concurrentMap();
+ // forwarded to remote side
+ private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint<?,
?>>> forwardedServices = concurrentMap();
+
+ private final boolean server;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ private final AtomicBoolean isnew = new AtomicBoolean(true);
+ private volatile AllocatedMessageChannel channel;
+ private final Executor executor;
+ private final MarshallerFactory<ByteBuffer> marshallerFactory;
+ private final ObjectResolver resolver;
+ private final ClassLoader classLoader;
+
+ @SuppressWarnings({ "unchecked" })
+ public <I, O> BasicHandler(final boolean server, final
BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint<I, O> root,
final Executor executor, final RemoteClientEndpointListener remoteListener, final
MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
+ this.server = server;
+ this.allocator = allocator;
+ this.executor = executor;
+ forwardedClients.put(Integer.valueOf(0),
((RemoteClientEndpoint)root).getHandle());
+ final RemoteClientEndpointImpl<Object, Object> endpoint = new
RemoteClientEndpointImpl<Object, Object>(0, marshallerFactory, allocator);
+ remoteClients.put(Integer.valueOf(0), endpoint);
+ if (remoteListener != null) {
+ remoteListener.notifyCreated(endpoint);
+ }
+ this.marshallerFactory = marshallerFactory;
+ // todo
+ resolver = IdentityResolver.getInstance();
+ classLoader = getClass().getClassLoader();
+ }
+
+ /**
+ * Sequence number of requests originating locally.
+ */
+ private final AtomicInteger localRequestIdSeq = new AtomicInteger();
+ /**
+ * Sequence number of local clients forwarded to the remote side.
+ */
+ private final AtomicInteger localClientIdSeq = new AtomicInteger(1);
+ /**
+ * Sequence number of remote clients opened locally from services from the remote
side.
+ */
+ private final AtomicInteger remoteClientIdSeq = new AtomicInteger(1);
+ /**
+ * Sequence number of services forwarded to the remote side.
+ */
+ private final AtomicInteger localServiceIdSeq = new AtomicInteger();
+
+ public void handleOpened(final AllocatedMessageChannel channel) {
+ if (isnew.getAndSet(false)) {
+ this.channel = channel;
+ }
+ channel.resumeReads();
+ }
+
+ public void handleReadable(final AllocatedMessageChannel channel) {
+ for (;;) try {
+ final ByteBuffer buffer = channel.receive();
+ if (buffer == null) {
+ // todo release all handles...
+ IoUtils.safeClose(channel);
+ return;
+ }
+ if (! buffer.hasRemaining()) {
+ // would block
+ channel.resumeReads();
+ return;
+ }
+ int msgType = buffer.get() & 0xff;
+ log.trace("Received message %s, type %d", buffer,
Integer.valueOf(msgType));
+ switch (msgType) {
+ case REQUEST_ONEWAY: {
+ final int clientId = buffer.getInt();
+ final Handle<RemoteClientEndpoint<?, ?>> handle =
getForwardedClient(clientId);
+ if (handle == null) {
+ log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
+ return;
+ }
+ final Unmarshaller<ByteBuffer> unmarshaller =
marshallerFactory.createUnmarshaller(resolver, classLoader);
+ if (! unmarshaller.unmarshal(buffer)) {
+ log.trace("Incomplete one-way request for client ID
%d", Integer.valueOf(clientId));
+ break;
+ }
+ final Object payload;
+ try {
+ payload = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ log.trace("Class not found in one-way request for client ID
%d", Integer.valueOf(clientId));
+ break;
+ }
+ final RemoteClientEndpoint<?, ?> clientEndpoint =
handle.getResource();
+ receiveRequest(clientEndpoint, payload);
+ break;
+ }
+ case REQUEST: {
+ final int clientId = buffer.getInt();
+ final Handle<RemoteClientEndpoint<?, ?>> handle =
getForwardedClient(clientId);
+ if (handle == null) {
+ log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
+ break;
+ }
+ final int requestId = buffer.getInt();
+ final Unmarshaller<ByteBuffer> unmarshaller =
marshallerFactory.createUnmarshaller(resolver, classLoader);
+ if (! unmarshaller.unmarshal(buffer)) {
+ log.trace("Incomplete request ID %d for client ID %d",
Integer.valueOf(requestId), Integer.valueOf(clientId));
+ new ReplyHandlerImpl(channel, requestId,
allocator).handleException("Incomplete request", null);
+ break;
+ }
+ final Object payload;
+ try {
+ payload = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ log.trace("Class not found in request ID %d for client ID
%d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+ break;
+ }
+ final RemoteClientEndpoint<?, ?> clientEndpoint =
handle.getResource();
+ receiveRequest(clientEndpoint, new ReplyHandlerImpl(channel,
requestId, allocator), payload);
+ break;
+ }
+ case REPLY: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler<?> replyHandler =
takeOutstandingReqeust(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
+ break;
+ }
+ final Unmarshaller<ByteBuffer> unmarshaller =
marshallerFactory.createUnmarshaller(resolver, classLoader);
+ if (! unmarshaller.unmarshal(buffer)) {
+ replyHandler.handleException("Incomplete reply",
null);
+ log.trace("Incomplete reply to request ID %d",
Integer.valueOf(requestId));
+ break;
+ }
+ final Object payload;
+ try {
+ payload = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException("Reply unmarshalling
failed", e);
+ log.trace("Class not found in reply to request ID %d",
Integer.valueOf(requestId));
+ break;
+ }
+ handleReply(replyHandler, payload);
+ break;
+ }
+ case REQUEST_FAILED: {
+ final int requestId = buffer.getInt();
+ final ReplyHandler<?> replyHandler =
takeOutstandingReqeust(requestId);
+ if (replyHandler == null) {
+ log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
+ break;
+ }
+ final Unmarshaller<ByteBuffer> unmarshaller =
marshallerFactory.createUnmarshaller(resolver, classLoader);
+ if (! unmarshaller.unmarshal(buffer)) {
+ replyHandler.handleException("Incomplete exception
reply", null);
+ log.trace("Incomplete exception reply to request ID
%d", Integer.valueOf(requestId));
+ break;
+ }
+ final Object message;
+ try {
+ message = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException("Exception reply unmarshalling
failed", e);
+ log.trace("Class not found in exception reply to request ID
%d", Integer.valueOf(requestId));
+ break;
+ }
+ final Object cause;
+ try {
+ cause = unmarshaller.get();
+ } catch (ClassNotFoundException e) {
+ replyHandler.handleException("Exception reply unmarshalling
failed", e);
+ log.trace("Class not found in exception reply to request ID
%d", Integer.valueOf(requestId));
+ break;
+ }
+ handleException(replyHandler, message, cause);
+ break;
+ }
+ case CLIENT_CLOSE: {
+ break;
+ }
+ case CLIENT_OPEN: {
+ final int serviceId = buffer.getInt();
+ final int clientId = buffer.getInt();
+ final Handle<RemoteServiceEndpoint<?, ?>> handle =
getForwardedService(serviceId);
+ if (handle == null) {
+ // todo log invalid request
+ break;
+ }
+ final RemoteServiceEndpoint<?, ?> serviceEndpoint =
handle.getResource();
+ final RemoteClientEndpoint<?, ?> clientEndpoint =
serviceEndpoint.createClientEndpoint();
+
+ break;
+ }
+ case SERVICE_CLOSE: {
+ break;
+ }
+ default: {
+ log.trace("Received invalid message type %d",
Integer.valueOf(msgType));
+ }
+ }
+ } catch (IOException e) {
+ log.error(e, "I/O error in protocol channel");
+ IoUtils.safeClose(channel);
+ return;
+ } catch (BufferUnderflowException e) {
+ log.error(e, "Malformed packet");
+ } catch (Throwable t) {
+ log.error(t, "Handler failed");
+ }
+ }
+
+ public void handleWritable(final AllocatedMessageChannel channel) {
+ for (;;) {
+ final WriteHandler handler = outputQueue.peek();
+ if (handler == null) {
+ return;
+ }
+ try {
+ if (handler.handleWrite(channel)) {
+ log.trace("Handled write with handler %s", handler);
+ pending.decrementAndGet();
+ outputQueue.remove();
+ } else {
+ channel.resumeWrites();
+ return;
+ }
+ } catch (Throwable t) {
+ pending.decrementAndGet();
+ outputQueue.remove();
+ }
+ }
+ }
+
+ public void handleClosed(final AllocatedMessageChannel channel) {
+ }
+
+ private <I, O> ReplyHandler<O> createReplyHandler(final
AllocatedMessageChannel channel, final int requestId) {
+ return new ReplyHandlerImpl<O>(channel, requestId, allocator);
+ }
+
+ RemoteClientEndpoint<?, ?> getRemoteClient(final int i) {
+ return remoteClients.get(Integer.valueOf(i));
+ }
+
+ private final class ReplyHandlerImpl<O> implements ReplyHandler<O> {
+
+ private final AllocatedMessageChannel channel;
+ private final int requestId;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int
requestId, final BufferAllocator<ByteBuffer> allocator) {
+ if (channel == null) {
+ throw new NullPointerException("channel is null");
+ }
+ if (allocator == null) {
+ throw new NullPointerException("allocator is null");
+ }
+ this.channel = channel;
+ this.requestId = requestId;
+ this.allocator = allocator;
+ }
+
+ public void handleReply(final O reply) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) REPLY);
+ buffer.putInt(requestId);
+ try {
+ final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(resolver);
+ marshaller.start(reply);
+ final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
+ while (! marshaller.marshal(buffer)) {
+ bufferList.add(flip(buffer));
+ buffer = allocator.allocate();
+ }
+ bufferList.add(flip(buffer));
+ registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+ } catch (IOException e) {
+ // todo log
+ } catch (InterruptedException e) {
+ // todo log
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void handleException(final String msg, final Throwable cause) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) REQUEST_FAILED);
+ buffer.putInt(requestId);
+ try {
+ final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(resolver);
+ final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
+ marshaller.start(msg);
+ while (! marshaller.marshal(buffer)) {
+ bufferList.add(flip(buffer));
+ buffer = allocator.allocate();
+ }
+ marshaller.start(cause);
+ while (! marshaller.marshal(buffer)) {
+ bufferList.add(flip(buffer));
+ buffer = allocator.allocate();
+ }
+ bufferList.add(flip(buffer));
+ registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+ } catch (IOException e) {
+ // todo log
+ } catch (InterruptedException e) {
+ // todo log
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void handleCancellation() {
+ final ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) CANCEL_ACK);
+ buffer.putInt(requestId);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ // todo log
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ // Session mgmt
+
+ public int openRequest(ReplyHandler<?> handler) {
+ int id;
+ do {
+ id = localRequestIdSeq.getAndIncrement();
+ } while (outstandingRequests.putIfAbsent(Integer.valueOf(id), handler) != null);
+ return id;
+ }
+
+ public int openClientFromService() {
+ int id;
+ do {
+ id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
+ } while (remoteClients.putIfAbsent(Integer.valueOf(id), new
RemoteClientEndpointImpl<Object, Object>(id, null, allocator)) != null);
+ return id;
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ public void openClientForForwardedService(int id, RemoteClientEndpoint<?, ?>
clientEndpoint) {
+ try {
+ forwardedClients.put(Integer.valueOf(id),
((RemoteClientEndpoint)clientEndpoint).getHandle());
+ } catch (RemotingException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public Handle<RemoteClientEndpoint<?, ?>> getForwardedClient(int id) {
+ return forwardedClients.get(Integer.valueOf(id));
+ }
+
+ public ReplyHandler<?> takeOutstandingReqeust(int id) {
+ return outstandingRequests.remove(Integer.valueOf(id));
+ }
+
+ public Handle<RemoteServiceEndpoint<?, ?>> getForwardedService(final int
id) {
+ return forwardedServices.get(Integer.valueOf(id));
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private static <I, O> void receiveRequest(RemoteClientEndpoint<I, O>
clientEndpoint, Object request) {
+ clientEndpoint.receiveRequest((I) request);
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private static <I, O> RemoteRequestContext
receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, ReplyHandler<O>
replyHandler, Object request) {
+ return clientEndpoint.receiveRequest((I) request, replyHandler);
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private static <O> void handleReply(final ReplyHandler<O> replyHandler,
final Object reply) {
+ SpiUtils.safeHandleReply(replyHandler, (O) reply);
+ }
+
+ private static void handleException(final ReplyHandler<?> handler, final Object
message, final Object cause) {
+ SpiUtils.safeHandleException(handler, message == null ? null :
message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
+ }
+
+ // Writer members
+
+ private final BlockingQueue<WriteHandler> outputQueue =
CollectionUtil.blockingQueue(64);
+ private final AtomicInteger pending = new AtomicInteger();
+
+ private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler
writeHandler) throws InterruptedException {
+ outputQueue.put(writeHandler);
+ if (pending.getAndIncrement() == 0) {
+ channel.resumeWrites();
+ }
+ }
+
+ // client endpoint
+
+ private final class RemoteClientEndpointImpl<I, O> extends
AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements
RemoteClientEndpoint<I, O> {
+
+ private final int identifier;
+ private final MarshallerFactory<ByteBuffer> marshallerFactory;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ public RemoteClientEndpointImpl(final int identifier, final
MarshallerFactory<ByteBuffer> marshallerFactory, final
BufferAllocator<ByteBuffer> allocator) {
+ super(executor);
+ if (marshallerFactory == null) {
+ throw new NullPointerException("marshallerFactory is null");
+ }
+ if (allocator == null) {
+ throw new NullPointerException("allocator is null");
+ }
+ this.identifier = identifier;
+ this.marshallerFactory = marshallerFactory;
+ this.allocator = allocator;
+ }
+
+ public void receiveRequest(final I request) {
+ log.trace("Received one-way request of type %s", request == null ?
"null" : request.getClass());
+ try {
+ final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(null);
+ final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.REQUEST_ONEWAY);
+ buffer.putInt(identifier);
+ marshaller.start(request);
+ while (! marshaller.marshal(buffer)) {
+ bufferList.add(flip(buffer));
+ buffer = allocator.allocate();
+ }
+ bufferList.add(flip(buffer));
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
+ } catch (InterruptedException e) {
+ log.trace(e, "receiveRequest was interrupted");
+ Thread.currentThread().interrupt();
+ return;
+ }
+ } catch (Throwable t) {
+ // ignore
+ log.trace(t, "receiveRequest failed with an exception");
+ return;
+ }
+ }
+
+ public RemoteRequestContext receiveRequest(final I request, final
ReplyHandler<O> handler) {
+ log.trace("Received request of type %s", request == null ?
"null" : request.getClass());
+ try {
+ final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(null);
+ final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.REQUEST);
+ buffer.putInt(identifier);
+ final int id = openRequest(handler);
+ buffer.putInt(id);
+ marshaller.start(request);
+ while (! marshaller.marshal(buffer)) {
+ bufferList.add(flip(buffer));
+ buffer = allocator.allocate();
+ }
+ bufferList.add(flip(buffer));
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator,
bufferList));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ SpiUtils.safeHandleCancellation(handler);
+ return SpiUtils.getBlankRemoteRequestContext();
+ }
+ log.trace("Sent request %s", request);
+ return new RemoteRequestContextImpl(id, allocator, channel);
+ } catch (Throwable t) {
+ log.trace(t, "receiveRequest failed with an exception");
+ SpiUtils.safeHandleException(handler, "Failed to build
request", t);
+ return SpiUtils.getBlankRemoteRequestContext();
+ }
+ }
+ }
+
+ public final class RemoteRequestContextImpl implements RemoteRequestContext {
+
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final int id;
+ private final AllocatedMessageChannel channel;
+
+ public RemoteRequestContextImpl(final int id, final
BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
+ this.id = id;
+ this.allocator = allocator;
+ this.channel = channel;
+ }
+
+ public void cancel(final boolean mayInterrupt) {
+ try {
+ final ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.CANCEL_REQUEST);
+ buffer.putInt(id);
+ buffer.put((byte) (mayInterrupt ? 1 : 0));
+ buffer.flip();
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ } catch (InterruptedException e) {
+ // todo log that cancel attempt failed
+ Thread.currentThread().interrupt();
+ } catch (Throwable t) {
+ // todo log that cancel attempt failed
+ }
+ }
+ }
+
+ public final class RemoteServiceEndpointImpl<I, O> extends
AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements
RemoteServiceEndpoint<I, O> {
+
+ private final MarshallerFactory<ByteBuffer> marshallerFactory;
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final int identifier;
+
+ protected RemoteServiceEndpointImpl(final MarshallerFactory<ByteBuffer>
marshallerFactory, final BufferAllocator<ByteBuffer> allocator, final int
identifier) {
+ super(executor);
+ this.marshallerFactory = marshallerFactory;
+ this.allocator = allocator;
+ this.identifier = identifier;
+ }
+
+ public RemoteClientEndpoint<I, O> createClientEndpoint() throws
RemotingException {
+ final int id = openClientFromService();
+ final ByteBuffer buffer = allocator.allocate();
+ buffer.putInt(identifier);
+ buffer.putInt(openClientFromService());
+ buffer.flip();
+ boolean intr = false;
+ for (;;) {
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+ try {
+ return new RemoteClientEndpointImpl<I,O>(id,
marshallerFactory, allocator);
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } catch (InterruptedException e) {
+ intr = true;
+ }
+ }
+ }
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-17
16:21:03 UTC (rev 4379)
@@ -0,0 +1,106 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.core.marshal.JBossSerializationMarshallerFactory;
+import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.AbstractConvertingIoFuture;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public final class BasicProtocol {
+
+ private static final Logger log = Logger.getLogger(BasicProtocol.class);
+
+ private BasicProtocol() {
+ }
+
+ /**
+ * Create a request server for the basic protocol.
+ *
+ * @param executor the executor to use for invocations
+ * @param localRootSource the service to draw client endpoints from for root clients
on inbound connections
+ * @param allocator the buffer allocator to use
+ * @param remoteListener a listener which receives notification of the remote root
client of the incoming connection
+ * @return a handler factory for passing to an XNIO server
+ */
+ public static IoHandlerFactory<AllocatedMessageChannel> createServer(final
Executor executor, final RemoteServiceEndpoint<?, ?> localRootSource, final
BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener
remoteListener) {
+ return new IoHandlerFactory<AllocatedMessageChannel>() {
+ public IoHandler<? super AllocatedMessageChannel> createHandler() {
+ try {
+ final RemoteClientEndpoint<?, ?> remoteClientEndpoint =
localRootSource.createClientEndpoint();
+ try {
+ return new BasicHandler(true, allocator, remoteClientEndpoint,
executor, remoteListener, new JavaSerializationMarshallerFactory(executor));
+ } finally {
+ try {
+ remoteClientEndpoint.autoClose();
+ } catch (RemotingException e) {
+ log.error(e, "Error setting auto-close mode");
+ }
+ }
+ } catch (RemotingException e) {
+ throw new IllegalStateException("The local root endpoint is
unusable", e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a request client for the basic protocol.
+ *
+ * @param <I> the request type of the new remote root service endpoint
+ * @param <O> the reply type of the new remote root service endpoint
+ * @param executor the executor to use for invocations
+ * @param localRoot the client endpoint to use as the local root client
+ * @param channelSource the XNIO channel source to use to establish the connection
+ * @param allocator the buffer allocator to use
+ * @return the future client endpoint of the remote side's root client
+ * @throws IOException if an error occurs
+ */
+ public static <I, O> IoFuture<RemoteClientEndpoint<I, O>>
connect(final Executor executor, final RemoteClientEndpoint<?, ?> localRoot, final
ChannelSource<AllocatedMessageChannel> channelSource, final
BufferAllocator<ByteBuffer> allocator) throws IOException {
+ final BasicHandler basicHandler = new BasicHandler(false, allocator, localRoot,
executor, null, new JavaSerializationMarshallerFactory(executor));
+ final IoFuture<AllocatedMessageChannel> futureChannel =
channelSource.open(basicHandler);
+ return new AbstractConvertingIoFuture<RemoteClientEndpoint<I, O>,
AllocatedMessageChannel>(futureChannel) {
+ @SuppressWarnings({ "unchecked" })
+ protected RemoteClientEndpoint<I, O> convert(final
AllocatedMessageChannel channel) throws RemotingException {
+ final RemoteClientEndpoint<?, ?> remoteClientEndpoint =
basicHandler.getRemoteClient(0);
+ return (RemoteClientEndpoint) remoteClientEndpoint;
+ }
+ };
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java 2008-07-17
16:21:03 UTC (rev 4379)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+/**
+ *
+ */
+public final class MessageType {
+ //
+ public static final int REQUEST_ONEWAY = 0;
+ public static final int REQUEST = 1;
+ public static final int REPLY = 2;
+ public static final int CANCEL_REQUEST = 3;
+ public static final int CANCEL_ACK = 4;
+ public static final int REQUEST_FAILED = 5;
+ // Remote side called .close() on a forwarded RemoteClientEndpoint
+ public static final int CLIENT_CLOSE = 6;
+ // Remote side called .close() on a forwarded RemoteClientEndpoint
+ public static final int CLIENT_OPEN = 7;
+ public static final int SERVICE_CLOSE = 8;
+
+ private MessageType() {
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java 2008-07-17
16:21:03 UTC (rev 4379)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ *
+ */
+public final class SimpleWriteHandler implements WriteHandler {
+ private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
+
+ private final BufferAllocator<ByteBuffer> allocator;
+ private final ByteBuffer[] buffers;
+
+ public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
List<ByteBuffer> buffers) {
+ this.allocator = allocator;
+ this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
+ logBufferSize();
+ }
+
+ public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
ByteBuffer[] buffers) {
+ this.allocator = allocator;
+ this.buffers = buffers;
+ logBufferSize();
+ }
+
+ public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final
ByteBuffer buffer) {
+ this.allocator = allocator;
+ buffers = new ByteBuffer[] { buffer };
+ logBufferSize();
+ }
+
+ private void logBufferSize() {
+ if (log.isTrace()) {
+ long t = 0L;
+ for (ByteBuffer buf : buffers) {
+ t += (long)buf.remaining();
+ }
+ log.trace("Writing a message of size %d", Long.valueOf(t));
+ }
+ }
+
+ public boolean handleWrite(final WritableMessageChannel channel) {
+ boolean done = true;
+ try {
+ return (done = channel.send(buffers));
+ } catch (IOException e) {
+ log.trace(e, "Write failed");
+ return true;
+ } finally {
+ if (done) {
+ for (ByteBuffer buffer : buffers) {
+ allocator.free(buffer);
+ }
+ }
+ }
+ }
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java 2008-07-17
16:21:03 UTC (rev 4379)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ *
+ */
+public interface WriteHandler {
+ boolean handleWrite(WritableMessageChannel channel);
+}
Added:
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
---
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-17
16:21:03 UTC (rev 4379)
@@ -0,0 +1,177 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import junit.framework.TestCase;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.ConfigurableFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.TcpClient;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.cx.remoting.core.EndpointImpl;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.ClientContext;
+import org.jboss.cx.remoting.ServiceContext;
+import org.jboss.cx.remoting.Client;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.test.support.LoggingHelper;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.nio.ByteBuffer;
+import java.net.InetSocketAddress;
+import java.io.Closeable;
+
+/**
+ *
+ */
+public final class ConnectionTestCase extends TestCase {
+ static {
+ LoggingHelper.init();
+ }
+
+ public void testConnection() throws Throwable {
+ final AtomicBoolean clientOpened = new AtomicBoolean(false);
+ final AtomicBoolean serviceOpened = new AtomicBoolean(false);
+ final AtomicBoolean clientClosed = new AtomicBoolean(false);
+ final AtomicBoolean serviceClosed = new AtomicBoolean(false);
+ final CountDownLatch clientCloseLatch = new CountDownLatch(1);
+ final ExecutorService executorService = Executors.newCachedThreadPool();
+ try {
+ final BufferAllocator<ByteBuffer> allocator = new
BufferAllocator<ByteBuffer>() {
+ public ByteBuffer allocate() {
+ return ByteBuffer.allocate(1024);
+ }
+
+ public void free(final ByteBuffer buffer) {
+ }
+ };
+ final Xnio xnio = Xnio.createNio();
+ try {
+ final EndpointImpl endpoint = new EndpointImpl();
+ endpoint.setExecutor(executorService);
+ endpoint.start();
+ try {
+ final RemoteServiceEndpoint<Object,Object>
serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object,
Object>() {
+ public void handleClientOpen(final ClientContext context) {
+ clientOpened.set(true);
+ }
+
+ public void handleServiceOpen(final ServiceContext context) {
+ serviceOpened.set(true);
+ }
+
+ public void handleRequest(final RequestContext<Object>
context, final Object request) throws RemoteExecutionException {
+ try {
+ System.out.println("Received request; sending
response!");
+ context.sendReply("response");
+ } catch (RemotingException e) {
+ try {
+ context.sendFailure("failed", e);
+ } catch (RemotingException e1) {
+ System.out.println("Double fault!");
+ }
+ }
+ }
+
+ public void handleServiceClose(final ServiceContext context) {
+ serviceClosed.set(true);
+ }
+
+ public void handleClientClose(final ClientContext context) {
+ clientClosed.set(true);
+ clientCloseLatch.countDown();
+ }
+ });
+ try {
+ final Handle<RemoteServiceEndpoint<Object,Object>>
handle = serverServiceEndpoint.getHandle();
+ serverServiceEndpoint.autoClose();
+ try {
+ final RemoteClientEndpointListener remoteListener = new
RemoteClientEndpointListener() {
+
+ public <I, O> void notifyCreated(final
RemoteClientEndpoint<I, O> endpoint) {
+
+ }
+ };
+ final ConfigurableFactory<Closeable> tcpServer =
xnio.createTcpServer(executorService,
Channels.convertStreamToAllocatedMessage(BasicProtocol.createServer(executorService,
serverServiceEndpoint, allocator, remoteListener), 32768, 32768), new
InetSocketAddress(12345));
+ final Closeable tcpServerCloseable = tcpServer.create();
+ try {
+ // now create a client to connect to it
+ final RemoteClientEndpoint<?,?> localRoot =
serverServiceEndpoint.createClientEndpoint();
+ final InetSocketAddress destAddr = new
InetSocketAddress("localhost", 12345);
+ final TcpClient tcpClient =
xnio.createTcpConnector().create().createChannelSource(destAddr);
+ final ChannelSource<AllocatedMessageChannel>
messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
+ final
IoFuture<RemoteClientEndpoint<Object,Object>> futureClient =
BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
+ final RemoteClientEndpoint<Object, Object>
clientEndpoint = futureClient.get();
+ try {
+ final Client<Object,Object> client =
endpoint.createClient(clientEndpoint);
+ try {
+ clientEndpoint.autoClose();
+ final Object result =
client.send("Test").get();
+ assertEquals("response", result);
+ client.close();
+ tcpServerCloseable.close();
+ handle.close();
+ } finally {
+ IoUtils.safeClose(client);
+ clientCloseLatch.await(500L,
TimeUnit.MILLISECONDS);
+ }
+ } finally {
+ IoUtils.safeClose(clientEndpoint);
+ }
+ } finally {
+ IoUtils.safeClose(tcpServerCloseable);
+ }
+ } finally {
+ IoUtils.safeClose(handle);
+ }
+ } finally {
+ IoUtils.safeClose(serverServiceEndpoint);
+ }
+ } finally {
+ endpoint.stop();
+ }
+ } finally {
+ IoUtils.safeClose(xnio);
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ assertTrue(serviceOpened.get());
+ assertTrue(clientOpened.get());
+ assertTrue(clientClosed.get());
+ assertTrue(serviceClosed.get());
+ }
+}