[jboss-remoting-commits] JBoss Remoting SVN: r4379 - in remoting3/trunk: protocol and 18 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Thu Jul 17 12:21:03 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-07-17 12:21:03 -0400 (Thu, 17 Jul 2008)
New Revision: 4379

Added:
   remoting3/trunk/protocol/
   remoting3/trunk/protocol/basic/
   remoting3/trunk/protocol/basic/src/
   remoting3/trunk/protocol/basic/src/main/
   remoting3/trunk/protocol/basic/src/main/java/
   remoting3/trunk/protocol/basic/src/main/java/org/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java
   remoting3/trunk/protocol/basic/src/test/
   remoting3/trunk/protocol/basic/src/test/java/
   remoting3/trunk/protocol/basic/src/test/java/org/
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
Log:
Basic protocol support.  Needs more work though


Property changes on: remoting3/trunk/protocol/basic
___________________________________________________________________
Name: svn:ignore
   + target


Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,618 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import static org.jboss.xnio.Buffers.*;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.ReplyHandler;
+import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
+import org.jboss.cx.remoting.spi.marshal.Marshaller;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.IdentityResolver;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import org.jboss.cx.remoting.spi.AbstractAutoCloseable;
+import static org.jboss.cx.remoting.util.CollectionUtil.concurrentMap;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_ONEWAY;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REPLY;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_CLOSE;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_OPEN;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_CLOSE;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
+import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
+import org.jboss.cx.remoting.RemotingException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+
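+/**
+ * XNIO I/O handler for the basic remoting protocol: decodes inbound messages and queues outbound writes.
+ */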
+public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
+
+    private static final Logger log = Logger.getLogger(BasicHandler.class);
+
+    // clients whose requests get forwarded to the remote side
+    private final ConcurrentMap<Integer, RemoteClientEndpoint<?, ?>> remoteClients = concurrentMap();
+    // running on remote node
+    private final ConcurrentMap<Integer, ReplyHandler<?>> outstandingRequests = concurrentMap();
+    // forwarded to remote side
+    private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint<?, ?>>> forwardedClients = concurrentMap();
+    // forwarded to remote side
+    private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint<?, ?>>> forwardedServices = concurrentMap();
+
+    private final boolean server;
+    private final BufferAllocator<ByteBuffer> allocator;
+
+    private final AtomicBoolean isnew = new AtomicBoolean(true);
+    private volatile AllocatedMessageChannel channel;
+    private final Executor executor;
+    private final MarshallerFactory<ByteBuffer> marshallerFactory;
+    private final ObjectResolver resolver;
+    private final ClassLoader classLoader;
+
+    @SuppressWarnings({ "unchecked" })
+    public <I, O> BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint<I, O> root, final Executor executor, final RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
+        this.server = server;
+        this.allocator = allocator;
+        this.executor = executor;
+        forwardedClients.put(Integer.valueOf(0), ((RemoteClientEndpoint)root).getHandle());
+        final RemoteClientEndpointImpl<Object, Object> endpoint = new RemoteClientEndpointImpl<Object, Object>(0, marshallerFactory, allocator);
+        remoteClients.put(Integer.valueOf(0), endpoint);
+        if (remoteListener != null) {
+            remoteListener.notifyCreated(endpoint);
+        }
+        this.marshallerFactory = marshallerFactory;
+        // todo
+        resolver = IdentityResolver.getInstance();
+        classLoader = getClass().getClassLoader();
+    }
+
+    /**
+     * Sequence number of requests originating locally.
+     */
+    private final AtomicInteger localRequestIdSeq = new AtomicInteger();
+    /**
+     * Sequence number of local clients forwarded to the remote side.
+     */
+    private final AtomicInteger localClientIdSeq = new AtomicInteger(1);
+    /**
+     * Sequence number of remote clients opened locally from services from the remote side.
+     */
+    private final AtomicInteger remoteClientIdSeq = new AtomicInteger(1);
+    /**
+     * Sequence number of services forwarded to the remote side.
+     */
+    private final AtomicInteger localServiceIdSeq = new AtomicInteger();
+
+    public void handleOpened(final AllocatedMessageChannel channel) {
+        if (isnew.getAndSet(false)) {
+            this.channel = channel;
+        }
+        channel.resumeReads();
+    }
+
+    /**
+     * Drain the channel, dispatching each message by its leading type byte, until it would block, closes or fails.
+     * @param channel the channel to read messages from
+     */
+    public void handleReadable(final AllocatedMessageChannel channel) {
+        for (;;) try {
+            final ByteBuffer buffer = channel.receive();
+            if (buffer == null) {
+                // todo release all handles...
+                IoUtils.safeClose(channel);
+                return;
+            }
+            if (! buffer.hasRemaining()) {
+                // would block
+                channel.resumeReads();
+                return;
+            }
+            int msgType = buffer.get() & 0xff;
+            log.trace("Received message %s, type %d", buffer, Integer.valueOf(msgType));
+            switch (msgType) {
+                case REQUEST_ONEWAY: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RemoteClientEndpoint<?, ?>> handle = getForwardedClient(clientId);
+                    if (handle == null) {
+                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                        break; // drop this message only; returning here would stop draining without resuming reads
+                    }
+                    final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
+                    if (! unmarshaller.unmarshal(buffer)) {
+                        log.trace("Incomplete one-way request for client ID %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    final Object payload;
+                    try {
+                        payload = unmarshaller.get();
+                    } catch (ClassNotFoundException e) {
+                        log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    final RemoteClientEndpoint<?, ?> clientEndpoint = handle.getResource();
+                    receiveRequest(clientEndpoint, payload);
+                    break;
+                }
+                case REQUEST: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RemoteClientEndpoint<?, ?>> handle = getForwardedClient(clientId);
+                    if (handle == null) {
+                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    final int requestId = buffer.getInt();
+                    final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
+                    if (! unmarshaller.unmarshal(buffer)) {
+                        log.trace("Incomplete request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+                        new ReplyHandlerImpl(channel, requestId, allocator).handleException("Incomplete request", null);
+                        break;
+                    }
+                    final Object payload;
+                    try {
+                        payload = unmarshaller.get();
+                    } catch (ClassNotFoundException e) {
+                        log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+                        // answer with a failure, as for an incomplete request, so the requester is not left waiting
+                        new ReplyHandlerImpl(channel, requestId, allocator).handleException("Request unmarshalling failed", e);
+                        break;
+                    }
+                    final RemoteClientEndpoint<?, ?> clientEndpoint = handle.getResource();
+                    receiveRequest(clientEndpoint, new ReplyHandlerImpl(channel, requestId, allocator), payload);
+                    break;
+                }
+                case REPLY: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler<?> replyHandler = takeOutstandingReqeust(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
+                    if (! unmarshaller.unmarshal(buffer)) {
+                        replyHandler.handleException("Incomplete reply", null);
+                        log.trace("Incomplete reply to request ID %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final Object payload;
+                    try {
+                        payload = unmarshaller.get();
+                    } catch (ClassNotFoundException e) {
+                        replyHandler.handleException("Reply unmarshalling failed", e);
+                        log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    handleReply(replyHandler, payload);
+                    break;
+                }
+                case REQUEST_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler<?> replyHandler = takeOutstandingReqeust(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
+                    if (! unmarshaller.unmarshal(buffer)) {
+                        replyHandler.handleException("Incomplete exception reply", null);
+                        log.trace("Incomplete exception reply to request ID %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final Object message;
+                    final Object cause;
+                    try {
+                        message = unmarshaller.get();
+                        cause = unmarshaller.get();
+                    } catch (ClassNotFoundException e) {
+                        replyHandler.handleException("Exception reply unmarshalling failed", e);
+                        log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    handleException(replyHandler, message, cause);
+                    break;
+                }
+                case CLIENT_CLOSE: {
+                    break;
+                }
+                case CLIENT_OPEN: {
+                    final int serviceId = buffer.getInt();
+                    final int clientId = buffer.getInt();
+                    final Handle<RemoteServiceEndpoint<?, ?>> handle = getForwardedService(serviceId);
+                    if (handle == null) {
+                        log.trace("Client open on invalid service ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    final RemoteServiceEndpoint<?, ?> serviceEndpoint = handle.getResource();
+                    final RemoteClientEndpoint<?, ?> clientEndpoint = serviceEndpoint.createClientEndpoint();
+                    // NOTE(review): clientEndpoint/clientId are unused; presumably openClientForForwardedService belongs here - confirm
+                    break;
+                }
+                case SERVICE_CLOSE: {
+                    break;
+                }
+                default: {
+                    log.trace("Received invalid message type %d", Integer.valueOf(msgType));
+                }
+            }
+        } catch (IOException e) {
+            log.error(e, "I/O error in protocol channel");
+            IoUtils.safeClose(channel);
+            return;
+        } catch (BufferUnderflowException e) {
+            log.error(e, "Malformed packet");
+        } catch (Throwable t) {
+            log.error(t, "Handler failed");
+        }
+    }
+
+    // Runs queued write handlers in queue order until the queue is empty or a handler cannot finish yet.
+    public void handleWritable(final AllocatedMessageChannel channel) {
+        for (;;) {
+            final WriteHandler handler = outputQueue.peek();
+            if (handler == null) {
+                return;
+            }
+            try {
+                if (! handler.handleWrite(channel)) {
+                    channel.resumeWrites();
+                    return;
+                }
+                log.trace("Handled write with handler %s", handler);
+            } catch (Throwable t) {
+                // a failed handler is dequeued like a finished one so that it cannot wedge the queue
+                log.error(t, "Write handler failed");
+            }
+            pending.decrementAndGet();
+            outputQueue.remove();
+        }
+    }
+
+    public void handleClosed(final AllocatedMessageChannel channel) {
+    }
+
+    private <I, O> ReplyHandler<O> createReplyHandler(final AllocatedMessageChannel channel, final int requestId) {
+        return new ReplyHandlerImpl<O>(channel, requestId, allocator);
+    }
+
+    RemoteClientEndpoint<?, ?> getRemoteClient(final int i) {
+        return remoteClients.get(Integer.valueOf(i));
+    }
+
+    private final class ReplyHandlerImpl<O> implements ReplyHandler<O> {
+
+        private final AllocatedMessageChannel channel;
+        private final int requestId;
+        private final BufferAllocator<ByteBuffer> allocator;
+
+        private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
+            if (channel == null) {
+                throw new NullPointerException("channel is null");
+            }
+            if (allocator == null) {
+                throw new NullPointerException("allocator is null");
+            }
+            this.channel = channel;
+            this.requestId = requestId;
+            this.allocator = allocator;
+        }
+
+        public void handleReply(final O reply) {
+            ByteBuffer buffer = allocator.allocate();
+            buffer.put((byte) REPLY);
+            buffer.putInt(requestId);
+            try {
+                final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(resolver);
+                marshaller.start(reply);
+                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                while (! marshaller.marshal(buffer)) {
+                    bufferList.add(flip(buffer));
+                    buffer = allocator.allocate();
+                }
+                bufferList.add(flip(buffer));
+                registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+            } catch (IOException e) {
+                // todo log
+            } catch (InterruptedException e) {
+                // todo log
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        public void handleException(final String msg, final Throwable cause) {
+            ByteBuffer buffer = allocator.allocate();
+            buffer.put((byte) REQUEST_FAILED);
+            buffer.putInt(requestId);
+            try {
+                final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(resolver);
+                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                marshaller.start(msg);
+                while (! marshaller.marshal(buffer)) {
+                    bufferList.add(flip(buffer));
+                    buffer = allocator.allocate();
+                }
+                marshaller.start(cause);
+                while (! marshaller.marshal(buffer)) {
+                    bufferList.add(flip(buffer));
+                    buffer = allocator.allocate();
+                }
+                bufferList.add(flip(buffer));
+                registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+            } catch (IOException e) {
+                // todo log
+            } catch (InterruptedException e) {
+                // todo log
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        public void handleCancellation() {
+            final ByteBuffer buffer = allocator.allocate();
+            buffer.put((byte) CANCEL_ACK);
+            buffer.putInt(requestId);
+            buffer.flip();
+            try {
+                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+            } catch (InterruptedException e) {
+                // todo log
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    // Session mgmt
+
+    public int openRequest(ReplyHandler<?> handler) {
+        int id;
+        do {
+            id = localRequestIdSeq.getAndIncrement();
+        } while (outstandingRequests.putIfAbsent(Integer.valueOf(id), handler) != null);
+        return id;
+    }
+
+    public int openClientFromService() {
+        int id;
+        do { // the endpoint constructor throws on a null marshallerFactory, so pass this handler's own factory
+            id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
+        } while (remoteClients.putIfAbsent(Integer.valueOf(id), new RemoteClientEndpointImpl<Object, Object>(id, marshallerFactory, allocator)) != null);
+        return id;
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    public void openClientForForwardedService(int id, RemoteClientEndpoint<?, ?> clientEndpoint) {
+        try {
+            forwardedClients.put(Integer.valueOf(id), ((RemoteClientEndpoint)clientEndpoint).getHandle());
+        } catch (RemotingException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public Handle<RemoteClientEndpoint<?, ?>> getForwardedClient(int id) {
+        return forwardedClients.get(Integer.valueOf(id));
+    }
+
+    public ReplyHandler<?> takeOutstandingReqeust(int id) {
+        return outstandingRequests.remove(Integer.valueOf(id));
+    }
+
+    public Handle<RemoteServiceEndpoint<?, ?>> getForwardedService(final int id) {
+        return forwardedServices.get(Integer.valueOf(id));
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private static <I, O> void receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, Object request) {
+        clientEndpoint.receiveRequest((I) request);
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private static <I, O> RemoteRequestContext receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, ReplyHandler<O> replyHandler, Object request) {
+        return clientEndpoint.receiveRequest((I) request, replyHandler);
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private static <O> void handleReply(final ReplyHandler<O> replyHandler, final Object reply) {
+        SpiUtils.safeHandleReply(replyHandler, (O) reply);
+    }
+
+    private static void handleException(final ReplyHandler<?> handler, final Object message, final Object cause) {
+        SpiUtils.safeHandleException(handler, message == null ? null : message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
+    }
+
+    // Writer members
+
+    private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
+    private final AtomicInteger pending = new AtomicInteger();
+
+    private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
+        outputQueue.put(writeHandler);
+        if (pending.getAndIncrement() == 0) {
+            channel.resumeWrites();
+        }
+    }
+
+    // client endpoint
+
+    private final class RemoteClientEndpointImpl<I, O> extends AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements RemoteClientEndpoint<I, O> {
+
+        private final int identifier;
+        private final MarshallerFactory<ByteBuffer> marshallerFactory;
+        private final BufferAllocator<ByteBuffer> allocator;
+
+        public RemoteClientEndpointImpl(final int identifier, final MarshallerFactory<ByteBuffer> marshallerFactory, final BufferAllocator<ByteBuffer> allocator) {
+            super(executor);
+            if (marshallerFactory == null) {
+                throw new NullPointerException("marshallerFactory is null");
+            }
+            if (allocator == null) {
+                throw new NullPointerException("allocator is null");
+            }
+            this.identifier = identifier;
+            this.marshallerFactory = marshallerFactory;
+            this.allocator = allocator;
+        }
+
+        public void receiveRequest(final I request) {
+            log.trace("Received one-way request of type %s", request == null ? "null" : request.getClass());
+            try {
+                final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
+                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                ByteBuffer buffer = allocator.allocate();
+                buffer.put((byte) MessageType.REQUEST_ONEWAY);
+                buffer.putInt(identifier);
+                marshaller.start(request);
+                while (! marshaller.marshal(buffer)) {
+                    bufferList.add(flip(buffer));
+                    buffer = allocator.allocate();
+                }
+                bufferList.add(flip(buffer));
+                try {
+                    registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                } catch (InterruptedException e) {
+                    log.trace(e, "receiveRequest was interrupted");
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            } catch (Throwable t) {
+                // ignore
+                log.trace(t, "receiveRequest failed with an exception");
+                return;
+            }
+        }
+
+        public RemoteRequestContext receiveRequest(final I request, final ReplyHandler<O> handler) {
+            log.trace("Received request of type %s", request == null ? "null" : request.getClass());
+            final int id = openRequest(handler); // registered up front so every failure path below can unregister it
+            try {
+                final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
+                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                ByteBuffer buffer = allocator.allocate();
+                buffer.put((byte) MessageType.REQUEST).putInt(identifier).putInt(id);
+                marshaller.start(request);
+                while (! marshaller.marshal(buffer)) {
+                    bufferList.add(flip(buffer));
+                    buffer = allocator.allocate();
+                }
+                bufferList.add(flip(buffer));
+                try {
+                    registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    takeOutstandingReqeust(id); // the request was never queued, so no reply can claim this entry
+                    SpiUtils.safeHandleCancellation(handler);
+                    return SpiUtils.getBlankRemoteRequestContext();
+                }
+                log.trace("Sent request %s", request);
+                return new RemoteRequestContextImpl(id, allocator, channel);
+            } catch (Throwable t) {
+                log.trace(t, "receiveRequest failed with an exception");
+                takeOutstandingReqeust(id); // the handler is told of the failure below; do not leave it registered
+                SpiUtils.safeHandleException(handler, "Failed to build request", t);
+                return SpiUtils.getBlankRemoteRequestContext();
+            }
+        }
+    }
+
+    public final class RemoteRequestContextImpl implements RemoteRequestContext {
+        // Context for one locally originated request; cancel() queues a CANCEL_REQUEST message carrying its ID.
+        private final BufferAllocator<ByteBuffer> allocator;
+        private final int id;
+        private final AllocatedMessageChannel channel;
+
+        public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
+            this.id = id;
+            this.allocator = allocator;
+            this.channel = channel;
+        }
+
+        public void cancel(final boolean mayInterrupt) {
+            try {
+                final ByteBuffer buffer = allocator.allocate();
+                buffer.put((byte) MessageType.CANCEL_REQUEST);
+                buffer.putInt(id);
+                buffer.put((byte) (mayInterrupt ? 1 : 0));
+                buffer.flip();
+                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+            } catch (InterruptedException e) {
+                log.trace(e, "Interrupted before the cancel request could be queued");
+                Thread.currentThread().interrupt();
+            } catch (Throwable t) {
+                log.trace(t, "Failed to send cancel request");
+            }
+        }
+    }
+
+    public final class RemoteServiceEndpointImpl<I, O> extends AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements RemoteServiceEndpoint<I, O> {
+
+        private final MarshallerFactory<ByteBuffer> marshallerFactory;
+        private final BufferAllocator<ByteBuffer> allocator;
+        private final int identifier;
+
+        protected RemoteServiceEndpointImpl(final MarshallerFactory<ByteBuffer> marshallerFactory, final BufferAllocator<ByteBuffer> allocator, final int identifier) {
+            super(executor);
+            this.marshallerFactory = marshallerFactory;
+            this.allocator = allocator;
+            this.identifier = identifier;
+        }
+
+        public RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException {
+            final int id = openClientFromService();
+            final ByteBuffer buffer = allocator.allocate();
+            // layout decoded by the CLIENT_OPEN case of handleReadable: type byte, service ID, then the client ID allocated above
+            buffer.put((byte) CLIENT_OPEN).putInt(identifier).putInt(id);
+            buffer.flip();
+            boolean intr = false;
+            for (;;) {
+                try {
+                    registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                    try {
+                        return new RemoteClientEndpointImpl<I,O>(id, marshallerFactory, allocator);
+                    } finally {
+                        if (intr) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    intr = true;
+                }
+            }
+        }
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java	2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,106 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.core.marshal.JBossSerializationMarshallerFactory;
+import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.AbstractConvertingIoFuture;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+/**
+ * Static factory methods for the server and client sides of the basic protocol.
+ */
+public final class BasicProtocol {
+
+    private static final Logger log = Logger.getLogger(BasicProtocol.class);
+
+    private BasicProtocol() {
+    }
+
+    /**
+     * Create a handler factory for inbound connections; every handler gets a client endpoint drawn from {@code localRootSource}.
+     * @param executor the executor to use for invocations
+     * @param localRootSource the service to draw client endpoints from for root clients on inbound connections
+     * @param allocator the buffer allocator to use
+     * @param remoteListener a listener which receives notification of the remote root client of the incoming connection
+     * @return a handler factory for passing to an XNIO server
+     */
+    public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final RemoteServiceEndpoint<?, ?> localRootSource, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener remoteListener) {
+        return new IoHandlerFactory<AllocatedMessageChannel>() {
+            public IoHandler<? super AllocatedMessageChannel> createHandler() {
+                try {
+                    final RemoteClientEndpoint<?, ?> rootClient = localRootSource.createClientEndpoint();
+                    try {
+                        final JavaSerializationMarshallerFactory marshallers = new JavaSerializationMarshallerFactory(executor);
+                        return new BasicHandler(true, allocator, rootClient, executor, remoteListener, marshallers);
+                    } finally {
+                        try {
+                            rootClient.autoClose();
+                        } catch (RemotingException e) {
+                            log.error(e, "Error setting auto-close mode");
+                        }
+                    }
+                } catch (RemotingException e) {
+                    throw new IllegalStateException("The local root endpoint is unusable", e);
+                }
+            }
+        };
+    }
+
+    /**
+     * Open an outbound basic-protocol connection and expose the remote side's root client.
+     *
+     * @param <I> the request type of the returned endpoint (applied by an unchecked cast)
+     * @param <O> the reply type of the returned endpoint (applied by an unchecked cast)
+     * @param executor the executor to use for invocations
+     * @param localRoot the client endpoint to use as the local root client
+     * @param channelSource the XNIO channel source to use to establish the connection
+     * @param allocator the buffer allocator to use
+     * @return a future for the remote side's root client, looked up on the handler as remote client 0
+     * @throws IOException if an error occurs
+     */
+    public static <I, O> IoFuture<RemoteClientEndpoint<I, O>> connect(final Executor executor, final RemoteClientEndpoint<?, ?> localRoot, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
+        final JavaSerializationMarshallerFactory marshallers = new JavaSerializationMarshallerFactory(executor);
+        final BasicHandler handler = new BasicHandler(false, allocator, localRoot, executor, null, marshallers);
+        final IoFuture<AllocatedMessageChannel> pendingChannel = channelSource.open(handler);
+        return new AbstractConvertingIoFuture<RemoteClientEndpoint<I, O>, AllocatedMessageChannel>(pendingChannel) {
+            @SuppressWarnings({ "unchecked" })
+            protected RemoteClientEndpoint<I, O> convert(final AllocatedMessageChannel channel) throws RemotingException {
+                return (RemoteClientEndpoint) handler.getRemoteClient(0);
+            }
+        };
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+/**
+ * Integer constants naming the message types of the basic protocol.
+ */
+public final class MessageType {
+    // Request/reply exchange (grouping inferred from the constant names only)
+    public static final int REQUEST_ONEWAY     = 0;
+    public static final int REQUEST            = 1;
+    public static final int REPLY              = 2;
+    public static final int CANCEL_REQUEST     = 3;
+    public static final int CANCEL_ACK         = 4;
+    public static final int REQUEST_FAILED     = 5;
+    // Remote side called .close() on a forwarded RemoteClientEndpoint
+    public static final int CLIENT_CLOSE       = 6;
+    // NOTE(review): presumably announces a newly opened client (by name) - confirm; this comment used to repeat the CLIENT_CLOSE text
+    public static final int CLIENT_OPEN        = 7;
+    public static final int SERVICE_CLOSE      = 8;
+
+    private MessageType() {
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/SimpleWriteHandler.java	2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * A {@link WriteHandler} that sends a fixed array of buffers as one message and frees them through the allocator when done.
+ */
+public final class SimpleWriteHandler implements WriteHandler {
+    private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
+
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final ByteBuffer[] buffers;
+
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
+        this(allocator, buffers.toArray(new ByteBuffer[buffers.size()]));
+    }
+
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
+        this.allocator = allocator;
+        this.buffers = buffers; // kept by reference, not copied
+        traceMessageSize();
+    }
+
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
+        this(allocator, new ByteBuffer[] { buffer });
+    }
+
+    /** Trace-logs the total number of bytes remaining across all buffers; does nothing unless trace logging is on. */
+    private void traceMessageSize() {
+        if (! log.isTrace()) {
+            return;
+        }
+        long total = 0L;
+        for (ByteBuffer buf : buffers) {
+            total += buf.remaining();
+        }
+        log.trace("Writing a message of size %d", Long.valueOf(total));
+    }
+
+    /** Sends the buffers on {@code channel}; returns the result of the send, or {@code true} if it fails with an {@link IOException}. */
+    public boolean handleWrite(final WritableMessageChannel channel) {
+        boolean release = true; // stays true if send() throws, so the buffers are still freed
+        try {
+            release = channel.send(buffers);
+            return release;
+        } catch (IOException e) {
+            log.trace(e, "Write failed");
+            return true;
+        } finally {
+            if (release) {
+                for (int i = 0; i < buffers.length; i++) {
+                    allocator.free(buffers[i]);
+                }
+            }
+        }
+    }
+}

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/WriteHandler.java	2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * Callback that performs a write on a writable message channel.
+ */
+public interface WriteHandler {
+    boolean handleWrite(WritableMessageChannel channel); // presumably true means "finished": SimpleWriteHandler returns the send result, or true on IOException - confirm with the caller
+}

Added: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-07-17 16:21:03 UTC (rev 4379)
@@ -0,0 +1,177 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import junit.framework.TestCase;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.ConfigurableFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.TcpClient;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.cx.remoting.core.EndpointImpl;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.ClientContext;
+import org.jboss.cx.remoting.ServiceContext;
+import org.jboss.cx.remoting.Client;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.test.support.LoggingHelper;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.nio.ByteBuffer;
+import java.net.InetSocketAddress;
+import java.io.Closeable;
+
+/**
+ * Round-trip test of the basic protocol: serves an endpoint over TCP on port 12345, connects via localhost and sends one request.
+ */
+public final class ConnectionTestCase extends TestCase {
+    static {
+        LoggingHelper.init();
+    }
+
+    public void testConnection() throws Throwable {
+        final AtomicBoolean clientOpened = new AtomicBoolean(false);
+        final AtomicBoolean serviceOpened = new AtomicBoolean(false);
+        final AtomicBoolean clientClosed = new AtomicBoolean(false);
+        final AtomicBoolean serviceClosed = new AtomicBoolean(false);
+        final CountDownLatch clientCloseLatch = new CountDownLatch(1); // counted down by handleClientClose below
+        final ExecutorService executorService = Executors.newCachedThreadPool();
+        try {
+            final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() { // test allocator: new 1024-byte heap buffer per allocate(); free() does nothing
+                public ByteBuffer allocate() {
+                    return ByteBuffer.allocate(1024);
+                }
+
+                public void free(final ByteBuffer buffer) {
+                }
+            };
+            final Xnio xnio = Xnio.createNio();
+            try {
+                final EndpointImpl endpoint = new EndpointImpl();
+                endpoint.setExecutor(executorService);
+                endpoint.start();
+                try {
+                    final RemoteServiceEndpoint<Object,Object> serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object, Object>() {
+                        public void handleClientOpen(final ClientContext context) {
+                            clientOpened.set(true);
+                        }
+
+                        public void handleServiceOpen(final ServiceContext context) {
+                            serviceOpened.set(true);
+                        }
+
+                        public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
+                            try {
+                                System.out.println("Received request; sending response!");
+                                context.sendReply("response");
+                            } catch (RemotingException e) {
+                                try {
+                                    context.sendFailure("failed", e);
+                                } catch (RemotingException e1) {
+                                    System.out.println("Double fault!"); // the failure report itself failed; only printed, not rethrown
+                                }
+                            }
+                        }
+
+                        public void handleServiceClose(final ServiceContext context) {
+                            serviceClosed.set(true);
+                        }
+
+                        public void handleClientClose(final ClientContext context) {
+                            clientClosed.set(true);
+                            clientCloseLatch.countDown();
+                        }
+                    });
+                    try {
+                        final Handle<RemoteServiceEndpoint<Object,Object>> handle = serverServiceEndpoint.getHandle();
+                        serverServiceEndpoint.autoClose();
+                        try {
+                            final RemoteClientEndpointListener remoteListener = new RemoteClientEndpointListener() { // no-op listener
+
+                                public <I, O> void notifyCreated(final RemoteClientEndpoint<I, O> endpoint) {
+
+                                }
+                            };
+                            final ConfigurableFactory<Closeable> tcpServer = xnio.createTcpServer(executorService, Channels.convertStreamToAllocatedMessage(BasicProtocol.createServer(executorService, serverServiceEndpoint, allocator, remoteListener), 32768, 32768), new InetSocketAddress(12345));
+                            final Closeable tcpServerCloseable = tcpServer.create();
+                            try {
+                                // now create a client to connect to it
+                                final RemoteClientEndpoint<?,?> localRoot = serverServiceEndpoint.createClientEndpoint();
+                                final InetSocketAddress destAddr = new InetSocketAddress("localhost", 12345);
+                                final TcpClient tcpClient = xnio.createTcpConnector().create().createChannelSource(destAddr);
+                                final ChannelSource<AllocatedMessageChannel> messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
+                                final IoFuture<RemoteClientEndpoint<Object,Object>> futureClient = BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
+                                final RemoteClientEndpoint<Object, Object> clientEndpoint = futureClient.get();
+                                try {
+                                    final Client<Object,Object> client = endpoint.createClient(clientEndpoint);
+                                    try {
+                                        clientEndpoint.autoClose();
+                                        final Object result = client.send("Test").get();
+                                        assertEquals("response", result); // the listener above always replies "response"
+                                        client.close();
+                                        tcpServerCloseable.close();
+                                        handle.close();
+                                    } finally {
+                                        IoUtils.safeClose(client);
+                                        clientCloseLatch.await(500L, TimeUnit.MILLISECONDS); // waits at most 500 ms for handleClientClose; the boolean result is ignored
+                                    }
+                                } finally {
+                                    IoUtils.safeClose(clientEndpoint);
+                                }
+                            } finally {
+                                IoUtils.safeClose(tcpServerCloseable);
+                            }
+                        } finally {
+                            IoUtils.safeClose(handle);
+                        }
+                    } finally {
+                        IoUtils.safeClose(serverServiceEndpoint);
+                    }
+                } finally {
+                    endpoint.stop();
+                }
+            } finally {
+                IoUtils.safeClose(xnio);
+            }
+        } finally {
+            executorService.shutdownNow();
+        }
+        assertTrue(serviceOpened.get()); // each flag is set by the matching RequestListener callback above
+        assertTrue(clientOpened.get());
+        assertTrue(clientClosed.get());
+        assertTrue(serviceClosed.get());
+    }
+}




More information about the jboss-remoting-commits mailing list