[jboss-remoting-commits] JBoss Remoting SVN: r4679 - in remoting3/trunk: api/src/test/java/org/jboss/remoting/spi and 5 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Fri Nov 14 01:41:47 EST 2008


Author: david.lloyd at jboss.com
Date: 2008-11-14 01:41:47 -0500 (Fri, 14 Nov 2008)
New Revision: 4679

Added:
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java
Removed:
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java
   remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
   remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
   remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
   remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java
   remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java
   remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java
Log:
Complete changes for multiplex transport...

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -29,7 +29,6 @@
 import java.lang.ref.WeakReference;
 import org.jboss.remoting.RemotingException;
 import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.HandleableCloseable;
 import org.jboss.xnio.log.Logger;
 import org.jboss.xnio.IoUtils;
 import org.jboss.xnio.WeakCloseable;
@@ -40,11 +39,11 @@
  */
 public abstract class AbstractAutoCloseable<T> extends AbstractHandleableCloseable<T> implements AutoCloseable<T> {
 
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.resource");
+
     private final AtomicInteger refcount = new AtomicInteger(0);
     private final Executor executor;
 
-    private static final Logger log = Logger.getLogger(AbstractAutoCloseable.class);
-
     /**
      * Basic constructor.
      *

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -42,7 +42,7 @@
  */
 public abstract class AbstractHandleableCloseable<T> implements HandleableCloseable<T> {
 
-    private static final Logger log = Logger.getLogger(AbstractHandleableCloseable.class);
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.resource");
 
     protected final Executor executor;
     private final Object closeLock = new Object();

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/NamedServiceRegistry.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -23,6 +23,9 @@
 package org.jboss.remoting.spi;
 
 import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collections;
 import java.io.IOException;
 import org.jboss.remoting.util.QualifiedName;
 import org.jboss.remoting.util.CollectionUtil;
@@ -75,6 +78,10 @@
         return map.get(path);
     }
 
+    public Set<Map.Entry<QualifiedName, Handle<RequestHandlerSource>>> getEntrySet() {
+        return Collections.unmodifiableSet(map.entrySet());
+    }
+
     public String toString() {
         return "named service registry <" + Integer.toHexString(hashCode()) + ">";
     }

Modified: remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java
===================================================================
--- remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -30,6 +30,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
 import org.jboss.remoting.CloseHandler;
 import org.jboss.remoting.test.support.LoggingHelper;
 
@@ -41,6 +42,8 @@
         LoggingHelper.init();
     }
 
+    public static final Logger log = Logger.getLogger(CloseableTestCase.class);
+
     public void testBasic() throws Throwable {
         final ExecutorService executorService = Executors.newCachedThreadPool();
         try {

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientContextImpl.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -25,12 +25,15 @@
 import java.util.concurrent.Executor;
 import org.jboss.remoting.ClientContext;
 import org.jboss.remoting.ServiceContext;
+import org.jboss.xnio.log.Logger;
 
 /**
  *
  */
 public final class ClientContextImpl extends AbstractContextImpl<ClientContext> implements ClientContext {
 
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.client-context");
+
     private final ServiceContextImpl serviceContext;
 
     ClientContextImpl(final Executor executor) {

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -53,7 +53,7 @@
     }
 
     private <I, O> ClientImpl<I, O> doCreateExternal(Class<I> requestClass, Class<O> replyClass, RequestHandler handler) throws IOException {
-        return new ClientImpl<I, O>(handler.getHandle(), endpoint.getExecutor(), requestClass, replyClass);
+        return ClientImpl.create(handler.getHandle(), endpoint.getExecutor(), requestClass, replyClass);
     }
 
     public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -27,29 +27,44 @@
 import org.jboss.remoting.Client;
 import org.jboss.remoting.IndeterminateOutcomeException;
 import org.jboss.remoting.RemoteRequestException;
+import org.jboss.remoting.CloseHandler;
 import org.jboss.remoting.core.util.QueueExecutor;
 import org.jboss.remoting.spi.Handle;
 import org.jboss.remoting.spi.RemoteRequestContext;
 import org.jboss.remoting.spi.ReplyHandler;
 import org.jboss.remoting.spi.RequestHandler;
 import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
 
 /**
  *
  */
 public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I, O>> implements Client<I, O> {
 
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.client");
+
     private final Handle<RequestHandler> handle;
     private final Class<I> requestClass;
     private final Class<O> replyClass;
 
-    ClientImpl(final Handle<RequestHandler> handle, final Executor executor, final Class<I> requestClass, final Class<O> replyClass) {
+    private ClientImpl(final Handle<RequestHandler> handle, final Executor executor, final Class<I> requestClass, final Class<O> replyClass) {
         super(executor);
         this.handle = handle;
         this.requestClass = requestClass;
         this.replyClass = replyClass;
     }
 
+    static <I, O> ClientImpl<I, O> create(final Handle<RequestHandler> handle, final Executor executor, final Class<I> requestClass, final Class<O> replyClass) { // factory in front of the private constructor
+        final ClientImpl<I, O> ci = new ClientImpl<I, O>(handle, executor, requestClass, replyClass);
+        handle.addCloseHandler(new CloseHandler<Handle<RequestHandler>>() { // registered only after the client is fully constructed
+            public void handleClose(final Handle<RequestHandler> closed) {
+                IoUtils.safeClose(ci); // closing the handle also closes the client built on it
+            }
+        });
+        return ci;
+    }
+
     protected void closeAction() throws IOException {
         handle.close();
     }

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -53,7 +53,7 @@
     }
 
     private <I, O> ClientSourceImpl<I, O> doCreateExternal(Class<I> requestClass, Class<O> replyClass, RequestHandlerSource handlerSource) throws IOException {
-        return new ClientSourceImpl<I, O>(handlerSource.getHandle(), endpoint, requestClass, replyClass);
+        return ClientSourceImpl.create(handlerSource.getHandle(), endpoint, requestClass, replyClass);
     }
 
     public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -26,23 +26,27 @@
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ClientSource;
 import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.CloseHandler;
 import org.jboss.remoting.spi.AbstractHandleableCloseable;
 import org.jboss.remoting.spi.Handle;
 import org.jboss.remoting.spi.RequestHandler;
 import org.jboss.remoting.spi.RequestHandlerSource;
 import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
 
 /**
  *
  */
 public final class ClientSourceImpl<I, O> extends AbstractHandleableCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
 
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.client-source"); 
+
     private final Handle<RequestHandlerSource> handle;
     private final Endpoint endpoint;
     private final Class<I> requestClass;
     private final Class<O> replyClass;
 
-    ClientSourceImpl(final Handle<RequestHandlerSource> handle, final EndpointImpl endpoint, final Class<I> requestClass, final Class<O> replyClass) {
+    private ClientSourceImpl(final Handle<RequestHandlerSource> handle, final EndpointImpl endpoint, final Class<I> requestClass, final Class<O> replyClass) {
         super(endpoint.getExecutor());
         this.handle = handle;
         this.endpoint = endpoint;
@@ -50,6 +54,16 @@
         this.replyClass = replyClass;
     }
 
+    static <I, O> ClientSourceImpl<I, O> create(final Handle<RequestHandlerSource> handle, final EndpointImpl endpoint, final Class<I> requestClass, final Class<O> replyClass) { // factory in front of the private constructor
+        final ClientSourceImpl<I, O> csi = new ClientSourceImpl<I, O>(handle, endpoint, requestClass, replyClass);
+        handle.addCloseHandler(new CloseHandler<Handle<RequestHandlerSource>>() { // registered only after the client source is fully constructed
+            public void handleClose(final Handle<RequestHandlerSource> closed) {
+                IoUtils.safeClose(csi); // closing the handle also closes the client source built on it
+            }
+        });
+        return csi;
+    }
+
     protected void closeAction() throws IOException {
         handle.close();
     }

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -46,7 +46,7 @@
         Logger.getLogger("org.jboss.remoting").info("JBoss Remoting version %s", Version.VERSION);
     }
 
-    private static final Logger log = Logger.getLogger(Endpoint.class);
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.endpoint");
 
     private String name;
 
@@ -234,12 +234,7 @@
         boolean ok = false;
         final Handle<RequestHandler> handle = requestHandler.getHandle();
         try {
-            final ClientImpl<I, O> client = new ClientImpl<I, O>(handle, executor, requestType, replyType);
-            client.addCloseHandler(new CloseHandler<Client<I, O>>() {
-                public void handleClose(final Client<I, O> closed) {
-                    IoUtils.safeClose(handle);
-                }
-            });
+            final ClientImpl<I, O> client = ClientImpl.create(handle, executor, requestType, replyType);
             ok = true;
             return client;
         } finally {
@@ -257,7 +252,7 @@
         boolean ok = false;
         final Handle<RequestHandlerSource> handle = requestHandlerSource.getHandle();
         try {
-            final ClientSourceImpl<I, O> clientSource = new ClientSourceImpl<I, O>(handle, this, requestClass, replyClass);
+            final ClientSourceImpl<I, O> clientSource = ClientSourceImpl.create(handle, this, requestClass, replyClass);
             ok = true;
             return clientSource;
         } finally {

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -25,7 +25,6 @@
 import java.io.IOException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
-import org.jboss.remoting.CloseHandler;
 import org.jboss.remoting.RemoteExecutionException;
 import org.jboss.remoting.RequestListener;
 import org.jboss.remoting.RemoteRequestException;
@@ -46,7 +45,7 @@
     private final Class<I> requestClass;
     private final Class<O> replyClass;
 
-    private static final Logger log = Logger.getLogger(LocalRequestHandler.class);
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.listener");
 
     LocalRequestHandler(Config<I, O> config) {
         super(config.getExecutor());
@@ -88,18 +87,17 @@
         };
     }
 
+    protected void closeAction() throws IOException { // notifies the request listener that its client context is closing
+        try {
+            requestListener.handleClientClose(clientContext);
+        } catch (Throwable t) { // anything the listener throws is logged here and not rethrown
+            log.error(t, "Unexpected exception in request listener client close handler method");
+        }
+    }
+
     void open() throws IOException {
         try {
             requestListener.handleClientOpen(clientContext);
-            addCloseHandler(new CloseHandler<RequestHandler>() {
-                public void handleClose(final RequestHandler closed) {
-                    try {
-                        requestListener.handleClientClose(clientContext);
-                    } catch (Throwable t) {
-                        log.error(t, "Unexpected exception in request listener client close handler method");
-                    }
-                }
-            });
         } catch (Throwable t) {
             final IOException ioe = new IOException("Failed to open client context");
             ioe.initCause(t);

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -43,7 +43,7 @@
     private final Class<I> requestClass;
     private final Class<O> replyClass;
 
-    private static final Logger log = Logger.getLogger(LocalRequestHandlerSource.class);
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.listener-source");
 
     LocalRequestHandlerSource(final Config<I, O> config) {
         super(config.getExecutor());

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ServiceContextImpl.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -24,11 +24,14 @@
 
 import java.util.concurrent.Executor;
 import org.jboss.remoting.ServiceContext;
+import org.jboss.xnio.log.Logger;
 
 /**
  *
  */
 public final class ServiceContextImpl extends AbstractContextImpl<ServiceContext> implements ServiceContext {
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.service-context");
+
     protected ServiceContextImpl(final Executor executor) {
         super(executor);
     }

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicRequestHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -42,7 +42,7 @@
  */
 final class BasicRequestHandler extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
 
-    private static final Logger log = Logger.getLogger(BasicRequestHandler.class);
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.basic");
 
     private final AtomicInteger requestSequence;
     private final Lock reqLock;
@@ -77,6 +77,8 @@
                     } catch (IOException e) {
                         log.error(e, "Error writing cancel request");
                         IoUtils.safeClose(BasicRequestHandler.this);
+                    } finally {
+                        reqLock.unlock();
                     }
                 }
             };

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BufferByteOutput.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.xnio.BufferAllocator;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+/**
+ * A {@link ByteOutput} that collects written bytes in buffers taken from an allocator and adds each filled (or flushed) buffer, flipped, to a target collection.
+ */
+public class BufferByteOutput implements ByteOutput {
+
+    private ByteBuffer current; // buffer currently being filled; null until a write needs one
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final Collection<ByteBuffer> target;
+
+    public BufferByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
+        this.allocator = allocator;
+        this.target = target;
+    }
+
+    private ByteBuffer getCurrent() { // returns the working buffer, allocating it on demand
+        final ByteBuffer buffer = current;
+        return buffer == null ? (current = allocator.allocate()) : buffer;
+    }
+
+    public void write(final int i) {
+        final ByteBuffer buffer = getCurrent();
+        buffer.put((byte) i); // the cast keeps only the low-order 8 bits of i
+        if (! buffer.hasRemaining()) { // buffer is full: flip it and hand it to the target
+            buffer.flip();
+            target.add(buffer);
+            current = null; // the next write allocates a fresh buffer
+        }
+    }
+
+    public void write(final byte[] bytes) {
+        write(bytes, 0, bytes.length);
+    }
+
+    public void write(final byte[] bytes, int offs, int len) {
+        while (len > 0) { // NOTE(review): assumes allocate() never returns a zero-capacity buffer, otherwise len never decreases - confirm
+            final ByteBuffer buffer = getCurrent();
+            final int c = Math.min(len, buffer.remaining()); // copy only as much as fits in the current buffer
+            buffer.put(bytes, offs, c);
+            offs += c;
+            len -= c;
+            if (! buffer.hasRemaining()) { // buffer is full: flip it and hand it to the target
+                buffer.flip();
+                target.add(buffer);
+                current = null;
+            }
+        }
+    }
+
+    public void close() {
+        flush(); // close does nothing beyond flushing the pending buffer
+    }
+
+    public void flush() {
+        final ByteBuffer buffer = current;
+        if (buffer != null) { // a partially filled buffer is pending
+            buffer.flip();
+            target.add(buffer);
+            current = null;
+        }
+    }
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/FutureRemoteRequestHandlerSource.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import java.io.IOException;
+
+/**
+ * An {@link IoFuture} for a {@link RequestHandlerSource}; {@link #cancel()} performs no cancellation and simply returns this future.
+ */
+public final class FutureRemoteRequestHandlerSource extends AbstractIoFuture<RequestHandlerSource> {
+
+    public IoFuture<RequestHandlerSource> cancel() {
+        return this; // no cancellation is attempted here
+    }
+
+    protected boolean setException(final IOException exception) { // pass-through override; presumably redeclared so same-package code can fail the future - confirm
+        return super.setException(exception);
+    }
+
+    protected boolean setResult(final RequestHandlerSource result) { // pass-through override; presumably redeclared so same-package code can complete the future - confirm
+        return super.setResult(result);
+    }
+}

Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -24,6 +24,8 @@
 
 import java.util.HashMap;
 import java.util.IdentityHashMap;
+import java.util.Set;
+import java.util.Collections;
 
 /**
  *
@@ -59,6 +61,17 @@
         leftMap.remove(oldKey1Obj);
     }
 
+    public boolean putIfAbsent(final int key1, final T key2) { // adds the pairing only when key1 is not yet mapped; returns whether it was added
+        final Integer key1Obj = Integer.valueOf(key1);
+        if (leftMap.containsKey(key1Obj)) {
+            return false; // existing mapping for key1 is left untouched
+        }
+        final T oldKey2 = leftMap.put(key1Obj, key2); // NOTE(review): key1 was just found absent, so oldKey2 looks like it is always null here - confirm
+        rightMap.put(key2, key1Obj); // NOTE(review): a key2 already paired with another integer is not checked for; its old leftMap entry would remain - confirm
+        rightMap.remove(oldKey2);
+        return true;
+    }
+
     public T remove(final int key) {
         final T oldRightKey = leftMap.remove(Integer.valueOf(key));
         rightMap.remove(oldRightKey);
@@ -69,6 +82,10 @@
         leftMap.remove(rightMap.remove(key));
     }
 
+    public Set<T> getKeys() {
+        return Collections.unmodifiableSet(rightMap.keySet());
+    }
+
     public static <T> IntegerBiMap<T> create() {
         return new IdentityHashIntegerBiMap<T>();
     }

Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -26,6 +26,8 @@
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Collections;
+import java.util.Collection;
 import org.jboss.remoting.spi.AutoCloseable;
 import org.jboss.remoting.spi.Handle;
 
@@ -73,6 +75,10 @@
         leftMap.remove(rightMap.remove(key));
     }
 
+    public Collection<Handle<T>> getKeys() {
+        return Collections.unmodifiableCollection(leftMap.values());
+    }
+
     public static <T extends AutoCloseable<T>> IntegerResourceBiMap<T> create() {
         return new IdentityHashIntegerResourceBiMap<T>();
     }

Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -22,6 +22,9 @@
 
 package org.jboss.remoting.protocol.multiplex;
 
+import org.jboss.remoting.util.SynchronizedSet;
+import java.util.Set;
+
 /**
  *
  */
@@ -32,10 +35,14 @@
 
     void put(int key1, T key2);
 
+    boolean putIfAbsent(int key1, T key2);
+
     T remove(int key);
 
     void remove(T key);
 
+    Set<T> getKeys();
+
     class Util {
 
         private Util() {
@@ -69,6 +76,12 @@
                 }
             }
 
+            public boolean putIfAbsent(final int key1, final T key2) {
+                synchronized (lock) {
+                    return orig.putIfAbsent(key1, key2);
+                }
+            }
+
             public T remove(final int key) {
                 synchronized (lock) {
                     return orig.remove(key);
@@ -80,6 +93,10 @@
                     orig.remove(key);
                 }
             }
+
+            /**
+             * Get the wrapped map's key set, wrapped in a {@code SynchronizedSet} that shares this map's lock.
+             * Note that {@code orig.getKeys()} itself is invoked without holding {@code lock}.
+             *
+             * @return a new synchronized wrapper on every call
+             */
+            public Set<T> getKeys() {
+                return new SynchronizedSet<T>(orig.getKeys(), lock);
+            }
         }
 
         public static <T> IntegerBiMap<T> synchronizing(IntegerBiMap<T> orig) {

Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -24,7 +24,10 @@
 
 import org.jboss.remoting.spi.AutoCloseable;
 import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.util.SynchronizedSet;
+import org.jboss.remoting.util.SynchronizedCollection;
 import java.util.Iterator;
+import java.util.Collection;
 
 /**
  *
@@ -40,6 +43,8 @@
 
     void remove(T key);
 
+    Collection<Handle<T>> getKeys();
+
     class Util {
 
         private Util() {
@@ -85,6 +90,10 @@
                 }
             }
 
+            /**
+             * Get the wrapped map's handle collection, wrapped in a {@code SynchronizedCollection} that shares
+             * this map's lock.  Note that {@code orig.getKeys()} itself is invoked without holding {@code lock}.
+             *
+             * @return a new synchronized wrapper on every call
+             */
+            public Collection<Handle<T>> getKeys() {
+                return new SynchronizedCollection<Handle<T>>(orig.getKeys(), lock);
+            }
+
             public Iterator<Handle<T>> iterator() {
                 return null;
             }

Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -28,40 +28,50 @@
 enum MessageType {
 
     /**
+     * Signals that the connection should be closed in an orderly fashion.  After this message is sent, no further
+     * requests or service advertisements may be sent.
+     */
+    CONNECTION_CLOSE(0x00),
+    /**
      * The request part of a request-response sequence, sent from the Client to the RequestListener.
      */
-    REQUEST(2),
+    REQUEST(0x10),
     /**
      * The reply part of a request-response sequence, sent from the RequestListener to the Client.
      */
-    REPLY(3),
+    REPLY(0x11),
     /**
      * A cancellation request for an outstanding request, sent from the Client to the RequestListener.
      */
-    CANCEL_REQUEST(4),
+    CANCEL_REQUEST(0x12),
     /**
      * Acknowledgement that a request was cancelled, sent from the RequestListener to the Client.
      */
-    CANCEL_ACK(5),
+    CANCEL_ACK(0x13),
     /**
      * Message that the request could not be received on the remote end, sent to the Client from the
-     * protocol handler as a 
+     * protocol handler.
      */
-    REQUEST_RECEIVE_FAILED(6),
+    REQUEST_RECEIVE_FAILED(0x14),
     // Request failed due to exception
-    REQUEST_FAILED(7),
-    // Request completed but no reply or exception was sent
-    REQUEST_OUTCOME_UNKNOWN(8),
+    REQUEST_FAILED(0x15),
     // Remote side called .close() on a forwarded RequestHandler
-    CLIENT_CLOSE(9),
+    CLIENT_CLOSE(0x20),
     // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
-    CLIENT_OPEN(10),
-    // Remote side called .close() on a forwarded RequestHandlerSource
-    SERVICE_CLOSE(11),
-    // Remote side brought a new service online
-    SERVICE_ADVERTISE(12),
-    // Remote side's service is no longer available
-    SERVICE_UNADVERTISE(13),
+    CLIENT_OPEN(0x21),
+    // Request to open a service at a path
+    SERVICE_OPEN_REQUEST(0x30),
+    // Reply for a successful service open
+    SERVICE_OPEN_REPLY(0x31),
+    // Reply for a generally failed service open
+    SERVICE_OPEN_FAILED(0x32),
+    SERVICE_OPEN_NOT_FOUND(0x33),
+    SERVICE_OPEN_FORBIDDEN(0x34),
+
+    // Notify the remote side that the service will no longer be used
+    SERVICE_CLOSE_REQUEST(0x3e),
+    // The service channel is closed; no further clients may be opened
+    SERVICE_CLOSE_NOTIFY(0x3f),
     ;
     private final int id;
 
@@ -81,18 +91,22 @@
      */
     public static MessageType getMessageType(final int id) {
         switch (id) {
-            case 2: return REQUEST;
-            case 3: return REPLY;
-            case 4: return CANCEL_REQUEST;
-            case 5: return CANCEL_ACK;
-            case 6: return REQUEST_RECEIVE_FAILED;
-            case 7: return REQUEST_FAILED;
-            case 8: return REQUEST_OUTCOME_UNKNOWN;
-            case 9: return CLIENT_CLOSE;
-            case 10: return CLIENT_OPEN;
-            case 11: return SERVICE_CLOSE;
-            case 12: return SERVICE_ADVERTISE;
-            case 13: return SERVICE_UNADVERTISE;
+            case 0x00: return CONNECTION_CLOSE;
+            case 0x10: return REQUEST;
+            case 0x11: return REPLY;
+            case 0x12: return CANCEL_REQUEST;
+            case 0x13: return CANCEL_ACK;
+            case 0x14: return REQUEST_RECEIVE_FAILED;
+            case 0x15: return REQUEST_FAILED;
+            case 0x20: return CLIENT_CLOSE;
+            case 0x21: return CLIENT_OPEN;
+            case 0x30: return SERVICE_OPEN_REQUEST;
+            case 0x31: return SERVICE_OPEN_REPLY;
+            case 0x32: return SERVICE_OPEN_FAILED;
+            case 0x33: return SERVICE_OPEN_NOT_FOUND;
+            case 0x34: return SERVICE_OPEN_FORBIDDEN;
+            case 0x3e: return SERVICE_CLOSE_REQUEST;
+            case 0x3f: return SERVICE_CLOSE_NOTIFY;
             default: throw new IllegalArgumentException("Invalid message type ID");
         }
     }

Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -27,6 +27,7 @@
 import org.jboss.xnio.BufferAllocator;
 import org.jboss.marshalling.MarshallerFactory;
 import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.remoting.spi.NamedServiceRegistry;
 
 /**
  * A configuration object for the multiplex protocol.
@@ -37,6 +38,7 @@
     private int linkMetric;
     private Executor executor;
     private BufferAllocator<ByteBuffer> allocator;
+    private NamedServiceRegistry namedServiceRegistry;
 
     /**
      * Construct a new instance.
@@ -133,4 +135,22 @@
     public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
         this.allocator = allocator;
     }
+
+    /**
+     * Get the named service registry for this connection.
+     *
+     * @return the registry
+     */
+    public NamedServiceRegistry getNamedServiceRegistry() {
+        return namedServiceRegistry;
+    }
+
+    /**
+     * Set the named service registry for this connection.
+     *
+     * @param namedServiceRegistry the registry
+     */
+    public void setNamedServiceRegistry(final NamedServiceRegistry namedServiceRegistry) {
+        this.namedServiceRegistry = namedServiceRegistry;
+    }
 }

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConnection.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,444 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.spi.NamedServiceRegistry;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractHandleableCloseable;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.remoting.util.QualifiedName;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.Closeable;
+
+/**
+ *
+ */
+public final class MultiplexConnection extends AbstractHandleableCloseable<MultiplexConnection> {
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex");
+
+    //--== Connection configuration items ==--
+    private final MarshallerFactory marshallerFactory;
+    private final MarshallingConfiguration marshallingConfiguration;
+    private final int linkMetric;
+    private final Executor executor;
+    // buffer allocator for outbound message assembly
+    private final BufferAllocator<ByteBuffer> allocator;
+
+    // running on remote node
+    private final IntegerBiMap<ReplyHandler> remoteRequests = IdentityHashIntegerBiMap.createSynchronizing();
+    // running on local node
+    private final IntegerBiMap<RemoteRequestContext> localRequests = IdentityHashIntegerBiMap.createSynchronizing();
+    // sequence for remote requests
+    private final AtomicInteger requestSequence = new AtomicInteger();
+
+    // clients whose requests get forwarded to the remote side
+    // even #s were opened from services forwarded to us (our sequence)
+    // odd #s were forwarded directly to us (remote sequence)
+    private final IntegerBiMap<RequestHandler> remoteClients = IdentityHashIntegerBiMap.createSynchronizing();
+    // forwarded to remote side (handled on this side)
+    private final IntegerResourceBiMap<RequestHandler> forwardedClients = IdentityHashIntegerResourceBiMap.createSynchronizing();
+    // sequence for forwarded clients (shift left one bit, add one, limit is 2^30)
+    private final AtomicInteger forwardedClientSequence = new AtomicInteger();
+    // sequence for clients created from services forwarded to us (shift left one bit, limit is 2^30)
+    private final AtomicInteger remoteClientSequence = new AtomicInteger();
+
+    // services on the remote side
+    private final IntegerBiMap<FutureRemoteRequestHandlerSource> remoteServices = IdentityHashIntegerBiMap.createSynchronizing();
+    // forwarded to remote side (handled on this side)
+    private final IntegerResourceBiMap<RequestHandlerSource> forwardedServices = IdentityHashIntegerResourceBiMap.createSynchronizing();
+    // sequence for remote services
+    private final AtomicInteger remoteServiceSequence = new AtomicInteger();
+
+    // registered services by path
+    private final NamedServiceRegistry namedServiceRegistry;
+
+    private final Endpoint endpoint;
+
+    private final AllocatedMessageChannel channel;
+
+    /**
+     * Construct a new connection over the given message channel.
+     *
+     * @param endpoint the endpoint, retrievable later via {@code getEndpoint()}
+     * @param channel the message channel used for all outbound writes
+     * @param configuration supplies the marshalling settings, link metric, executor, allocator and service registry
+     * @throws NullPointerException if the configuration's marshaller factory, marshalling configuration, executor,
+     *      allocator or named service registry is {@code null}
+     */
+    public MultiplexConnection(final Endpoint endpoint, final AllocatedMessageChannel channel, final MultiplexConfiguration configuration) {
+        super(configuration.getExecutor());
+        this.endpoint = endpoint;
+        this.channel = channel;
+        // each setting is read in a fixed order and validated before it is stored
+        final MarshallerFactory configuredFactory = configuration.getMarshallerFactory();
+        if (configuredFactory == null) {
+            throw new NullPointerException("marshallerFactory is null");
+        }
+        marshallerFactory = configuredFactory;
+        final MarshallingConfiguration configuredMarshalling = configuration.getMarshallingConfiguration();
+        if (configuredMarshalling == null) {
+            throw new NullPointerException("marshallingConfiguration is null");
+        }
+        marshallingConfiguration = configuredMarshalling;
+        linkMetric = configuration.getLinkMetric();
+        final Executor configuredExecutor = configuration.getExecutor();
+        if (configuredExecutor == null) {
+            throw new NullPointerException("executor is null");
+        }
+        executor = configuredExecutor;
+        final BufferAllocator<ByteBuffer> configuredAllocator = configuration.getAllocator();
+        if (configuredAllocator == null) {
+            throw new NullPointerException("allocator is null");
+        }
+        allocator = configuredAllocator;
+        final NamedServiceRegistry configuredRegistry = configuration.getNamedServiceRegistry();
+        if (configuredRegistry == null) {
+            throw new NullPointerException("namedServiceRegistry is null");
+        }
+        namedServiceRegistry = configuredRegistry;
+    }
+
+    // sequence methods
+
+    /**
+     * Draw the next request ID from the request sequence.
+     *
+     * @return the next sequence value with the sign bit cleared
+     */
+    int nextRequest() {
+        final int raw = requestSequence.getAndIncrement();
+        return raw & 0x7fffffff;
+    }
+
+    /**
+     * Draw the next forwarded-client ID.  The sequence value is shifted left one bit and the low bit is set,
+     * so the result is always odd.
+     *
+     * @return the next odd ID with the sign bit cleared
+     */
+    int nextForwardedClient() {
+        final int raw = forwardedClientSequence.getAndIncrement();
+        final int odd = (raw << 1) | 1;
+        return odd & 0x7fffffff;
+    }
+
+    /**
+     * Draw the next remote-client ID.  The sequence value is shifted left one bit, so the result is always even.
+     *
+     * @return the next even ID with the sign bit cleared
+     */
+    int nextRemoteClient() {
+        final int raw = remoteClientSequence.getAndIncrement();
+        final int even = raw << 1;
+        return even & 0x7fffffff;
+    }
+
+    /**
+     * Draw the next remote-service ID from the remote service sequence.
+     *
+     * @return the next sequence value with the sign bit cleared
+     */
+    int nextRemoteService() {
+        final int raw = remoteServiceSequence.getAndIncrement();
+        return raw & 0x7fffffff;
+    }
+
+    /**
+     * Send the given buffers as one message, waiting for the channel to become writable as often as needed.
+     * A single buffer is handed to the single-buffer overload.
+     *
+     * @param buffers the buffers making up the message
+     * @throws IOException if the channel reports an I/O error
+     */
+    void doBlockingWrite(ByteBuffer... buffers) throws IOException {
+        if (buffers.length == 1) {
+            doBlockingWrite(buffers[0]);
+            return;
+        }
+        while (! channel.send(buffers)) {
+            channel.awaitWritable();
+        }
+    }
+
+    /**
+     * Send the given buffer as one message, waiting for the channel to become writable as often as needed.
+     * A dump of the buffer is passed to the trace log before sending.
+     *
+     * @param buffer the message to send
+     * @throws IOException if the channel reports an I/O error
+     */
+    void doBlockingWrite(ByteBuffer buffer) throws IOException {
+        log.trace("Sending message:\n%s", Buffers.createDumper(buffer, 8, 1));
+        while (! channel.send(buffer)) {
+            channel.awaitWritable();
+        }
+    }
+
+    /**
+     * Send the given list of buffers as one message by copying it to an array and delegating to the
+     * varargs overload.
+     *
+     * @param buffers the buffers making up the message
+     * @throws IOException if the channel reports an I/O error
+     */
+    void doBlockingWrite(List<ByteBuffer> buffers) throws IOException {
+        final ByteBuffer[] asArray = new ByteBuffer[buffers.size()];
+        doBlockingWrite(buffers.toArray(asArray));
+    }
+
+    /**
+     * Get the marshaller factory taken from the configuration at construction time.
+     *
+     * @return the marshaller factory
+     */
+    MarshallerFactory getMarshallerFactory() {
+        return marshallerFactory;
+    }
+
+    /**
+     * Get the marshalling configuration taken from the configuration at construction time.
+     *
+     * @return the marshalling configuration
+     */
+    MarshallingConfiguration getMarshallingConfiguration() {
+        return marshallingConfiguration;
+    }
+
+    /**
+     * Get the link metric taken from the configuration at construction time.
+     *
+     * @return the link metric
+     */
+    int getLinkMetric() {
+        return linkMetric;
+    }
+
+    /**
+     * Get the executor taken from the configuration at construction time.
+     *
+     * @return the executor
+     */
+    protected Executor getExecutor() {
+        return executor;
+    }
+
+    /**
+     * Get the buffer allocator used for outbound message assembly.
+     *
+     * @return the allocator
+     */
+    BufferAllocator<ByteBuffer> getAllocator() {
+        return allocator;
+    }
+
+    /**
+     * Get the endpoint this connection was constructed with.
+     *
+     * @return the endpoint
+     */
+    Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    /**
+     * Get the message channel this connection was constructed with.
+     *
+     * @return the channel
+     */
+    AllocatedMessageChannel getChannel() {
+        return channel;
+    }
+
+    /**
+     * Remove the entry for the given identifier from the remote client map.
+     *
+     * @param identifier the client ID
+     */
+    void removeRemoteClient(final int identifier) {
+        remoteClients.remove(identifier);
+    }
+
+    /**
+     * Record the reply handler for a request running on the remote node.
+     *
+     * @param id the request ID
+     * @param handler the reply handler to store under that ID
+     */
+    void addRemoteRequest(final int id, final ReplyHandler handler) {
+        remoteRequests.put(id, handler);
+    }
+
+    /**
+     * Record a client whose requests get forwarded to the remote side.
+     *
+     * @param id the client ID
+     * @param handler the request handler to store under that ID
+     */
+    void addRemoteClient(final int id, final RequestHandler handler) {
+        remoteClients.put(id, handler);
+    }
+
+    /**
+     * Look up the handle stored under the given ID in the forwarded client map.
+     *
+     * @param id the client ID
+     * @return the result of the map lookup
+     */
+    Handle<RequestHandler> getForwardedClient(final int id) {
+        return forwardedClients.get(id);
+    }
+
+    /**
+     * Remove the entry for the given ID from the remote request map.
+     *
+     * @param id the request ID
+     * @return the value returned by the map's remove operation
+     */
+    ReplyHandler removeRemoteRequest(final int id) {
+        return remoteRequests.remove(id);
+    }
+
+    /**
+     * Look up the request context stored under the given ID in the local request map.
+     *
+     * @param id the request ID
+     * @return the result of the map lookup
+     */
+    RemoteRequestContext getLocalRequest(final int id) {
+        return localRequests.get(id);
+    }
+
+    /**
+     * Look up the reply handler stored under the given ID in the remote request map.
+     *
+     * @param id the request ID
+     * @return the result of the map lookup
+     */
+    ReplyHandler getRemoteRequest(final int id) {
+        return remoteRequests.get(id);
+    }
+
+    /**
+     * Remove the entry for the given ID from the forwarded client map.
+     *
+     * @param id the client ID
+     * @return the value returned by the map's remove operation
+     */
+    Handle<RequestHandler> removeForwardedClient(final int id) {
+        return forwardedClients.remove(id);
+    }
+
+    /**
+     * Look up the handle stored under the given ID in the forwarded service map.
+     *
+     * @param id the service ID
+     * @return the result of the map lookup
+     */
+    Handle<RequestHandlerSource> getForwardedService(final int id) {
+        return forwardedServices.get(id);
+    }
+
+    /**
+     * Record a client handle that has been forwarded to the remote side.
+     *
+     * @param id the client ID
+     * @param handle the handle to store under that ID
+     */
+    void addForwardedClient(final int id, final Handle<RequestHandler> handle) {
+        forwardedClients.put(id, handle);
+    }
+
+    /**
+     * Record a service handle that has been forwarded to the remote side.
+     * <p>
+     * NOTE(review): the method name is misspelled ("Forwaded"); renaming it would have to be coordinated with
+     * its callers.
+     *
+     * @param id the service ID
+     * @param service the handle to store under that ID
+     */
+    void addForwadedService(final int id, final Handle<RequestHandlerSource> service) {
+        forwardedServices.put(id, service);
+    }
+
+    /**
+     * Remove the entry for the given ID from the forwarded service map.
+     *
+     * @param id the service ID
+     * @return the value returned by the map's remove operation
+     */
+    Handle<RequestHandlerSource> removeForwardedService(final int id) {
+        return forwardedServices.remove(id);
+    }
+
+    /**
+     * Parse the given path into a qualified name and look the service up under that name.
+     *
+     * @param path the path, in the form accepted by {@code QualifiedName.parse}
+     * @return the result of the registry lookup
+     */
+    Handle<RequestHandlerSource> getServiceByPath(String path) {
+        return getService(QualifiedName.parse(path));
+    }
+
+    /**
+     * Look up a service by name in the named service registry.
+     *
+     * @param name the qualified service name
+     * @return the result of the registry lookup
+     */
+    Handle<RequestHandlerSource> getService(final QualifiedName name) {
+        return namedServiceRegistry.lookupService(name);
+    }
+
+    /**
+     * Look up the future stored under the given ID in the remote service map.
+     *
+     * @param id the service ID
+     * @return the result of the map lookup
+     */
+    FutureRemoteRequestHandlerSource getFutureRemoteService(final int id) {
+        return remoteServices.get(id);
+    }
+
+    /**
+     * Remove the entry for the given ID from the remote service map.
+     *
+     * @param id the service ID
+     * @return the value returned by the map's remove operation
+     */
+    FutureRemoteRequestHandlerSource removeFutureRemoteService(final int id) {
+        return remoteServices.remove(id);
+    }
+
+    /**
+     * Ask the remote side to open the named service and wait for the outcome.
+     * <p>
+     * A fresh future is registered under an unused service ID, a {@code SERVICE_OPEN_REQUEST} message carrying
+     * that ID and the encoded name is written to the channel, and the calling thread then waits on the future.
+     * If the request cannot be written, the registration is removed again so that the map does not accumulate
+     * futures which can never complete.
+     *
+     * @param name the qualified name of the service to open
+     * @return a handle to the opened service
+     * @throws IOException if the request cannot be written or the future reports a failure
+     * @throws InterruptedIOException if the calling thread is interrupted while waiting; the interrupt status
+     *      is restored before throwing
+     */
+    public Handle<RequestHandlerSource> openRemoteService(final QualifiedName name) throws IOException {
+        final FutureRemoteRequestHandlerSource future = new FutureRemoteRequestHandlerSource();
+        int id;
+        // keep drawing IDs until one is not already registered
+        do {
+            id = nextRemoteService();
+        } while (! remoteServices.putIfAbsent(id, future));
+        // message layout: 1 byte message type, 4 byte service ID, then the encoded name
+        final ByteBuffer buffer = ByteBuffer.allocate(5 + getByteLength(name));
+        buffer.put((byte) MessageType.SERVICE_OPEN_REQUEST.getId());
+        buffer.putInt(id);
+        putQualifiedName(buffer, name);
+        buffer.flip();
+        boolean sent = false;
+        try {
+            doBlockingWrite(buffer);
+            sent = true;
+        } finally {
+            if (! sent) {
+                // the request never left, so no reply can arrive for this ID
+                remoteServices.remove(id);
+            }
+        }
+        try {
+            final Handle<RequestHandlerSource> handle = future.getInterruptibly().getHandle();
+            log.trace("Opened %s to %s", handle, this);
+            return handle;
+        } catch (InterruptedException e) {
+            // NOTE(review): the future stays registered here because a reply may still arrive - confirm
+            // that the read side disposes of a result nobody is waiting for
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException("Interrupted while waiting for remote service");
+        }
+    }
+
+    /**
+     * Compute the number of bytes that {@code putQualifiedName} writes for the given name: a two-byte segment
+     * count followed by each segment in its encoded form.
+     *
+     * @param name the qualified name
+     * @return the encoded length in bytes
+     */
+    static int getByteLength(QualifiedName name) {
+        // start with the two-byte segment count
+        int total = 2;
+        for (String segment : name) {
+            total += getByteLength(segment);
+        }
+        return total;
+    }
+
+    /**
+     * Compute the number of bytes that {@code putString} writes for the given string, including the
+     * terminating zero byte.  Chars 0x0001-0x007f take one byte, char 0x0000 and chars 0x0080-0x07ff take two,
+     * and everything else takes three.
+     *
+     * @param s the string to measure
+     * @return the encoded length in bytes
+     */
+    static int getByteLength(String s) {
+        // start with the terminating zero byte
+        int total = 1;
+        for (final char c : s.toCharArray()) {
+            if (c >= 0x0800) {
+                total += 3;
+            } else if (c == 0 || c >= 0x0080) {
+                total += 2;
+            } else {
+                total += 1;
+            }
+        }
+        return total;
+    }
+
+    /**
+     * Decode a zero-terminated string in the format written by {@code putString}.
+     * <p>
+     * The format is UTF-8 limited to one-, two- and three-byte sequences, with U+0000 written as the two bytes
+     * {@code c0 80} so that a single zero byte can act as the terminator.  Decoding stops at the terminator or
+     * when the buffer runs out, whichever comes first.  Malformed input is not rejected; a {@code '?'} is
+     * emitted in place of each byte or sequence that cannot be decoded.
+     *
+     * @param buffer the buffer to read from; its position is advanced past the bytes consumed
+     * @return the decoded string
+     */
+    static String getString(final ByteBuffer buffer) {
+        StringBuilder builder = new StringBuilder();
+        // state = number of continuation bytes still expected; a = code unit assembled so far
+        int state = 0, a = 0;
+        while (buffer.hasRemaining()) {
+            final int v = buffer.get() & 0xff;
+            switch (state) {
+                case 0: {
+                    if (v == 0) {
+                        return builder.toString();
+                    } else if (v < 0x80) {
+                        builder.append((char) v);
+                    } else if (0xc0 <= v && v < 0xe0) {
+                        // two-byte lead: keep only the five payload bits, not the 110 marker
+                        a = (v & 0x1f) << 6;
+                        state = 1;
+                    } else if (0xe0 <= v && v < 0xf0) {
+                        // three-byte lead: the whole e0-ef range, keeping the four payload bits
+                        a = (v & 0x0f) << 12;
+                        state = 2;
+                    } else {
+                        // stray continuation byte or a lead byte for a longer sequence
+                        builder.append('?');
+                    }
+                    break;
+                }
+                case 1: {
+                    if (v == 0) {
+                        // terminator in the middle of a sequence
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (0x80 <= v && v < 0xc0) {
+                        a |= v & 0x3f;
+                        builder.append((char) a);
+                    } else {
+                        builder.append('?');
+                    }
+                    state = 0;
+                    break;
+                }
+                case 2: {
+                    if (v == 0) {
+                        // terminator in the middle of a sequence
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (0x80 <= v && v < 0xc0) {
+                        a |= (v & 0x3f) << 6;
+                        state = 1;
+                    } else {
+                        builder.append('?');
+                        state = 0;
+                    }
+                    break;
+                }
+                default:
+                    throw new IllegalStateException("wrong state");
+            }
+        }
+        return builder.toString();
+    }
+
+    /**
+     * Write the given string to the buffer followed by a zero byte.  Chars 0x0001-0x007f are written as one
+     * byte, char 0x0000 and chars 0x0080-0x07ff as two bytes, and all other chars as three bytes, so the only
+     * zero byte in the output is the terminator.
+     *
+     * @param buffer the buffer to write to
+     * @param string the string to encode
+     */
+    static void putString(final ByteBuffer buffer, final String string) {
+        for (final char c : string.toCharArray()) {
+            if (c >= 0x0800) {
+                buffer.put((byte) (0xe0 | ((c >> 12) & 0x0f)));
+                buffer.put((byte) (0x80 | ((c >> 6) & 0x3f)));
+                buffer.put((byte) (0x80 | (c & 0x3f)));
+            } else if (c == 0 || c >= 0x0080) {
+                buffer.put((byte) (0xc0 | ((c >> 6) & 0x1f)));
+                buffer.put((byte) (0x80 | (c & 0x3f)));
+            } else {
+                buffer.put((byte) c);
+            }
+        }
+        // terminator
+        buffer.put((byte) 0);
+    }
+
+    /**
+     * Read a qualified name from the buffer: an unsigned 16-bit segment count followed by that many
+     * zero-terminated strings.
+     *
+     * @param buffer the buffer to read from
+     * @return the qualified name built from the decoded segments
+     */
+    static QualifiedName getQualifiedName(final ByteBuffer buffer) {
+        final int count = buffer.getShort() & 0xffff;
+        final String[] segments = new String[count];
+        int idx = 0;
+        while (idx < count) {
+            segments[idx] = getString(buffer);
+            idx++;
+        }
+        return new QualifiedName(segments);
+    }
+
+    /**
+     * Write a qualified name to the buffer: a 16-bit segment count followed by each segment as a
+     * zero-terminated string.
+     *
+     * @param buffer the buffer to write to
+     * @param qualifiedName the name to encode
+     * @throws IllegalArgumentException if the name's length does not fit in 16 bits
+     */
+    static void putQualifiedName(final ByteBuffer buffer, final QualifiedName qualifiedName) {
+        final int count = qualifiedName.length();
+        if (count > 0xffff) {
+            throw new IllegalArgumentException("Qualified name is too long");
+        }
+        final short header = (short) count;
+        buffer.putShort(header);
+        for (String segment : qualifiedName) {
+            putString(buffer, segment);
+        }
+    }
+
+    /**
+     * Release everything tracked by this connection.
+     * <p>
+     * The channel is closed first.  Then, for work running on the remote side: every outstanding reply handler
+     * is handed an {@link IndeterminateOutcomeException}, every remote client is closed, and every pending
+     * remote service future gets a notifier that closes its result once it is done.  For work running locally:
+     * every local request is cancelled and every forwarded client and service handle is closed.
+     */
+    protected void closeAction() {
+        // just to make sure...
+        IoUtils.safeClose(channel);
+        // one exception instance is shared by all failed requests
+        final IndeterminateOutcomeException ioe = new IndeterminateOutcomeException("The connection was closed");
+        // Things running remotely
+        for (ReplyHandler x : remoteRequests.getKeys()) {
+            SpiUtils.safeHandleException(x, ioe);
+        }
+        for (RequestHandler x : remoteClients.getKeys()) {
+            IoUtils.safeClose(x);
+        }
+        for (FutureRemoteRequestHandlerSource future : remoteServices.getKeys()) {
+            // the future may not be done yet, so closing is deferred to a notifier
+            future.addNotifier(MultiplexConnection.<RequestHandlerSource>closingNotifier());
+        }
+        // Things running locally
+        for (RemoteRequestContext localRequest : localRequests.getKeys()) {
+            localRequest.cancel();
+        }
+        for (Handle<RequestHandler> client : forwardedClients.getKeys()) {
+            IoUtils.safeClose(client);
+        }
+        for (Handle<RequestHandlerSource> service : forwardedServices.getKeys()) {
+            IoUtils.safeClose(service);
+        }
+    }
+
+    /**
+     * Describe this connection using its hash code in hexadecimal and its channel.
+     *
+     * @return the description
+     */
+    public String toString() {
+        final StringBuilder text = new StringBuilder("multiplex connection <");
+        text.append(Integer.toHexString(hashCode()));
+        text.append("> on ").append(channel);
+        return text.toString();
+    }
+
+    /**
+     * Get the shared {@code CLOSING_NOTIFIER} instance, cast to a notifier for the requested result type.
+     * The cast is unchecked; the shared instance only ever passes the result to {@code IoUtils.safeClose}.
+     *
+     * @param <T> the closeable result type
+     * @return the shared notifier
+     */
+    @SuppressWarnings({ "unchecked" })
+    private static <T extends Closeable> IoFuture.Notifier<T> closingNotifier() {
+        return (IoFuture.Notifier<T>) CLOSING_NOTIFIER;
+    }
+
+    private static final ClosingNotifier CLOSING_NOTIFIER = new ClosingNotifier();
+
+    /**
+     * A notifier which closes the future's result when it is notified that the future is done.
+     */
+    private static class ClosingNotifier extends IoFuture.HandlingNotifier<Closeable> {
+        /**
+         * Close the completed result, using {@code IoUtils.safeClose}.
+         *
+         * @param result the result of the completed future
+         */
+        public void handleDone(final Closeable result) {
+            IoUtils.safeClose(result);
+        }
+    }
+}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -1,879 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.log.Logger;
-import org.jboss.remoting.spi.RequestHandler;
-import org.jboss.remoting.spi.RequestHandlerSource;
-import org.jboss.remoting.spi.ReplyHandler;
-import org.jboss.remoting.spi.RemoteRequestContext;
-import org.jboss.remoting.spi.Handle;
-import org.jboss.remoting.spi.SpiUtils;
-import org.jboss.remoting.spi.AbstractAutoCloseable;
-import org.jboss.remoting.util.CollectionUtil;
-import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.remoting.RemoteExecutionException;
-import org.jboss.remoting.IndeterminateOutcomeException;
-import org.jboss.remoting.ReplyException;
-import org.jboss.remoting.RemoteServiceConfiguration;
-import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.ByteOutput;
-import org.jboss.marshalling.Marshaller;
-import org.jboss.marshalling.MarshallingConfiguration;
-import org.jboss.marshalling.ObjectTable;
-import org.jboss.marshalling.Marshalling;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.nio.ByteBuffer;
-import java.nio.BufferUnderflowException;
-import java.io.IOException;
-import java.io.InvalidClassException;
-import java.io.InterruptedIOException;
-import java.io.InvalidObjectException;
-
-/**
- * Protocol handler for the basic message-oriented Remoting protocol.
- */
-final class MultiplexHandler implements IoHandler<AllocatedMessageChannel> {
-
-    private static final Logger log = Logger.getLogger(MultiplexHandler.class);
-
-    //--== Connection configuration items ==--
-    private final MarshallerFactory marshallerFactory;
-    private final MarshallingConfiguration marshallingConfiguration;
-    private final int linkMetric;
-    private final Executor executor;
-    // buffer allocator for outbound message assembly
-    private final BufferAllocator<ByteBuffer> allocator;
-
-    // running on remote node
-    private final IntegerBiMap<ReplyHandler> remoteRequests = IdentityHashIntegerBiMap.createSynchronizing();
-    // running on local node
-    private final IntegerBiMap<RemoteRequestContext> localRequests = IdentityHashIntegerBiMap.createSynchronizing();
-    // sequence for remote requests
-    private final AtomicInteger requestSequence = new AtomicInteger();
-
-    // clients whose requests get forwarded to the remote side
-    // even #s were opened from services forwarded to us (our sequence)
-    // odd #s were forwarded directly to us (remote sequence)
-    private final IntegerBiMap<RequestHandler> remoteClients = IdentityHashIntegerBiMap.createSynchronizing();
-    // forwarded to remote side (handled on this side)
-    private final IntegerResourceBiMap<RequestHandler> forwardedClients = IdentityHashIntegerResourceBiMap.createSynchronizing();
-    // sequence for forwarded clients (shift left one bit, add one, limit is 2^30)
-    private final AtomicInteger forwardedClientSequence = new AtomicInteger();
-    // sequence for clients created from services forwarded to us (shift left one bit, limit is 2^30)
-    private final AtomicInteger remoteClientSequence = new AtomicInteger();
-
-    // services forwarded to us
-    private final IntegerBiMap<RequestHandlerSource> remoteServices = IdentityHashIntegerBiMap.createSynchronizing();
-    // forwarded to remote side (handled on this side)
-    private final IntegerResourceBiMap<RequestHandlerSource> forwardedServices = IdentityHashIntegerResourceBiMap.createSynchronizing();
-    // sequence for forwarded services
-    private final AtomicInteger forwardedServiceSequence = new AtomicInteger();
-
-    private final Endpoint endpoint;
-
-    private volatile AllocatedMessageChannel channel;
-    private static final StackTraceElement[] emptyStackTraceElements = new StackTraceElement[0];
-
-    public MultiplexHandler(final Endpoint endpoint, final MultiplexConfiguration configuration) {
-        this.endpoint = endpoint;
-        allocator = configuration.getAllocator();
-        executor = configuration.getExecutor();
-        marshallerFactory = configuration.getMarshallerFactory();
-        marshallingConfiguration = configuration.getMarshallingConfiguration();
-        linkMetric = configuration.getLinkMetric();
-    }
-
-    // sequence methods
-
-    int nextRequest() {
-        return requestSequence.getAndIncrement() & 0x7fffffff;
-    }
-
-    int nextForwardedClient() {
-        return (forwardedClientSequence.getAndIncrement() << 1 | 1) & 0x7fffffff;
-    }
-
-    int nextRemoteClient() {
-        return remoteClientSequence.getAndIncrement() << 1 & 0x7fffffff;
-    }
-
-    int nextForwardedService() {
-        return forwardedServiceSequence.getAndIncrement() & 0x7fffffff;
-    }
-
-    void setChannel(final AllocatedMessageChannel channel) {
-        this.channel = channel;
-    }
-
-    public void handleOpened(final AllocatedMessageChannel channel) {
-        channel.resumeReads();
-    }
-
-    public void handleReadable(final AllocatedMessageChannel channel) {
-        for (;;) try {
-            final ByteBuffer buffer;
-            try {
-                buffer = channel.receive();
-            } catch (IOException e) {
-                log.error(e, "I/O error in protocol channel; closing channel");
-                IoUtils.safeClose(channel);
-                return;
-            }
-            if (buffer == null) {
-                // todo release all handles...
-                // todo what if the write queue is not empty?
-                IoUtils.safeClose(channel);
-                return;
-            }
-            if (! buffer.hasRemaining()) {
-                // would block
-                channel.resumeReads();
-                return;
-            }
-            final MessageType msgType;
-            try {
-                msgType = MessageType.getMessageType(buffer.get() & 0xff);
-            } catch (IllegalArgumentException ex) {
-                log.trace("Received invalid message type");
-                return;
-            }
-            log.trace("Received message %s, type %s", buffer, msgType);
-            switch (msgType) {
-                case REQUEST: {
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = forwardedClients.get(clientId);
-                    if (handle == null) {
-                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
-                        break;
-                    }
-                    final int requestId = buffer.getInt();
-                    final Object payload;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
-                        try {
-                            unmarshaller.start(Marshalling.createByteInput(buffer));
-                            try {
-                                payload = unmarshaller.readObject();
-                                unmarshaller.finish();
-                            } catch (ClassNotFoundException e) {
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (Exception ex) {
-                        // IOException | ClassNotFoundException
-                        log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
-                        try {
-                            final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
-                            try {
-                                List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-                                marshaller.start(createByteOutput(allocator, buffers));
-                                marshaller.write(MessageType.REQUEST_RECEIVE_FAILED.getId());
-                                final IOException ioe = new IOException("Request receive failed");
-                                ex.setStackTrace(emptyStackTraceElements);
-                                ioe.initCause(ex);
-                                ioe.setStackTrace(emptyStackTraceElements);
-                                marshaller.writeObject(ioe);
-                                marshaller.finish();
-                                registerWriter(channel, new SimpleWriteHandler(allocator, buffers));
-                            } catch (InterruptedException e1) {
-                                Thread.currentThread().interrupt();
-                                log.debug("Remoting channel handler thread interrupted; closing channel");
-                                IoUtils.safeClose(channel);
-                            } finally {
-                                IoUtils.safeClose(marshaller);
-                            }
-                        } catch (IOException ioe) {
-                            log.warn("Failed to send notification of failure to unmarshal a request: %s", ioe);
-                        }
-                        break;
-                    }
-                    // request received OK
-                    final RequestHandler requestHandler = handle.getResource();
-                    requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
-                    break;
-                }
-                case REPLY: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final Object payload;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
-                        try {
-                            unmarshaller.start(Marshalling.createByteInput(buffer));
-                            try {
-                                payload = unmarshaller.readObject();
-                                unmarshaller.finish();
-                            } catch (ClassNotFoundException e) {
-                                replyHandler.handleException(new InvalidClassException("Class not found: " + e.toString()));
-                                log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException ex) {
-                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
-                        // todo
-                        SpiUtils.safeHandleException(replyHandler, new ReplyException("Unmarshal failed", ex));
-                        break;
-                    }
-                    SpiUtils.safeHandleReply(replyHandler, payload);
-                    break;
-                }
-                case CANCEL_REQUEST: {
-                    final int requestId = buffer.getInt();
-                    final RemoteRequestContext context = localRequests.get(requestId);
-                    if (context != null) {
-                        context.cancel();
-                    }
-                    break;
-                }
-                case CANCEL_ACK: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.get(requestId);
-                    if (replyHandler != null) try {
-                        replyHandler.handleCancellation();
-                    } catch (IOException e) {
-                        log.trace("Failed to forward a cancellation acknowledgement (%s)", e);
-                    }
-                    break;
-                }
-                case REQUEST_RECEIVE_FAILED: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final IOException cause;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
-                        try {
-                            unmarshaller.start(Marshalling.createByteInput(buffer));
-                            cause = (IOException) unmarshaller.readObject();
-                            unmarshaller.finish();
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException e) {
-                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
-                        break;
-                    } catch (ClassNotFoundException e) {
-                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
-                        break;
-                    }
-                    SpiUtils.safeHandleException(replyHandler, cause);
-                    break;
-                }
-                case REQUEST_FAILED: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final IOException cause;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
-                        try {
-                            unmarshaller.start(Marshalling.createByteInput(buffer));
-                            try {
-                                cause = (IOException) unmarshaller.readObject();
-                            } catch (ClassNotFoundException e) {
-                                replyHandler.handleException(new InvalidClassException("Class not found: " + e.toString()));
-                                log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
-                                break;
-                            } catch (ClassCastException e) {
-                                SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to unmarshal the cause)"));
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException ex) {
-                        log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception");
-                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to read the cause)"));
-                        break;
-                    }
-                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", cause));
-                    break;
-                }
-                case REQUEST_OUTCOME_UNKNOWN: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final String reason = readUTFZ(buffer);
-                    SpiUtils.safeHandleException(replyHandler, new IndeterminateOutcomeException(reason));
-                    break;
-                }
-                case CLIENT_CLOSE: {
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = forwardedClients.remove(clientId);
-                    if (handle == null) {
-                        log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
-                        break;
-                    }
-                    IoUtils.safeClose(handle);
-                    break;
-                }
-                case CLIENT_OPEN: {
-                    final int serviceId = buffer.getInt();
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandlerSource> handle = forwardedServices.get(serviceId);
-                    if (handle == null) {
-                        log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
-                        break;
-                    }
-                    try {
-                        final RequestHandlerSource requestHandlerSource = handle.getResource();
-                        final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
-                        // todo check for duplicate
-                        // todo validate the client ID
-                        log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
-                        forwardedClients.put(clientId, clientHandle);
-                    } catch (IOException ex) {
-                        log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
-                        break;
-                    } finally {
-                        IoUtils.safeClose(handle);
-                    }
-                    break;
-                }
-                case SERVICE_CLOSE: {
-                    final Handle<RequestHandlerSource> handle = forwardedServices.remove(buffer.getInt());
-                    if (handle == null) {
-                        break;
-                    }
-                    IoUtils.safeClose(handle);
-                    break;
-                }
-                case SERVICE_ADVERTISE: {
-                    final int serviceId = buffer.getInt();
-                    final String serviceType = readUTFZ(buffer);
-                    final String groupName = readUTFZ(buffer);
-                    final String endpointName = readUTFZ(buffer);
-                    final int baseMetric = buffer.getInt();
-                    int id = -1;
-                    final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, id);
-                    final int calcMetric = baseMetric + linkMetric;
-                    if (calcMetric > 0) {
-                        try {
-                            final RemoteServiceConfiguration config = new RemoteServiceConfiguration();
-                            config.setServiceType(serviceType);
-                            config.setGroupName(groupName);
-                            config.setEndpointName(endpointName);
-                            final SimpleCloseable closeable = endpoint.registerRemoteService(config);
-                            // todo - something with that closeable
-                        } catch (IOException e) {
-                            log.error(e, "Unable to register remote service");
-                        }
-                    }
-                    break;
-                }
-                case SERVICE_UNADVERTISE: {
-                    final int serviceId = buffer.getInt();
-                    IoUtils.safeClose(remoteServices.get(serviceId));
-                    break;
-                }
-                default: {
-                    log.error("Malformed packet received (invalid message type %s)", msgType);
-                }
-            }
-        } catch (BufferUnderflowException e) {
-            log.error("Malformed packet received (buffer underflow)");
-        }
-    }
-
-    public void handleWritable(final AllocatedMessageChannel channel) {
-        for (;;) {
-            final WriteHandler handler = outputQueue.peek();
-            if (handler == null) {
-                return;
-            }
-            try {
-                if (handler.handleWrite(channel)) {
-                    log.trace("Handled write with handler %s", handler);
-                    pending.decrementAndGet();
-                    outputQueue.remove();
-                } else {
-                    channel.resumeWrites();
-                    return;
-                }
-            } catch (Throwable t) {
-                pending.decrementAndGet();
-                outputQueue.remove();
-            }
-        }
-    }
-
-    public void handleClosed(final AllocatedMessageChannel channel) {
-    }
-
-    RequestHandlerSource getRemoteService(final int id) {
-        return new RequestHandlerSourceImpl(allocator, id);
-    }
-
-    private final class ReplyHandlerImpl implements ReplyHandler {
-
-        private final AllocatedMessageChannel channel;
-        private final int requestId;
-        private final BufferAllocator<ByteBuffer> allocator;
-
-        private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
-            if (channel == null) {
-                throw new NullPointerException("channel is null");
-            }
-            if (allocator == null) {
-                throw new NullPointerException("allocator is null");
-            }
-            this.channel = channel;
-            this.requestId = requestId;
-            this.allocator = allocator;
-        }
-
-        public void handleReply(final Object reply) throws IOException {
-            ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.REPLY.getId());
-            buffer.putInt(requestId);
-            try {
-                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
-                try {
-                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        marshaller.start(output);
-                        marshaller.writeObject(reply);
-                        marshaller.close();
-                        output.close();
-                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new InterruptedIOException("Reply handler thread interrupted before a reply could be sent");
-            }
-        }
-
-        public void handleException(final IOException exception) throws IOException {
-            try {
-                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
-                try {
-                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        marshaller.write(MessageType.REQUEST_FAILED.getId());
-                        marshaller.writeInt(requestId);
-                        marshaller.start(output);
-                        marshaller.writeObject(exception);
-                        marshaller.close();
-                        output.close();
-                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new InterruptedIOException("Reply handler thread interrupted before an exception could be sent");
-            }
-        }
-
-        public void handleCancellation() throws InterruptedIOException {
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CANCEL_ACK.getId());
-            buffer.putInt(requestId);
-            buffer.flip();
-            try {
-                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new InterruptedIOException("Reply handler thread interrupted before cancellation could be sent");
-            }
-        }
-    }
-
-    // Writer members
-
-    private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
-    private final AtomicInteger pending = new AtomicInteger();
-
-    private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
-        outputQueue.put(writeHandler);
-        if (pending.getAndIncrement() == 0) {
-            channel.resumeWrites();
-        }
-    }
-
-    // Reader utils
-
-    private String readUTFZ(ByteBuffer buffer) {
-        StringBuilder builder = new StringBuilder();
-        int state = 0, a = 0;
-        while (buffer.hasRemaining()) {
-            final int v = buffer.get() & 0xff;
-            switch (state) {
-                case 0: {
-                    if (v == 0) {
-                        return builder.toString();
-                    } else if (v < 128) {
-                        builder.append((char) v);
-                    } else if (192 <= v && v < 224) {
-                        a = v << 6;
-                        state = 1;
-                    } else if (224 <= v && v < 232) {
-                        a = v << 12;
-                        state = 2;
-                    } else {
-                        builder.append('?');
-                    }
-                    break;
-                }
-                case 1: {
-                    if (v == 0) {
-                        builder.append('?');
-                        return builder.toString();
-                    } else if (128 <= v && v < 192) {
-                        a |= v & 0x3f;
-                        builder.append((char) a);
-                    } else {
-                        builder.append('?');
-                    }
-                    state = 0;
-                    break;
-                }
-                case 2: {
-                    if (v == 0) {
-                        builder.append('?');
-                        return builder.toString();
-                    } else if (128 <= v && v < 192) {
-                        a |= (v & 0x3f) << 6;
-                        state = 1;
-                    } else {
-                        builder.append('?');
-                        state = 0;
-                    }
-                    break;
-                }
-                default:
-                    throw new IllegalStateException("wrong state");
-            }
-        }
-        return builder.toString();
-    }
-
-    // client endpoint
-
-    private final class RequestHandlerImpl extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
-
-        private final int identifier;
-        private final BufferAllocator<ByteBuffer> allocator;
-
-        public RequestHandlerImpl(final int identifier, final BufferAllocator<ByteBuffer> allocator) {
-            super(executor);
-            if (allocator == null) {
-                throw new NullPointerException("allocator is null");
-            }
-            this.identifier = identifier;
-            this.allocator = allocator;
-            addCloseHandler(new CloseHandler<RequestHandler>() {
-                public void handleClose(final RequestHandler closed) {
-                    remoteClients.remove(identifier);
-                    ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
-                    buffer.putInt(identifier);
-                    buffer.flip();
-                    try {
-                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    } catch (InterruptedException e) {
-                        log.warn("Client close notification was interrupted before it could be sent");
-                    }
-                }
-            });
-        }
-
-        public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
-            log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
-            try {
-                final List<ByteBuffer> bufferList;
-                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
-                try {
-                    bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        marshaller.write(MessageType.REQUEST.getId());
-                        marshaller.writeInt(identifier);
-
-                        final int id = nextRequest();
-                        remoteRequests.put(id, handler);
-                        marshaller.writeInt(id);
-                        marshaller.writeObject(request);
-                        marshaller.close();
-                        output.close();
-                        try {
-                            registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            executor.execute(new Runnable() {
-                                public void run() {
-                                    SpiUtils.safeHandleCancellation(handler);
-                                }
-                            });
-                            return SpiUtils.getBlankRemoteRequestContext();
-                        }
-                        log.trace("Sent request %s", request);
-                        return new RemoteRequestContextImpl(id, allocator, channel);
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (final IOException t) {
-                log.trace(t, "receiveRequest failed with an exception");
-                executor.execute(new Runnable() {
-                    public void run() {
-                        SpiUtils.safeHandleException(handler, t);
-                    }
-                });
-                return SpiUtils.getBlankRemoteRequestContext();
-            }
-        }
-
-        public String toString() {
-            return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
-        }
-    }
-
-    public final class RemoteRequestContextImpl implements RemoteRequestContext {
-
-        private final BufferAllocator<ByteBuffer> allocator;
-        private final int id;
-        private final AllocatedMessageChannel channel;
-
-        public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
-            this.id = id;
-            this.allocator = allocator;
-            this.channel = channel;
-        }
-
-        public void cancel() {
-            try {
-                final ByteBuffer buffer = allocator.allocate();
-                buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
-                buffer.putInt(id);
-                buffer.flip();
-                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-            } catch (InterruptedException e) {
-                // todo log that cancel attempt failed
-                Thread.currentThread().interrupt();
-            } catch (Throwable t) {
-                // todo log that cancel attempt failed
-            }
-        }
-    }
-
-    public final class RequestHandlerSourceImpl extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
-
-        private final BufferAllocator<ByteBuffer> allocator;
-        private final int identifier;
-
-        protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer> allocator, final int identifier) {
-            super(executor);
-            this.allocator = allocator;
-            this.identifier = identifier;
-            addCloseHandler(new CloseHandler<RequestHandlerSource>() {
-                public void handleClose(final RequestHandlerSource closed) {
-                    ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
-                    buffer.putInt(identifier);
-                    buffer.flip();
-                    try {
-                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    } catch (InterruptedException e) {
-                        log.warn("Service close notification was interrupted before it could be sent");
-                    }
-                }
-            });
-        }
-
-        public Handle<RequestHandler> createRequestHandler() throws IOException {
-            final int id = nextRemoteClient();
-            final RequestHandler requestHandler = new RequestHandlerImpl(id, MultiplexHandler.this.allocator);
-            remoteClients.put(id, requestHandler);
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CLIENT_OPEN.getId());
-            buffer.putInt(identifier);
-            buffer.putInt(id);
-            buffer.flip();
-            // todo - probably should bail out if we're interrupted?
-            boolean intr = false;
-            for (;;) {
-                try {
-                    registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    try {
-                        return new RequestHandlerImpl(id, allocator).getHandle();
-                    } finally {
-                        if (intr) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    intr = true;
-                }
-            }
-        }
-
-        public String toString() {
-            return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
-        }
-    }
-
-    public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
-        return new ByteOutput() {
-            private ByteBuffer current;
-
-            private ByteBuffer getCurrent() {
-                final ByteBuffer buffer = current;
-                return buffer == null ? (current = allocator.allocate()) : buffer;
-            }
-
-            public void write(final int i) {
-                final ByteBuffer buffer = getCurrent();
-                buffer.put((byte) i);
-                if (! buffer.hasRemaining()) {
-                    buffer.flip();
-                    target.add(buffer);
-                    current = null;
-                }
-            }
-
-            public void write(final byte[] bytes) {
-                write(bytes, 0, bytes.length);
-            }
-
-            public void write(final byte[] bytes, int offs, int len) {
-                while (len > 0) {
-                    final ByteBuffer buffer = getCurrent();
-                    final int c = Math.min(len, buffer.remaining());
-                    buffer.put(bytes, offs, c);
-                    offs += c;
-                    len -= c;
-                    if (! buffer.hasRemaining()) {
-                        buffer.flip();
-                        target.add(buffer);
-                        current = null;
-                    }
-                }
-            }
-
-            public void close() {
-                flush();
-            }
-
-            public void flush() {
-                final ByteBuffer buffer = current;
-                if (buffer != null) {
-                    buffer.flip();
-                    target.add(buffer);
-                    current = null;
-                }
-            }
-        };
-    }
-
-    private final ProtocolObjectTableWriter protocolObjectTableWriter = new ProtocolObjectTableWriter();
-
-    public class ProtocolObjectTableWriter implements ObjectTable.Writer {
-
-        public void writeObject(final Marshaller marshaller, final Object o) throws IOException {
-            final RequestHandler requestHandler = (RequestHandler) o;
-            final int existingId = forwardedClients.get(requestHandler, -1);
-            marshaller.write(1);
-            if (existingId == -1) {
-                final int newId = nextForwardedClient();
-                forwardedClients.put(newId, requestHandler.getHandle());
-                marshaller.writeInt(newId);
-            } else {
-                marshaller.writeInt(existingId);
-            }
-        }
-    }
-
-    public class ProtocolObjectTable implements ObjectTable {
-
-        public Writer getObjectWriter(final Object o) throws IOException {
-            if (o instanceof RequestHandler) {
-                return protocolObjectTableWriter;
-            } else {
-                return null;
-            }
-        }
-
-        public Object readObject(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
-            switch (unmarshaller.readByte()) {
-                case 1: {
-                    // remote client
-                    final int id = unmarshaller.readInt();
-                    return remoteClients.get(id);
-                }
-                case 2: {
-                    // remote client source
-                    final int id = unmarshaller.readInt();
-                    return remoteServices.get(id);
-                }
-                default: {
-                    // invalid
-                    throw new InvalidObjectException("Invalid ID sent for protocol object table");
-                }
-            }
-        }
-    }
-}

Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -22,12 +22,7 @@
 
 package org.jboss.remoting.protocol.multiplex;
 
-import org.jboss.remoting.RemotingException;
-import org.jboss.remoting.SimpleCloseable;
 import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.spi.RequestHandlerSource;
-import org.jboss.remoting.spi.Handle;
-import org.jboss.remoting.spi.AbstractSimpleCloseable;
 import org.jboss.xnio.IoHandlerFactory;
 import org.jboss.xnio.ChannelSource;
 import org.jboss.xnio.IoFuture;
@@ -35,7 +30,6 @@
 import org.jboss.xnio.IoHandler;
 import org.jboss.xnio.channels.AllocatedMessageChannel;
 import java.io.IOException;
-import java.util.concurrent.Executor;
 
 /**
  *
@@ -55,7 +49,7 @@
     public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Endpoint endpoint, final MultiplexConfiguration configuration) {
         return new IoHandlerFactory<AllocatedMessageChannel>() {
             public IoHandler<? super AllocatedMessageChannel> createHandler() {
-                return new MultiplexHandler(endpoint, configuration);
+                // a fresh handler is created on every createHandler() call, sharing the endpoint and configuration
+                return new SimpleMultiplexHandler(endpoint, configuration);
             }
         };
     }
@@ -69,29 +63,13 @@
      * @return a handle which may be used to close the connection
      * @throws IOException if an error occurs
      */
-    public static IoFuture<SimpleCloseable> connect(final Endpoint endpoint, final MultiplexConfiguration configuration, final ChannelSource<AllocatedMessageChannel> channelSource) throws IOException {
-        final MultiplexHandler multiplexHandler = new MultiplexHandler(endpoint, configuration);
-        final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(multiplexHandler);
-        return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
-            protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
-                return new AbstractConnection(configuration.getExecutor()) {
-                    // todo - this method is not called by anyone?
-                    public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
-                        return multiplexHandler.getRemoteService(id).getHandle();
-                    }
-                };
+    public static IoFuture<MultiplexConnection> connect(final Endpoint endpoint, final MultiplexConfiguration configuration, final ChannelSource<AllocatedMessageChannel> channelSource) throws IOException {
+        // the handler is passed to the channel source when the channel is opened
+        final SimpleMultiplexHandler handler = new SimpleMultiplexHandler(endpoint, configuration);
+        final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(handler);
+        // expose the channel future as a future of the connection
+        return new AbstractConvertingIoFuture<MultiplexConnection, AllocatedMessageChannel>(futureChannel) {
+            protected MultiplexConnection convert(final AllocatedMessageChannel channel) throws IOException {
+                // the channel argument is unused; the connection is obtained from the handler
+                // NOTE(review): get() presumably waits until the handler has set up its connection - confirm
+                return handler.getConnection().get();
             }
         };
     }
-
-    /**
-     * Base class of the connection handle that the previous {@code connect} implementation returned;
-     * it passes the executor to {@code AbstractSimpleCloseable} and overrides {@code toString}.
-     */
-    private abstract static class AbstractConnection extends AbstractSimpleCloseable {
-
-        protected AbstractConnection(final Executor executor) {
-            super(executor);
-        }
-
-        public String toString() {
-            return "Remoting multiplex connection <" + Integer.toString(hashCode()) + ">";
-        }
-    }
 }

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReadHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,378 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.IoReadHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Buffers;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.ServiceRegistrationException;
+import org.jboss.remoting.util.QualifiedName;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Marshalling;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Read handler for a multiplex protocol channel.  Every received message begins with a one-byte
+ * message type followed by type-specific fields; this handler decodes each message and dispatches
+ * it against the {@link MultiplexConnection} it was constructed with.
+ */
+public final class MultiplexReadHandler implements IoReadHandler<AllocatedMessageChannel> {
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex");
+    private static final StackTraceElement[] emptyStackTraceElements = new StackTraceElement[0];
+    private final MultiplexConnection connection;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param connection the connection whose client, service and request tables are consulted and updated
+     */
+    public MultiplexReadHandler(final MultiplexConnection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * Receive and dispatch messages until the channel would block, reaches its end, or fails.
+     * Malformed messages (unknown type, buffer underflow) are logged and skipped so that one bad
+     * packet does not stall the channel.
+     *
+     * @param channel the channel which has become readable
+     */
+    public void handleReadable(final AllocatedMessageChannel channel) {
+        final MultiplexConnection connection = this.connection;
+        final MarshallerFactory marshallerFactory = connection.getMarshallerFactory();
+        final MarshallingConfiguration marshallingConfiguration = connection.getMarshallingConfiguration();
+        for (;;) try {
+            final ByteBuffer buffer;
+            try {
+                buffer = channel.receive();
+            } catch (IOException e) {
+                log.error(e, "I/O error in protocol channel; closing channel");
+                IoUtils.safeClose(channel);
+                return;
+            }
+            if (buffer == null) {
+                // a null message is treated as the end of the channel
+                IoUtils.safeClose(channel);
+                return;
+            }
+            if (! buffer.hasRemaining()) {
+                // would block
+                channel.resumeReads();
+                return;
+            }
+            final MessageType msgType;
+            try {
+                msgType = MessageType.getMessageType(buffer.get() & 0xff);
+            } catch (IllegalArgumentException ex) {
+                log.trace("Received invalid message type");
+                // skip this message and keep reading; returning here would leave the channel
+                // with reads neither resumed nor closed
+                continue;
+            }
+            log.trace("Received message type %s; dump:\n%s", msgType, Buffers.createDumper(buffer, 8, 1));
+            switch (msgType) {
+                case REQUEST: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandler> handle = connection.getForwardedClient(clientId);
+                    if (handle == null) {
+                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    final int requestId = buffer.getInt();
+                    final Object payload;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            payload = unmarshaller.readObject();
+                            unmarshaller.finish();
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (Exception ex) {
+                        // IOException | ClassNotFoundException
+                        log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
+                        try {
+                            final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+                            try {
+                                List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+                                marshaller.start(new BufferByteOutput(connection.getAllocator(), buffers));
+                                marshaller.write(MessageType.REQUEST_RECEIVE_FAILED.getId());
+                                // the REQUEST_RECEIVE_FAILED branch below reads the request ID right after
+                                // the message type, so it has to be part of the message
+                                marshaller.writeInt(requestId);
+                                // stack traces are stripped before the exception is sent to the peer
+                                ex.setStackTrace(emptyStackTraceElements);
+                                final IOException ioe = new IOException("Request receive failed");
+                                ioe.initCause(ex);
+                                ioe.setStackTrace(emptyStackTraceElements);
+                                marshaller.writeObject(ioe);
+                                marshaller.finish();
+                                connection.doBlockingWrite(buffers);
+                            } finally {
+                                IoUtils.safeClose(marshaller);
+                            }
+                        } catch (IOException ioe) {
+                            log.warn("Failed to send notification of failure to unmarshal a request: %s", ioe);
+                        }
+                        break;
+                    }
+                    // request received OK
+                    final RequestHandler requestHandler = handle.getResource();
+                    requestHandler.receiveRequest(payload, new MultiplexReplyHandler(requestId, connection));
+                    break;
+                }
+                case REPLY: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final Object payload;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            payload = unmarshaller.readObject();
+                            unmarshaller.finish();
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (Exception ex) {
+                        // IOException | ClassNotFoundException
+                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
+                        SpiUtils.safeHandleException(replyHandler, new ReplyException("Unmarshal failed", ex));
+                        break;
+                    }
+                    SpiUtils.safeHandleReply(replyHandler, payload);
+                    break;
+                }
+                case CANCEL_REQUEST: {
+                    final int requestId = buffer.getInt();
+                    final RemoteRequestContext context = connection.getLocalRequest(requestId);
+                    if (context != null) {
+                        context.cancel();
+                    }
+                    break;
+                }
+                case CANCEL_ACK: {
+                    final int requestId = buffer.getInt();
+                    // NOTE(review): the request is looked up but not removed here, unlike the reply
+                    // branches - confirm that it is removed elsewhere
+                    final ReplyHandler replyHandler = connection.getRemoteRequest(requestId);
+                    if (replyHandler != null) {
+                        SpiUtils.safeHandleCancellation(replyHandler);
+                    }
+                    break;
+                }
+                case REQUEST_RECEIVE_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final IOException cause;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            cause = (IOException) unmarshaller.readObject();
+                            unmarshaller.finish();
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException e) {
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+                        break;
+                    } catch (ClassNotFoundException e) {
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+                        break;
+                    } catch (ClassCastException e) {
+                        // the reply handler has already been removed, so it must still be notified when
+                        // the peer sends something other than an IOException
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+                        break;
+                    }
+                    SpiUtils.safeHandleException(replyHandler, cause);
+                    break;
+                }
+                case REQUEST_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = connection.removeRemoteRequest(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final IOException cause;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            try {
+                                cause = (IOException) unmarshaller.readObject();
+                            } catch (ClassNotFoundException e) {
+                                SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an ClassNotFoundException occurred when attempting to unmarshal the cause)"));
+                                log.trace(e, "Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                                break;
+                            } catch (ClassCastException e) {
+                                SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an ClassCastException occurred when attempting to unmarshal the cause)"));
+                                log.trace(e, "Class cast exception in exception reply to request ID %d", Integer.valueOf(requestId));
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception", ex);
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to read the cause)"));
+                        break;
+                    }
+                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", cause));
+                    break;
+                }
+                case CLIENT_CLOSE: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandler> handle = connection.removeForwardedClient(clientId);
+                    if (handle == null) {
+                        log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
+                    break;
+                }
+                case CLIENT_OPEN: {
+                    final int serviceId = buffer.getInt();
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandlerSource> handle = connection.getForwardedService(serviceId);
+                    if (handle == null) {
+                        log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    try {
+                        final RequestHandlerSource requestHandlerSource = handle.getResource();
+                        final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
+                        log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
+                        connection.addForwardedClient(clientId, clientHandle);
+                    } catch (IOException ex) {
+                        log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
+                        break;
+                    } finally {
+                        // NOTE(review): this closes the handle obtained from getForwardedService() on every
+                        // client open - confirm that the connection does not still rely on it
+                        IoUtils.safeClose(handle);
+                    }
+                    break;
+                }
+                case SERVICE_OPEN_REQUEST: {
+                    final int serviceId = buffer.getInt();
+                    final QualifiedName qualifiedName = MultiplexConnection.getQualifiedName(buffer);
+                    final Handle<RequestHandlerSource> service = connection.getService(qualifiedName);
+                    if (service == null) {
+                        // replies are five bytes: the message type followed by the service ID
+                        ByteBuffer replyBuffer = ByteBuffer.allocate(5);
+                        replyBuffer.put((byte) MessageType.SERVICE_OPEN_NOT_FOUND.getId());
+                        replyBuffer.putInt(serviceId);
+                        replyBuffer.flip();
+                        try {
+                            connection.doBlockingWrite(replyBuffer);
+                        } catch (IOException e) {
+                            log.error(e, "Failed to send an error reply to an invalid service open request");
+                        }
+                        break;
+                    }
+                    final Handle<RequestHandlerSource> ourHandle;
+                    try {
+                        ourHandle = service.getResource().getHandle();
+                    } catch (IOException e) {
+                        log.error("Failed to acquire a handle to registered service: %s", e);
+                        ByteBuffer replyBuffer = ByteBuffer.allocate(5);
+                        replyBuffer.put((byte) MessageType.SERVICE_OPEN_FAILED.getId());
+                        replyBuffer.putInt(serviceId);
+                        replyBuffer.flip();
+                        try {
+                            connection.doBlockingWrite(replyBuffer);
+                        } catch (IOException e2) {
+                            // log the write failure itself, not the handle acquisition failure reported above
+                            log.trace(e2, "Failed to send an exception reply to a service open request");
+                        }
+                        break;
+                    }
+                    connection.addForwadedService(serviceId, ourHandle);
+                    ByteBuffer replyBuffer = ByteBuffer.allocate(5);
+                    replyBuffer.put((byte) MessageType.SERVICE_OPEN_REPLY.getId());
+                    replyBuffer.putInt(serviceId);
+                    replyBuffer.flip();
+                    try {
+                        connection.doBlockingWrite(replyBuffer);
+                    } catch (IOException e) {
+                        log.trace(e, "Failed to send a reply to a service open request");
+                    }
+                    break;
+                }
+                case SERVICE_OPEN_FAILED:
+                case SERVICE_OPEN_NOT_FOUND:
+                case SERVICE_OPEN_FORBIDDEN: {
+                    final int serviceId = buffer.getInt();
+                    final FutureRemoteRequestHandlerSource future = connection.removeFutureRemoteService(serviceId);
+                    if (future == null) {
+                        log.trace("Service open failure reply received for unknown service ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    future.setException(
+                            msgType == MessageType.SERVICE_OPEN_NOT_FOUND ? new ServiceRegistrationException("Service not found") :
+                            msgType == MessageType.SERVICE_OPEN_FORBIDDEN ? new ServiceRegistrationException("Service open forbidden") :
+                            new ServiceRegistrationException("Service open failed")
+                    );
+                    break;
+                }
+                case SERVICE_OPEN_REPLY: {
+                    final int serviceId = buffer.getInt();
+                    final FutureRemoteRequestHandlerSource future = connection.getFutureRemoteService(serviceId);
+                    if (future == null) {
+                        log.trace("Service open reply received for unknown service ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    final MultiplexRequestHandlerSource requestHandlerSource = new MultiplexRequestHandlerSource(serviceId, connection);
+                    future.setResult(requestHandlerSource);
+                    break;
+                }
+                case SERVICE_CLOSE_NOTIFY: {
+                    final int serviceId = buffer.getInt();
+                    final FutureRemoteRequestHandlerSource future = connection.removeFutureRemoteService(serviceId);
+                    if (future == null) {
+                        // an unknown ID from the peer must not cause a NullPointerException in the read loop
+                        log.trace("Service close notification received for unknown service ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    // close the request handler source once (if) the pending open completes
+                    future.addNotifier(new IoFuture.HandlingNotifier<RequestHandlerSource>() {
+                        public void handleDone(final RequestHandlerSource result) {
+                            IoUtils.safeClose(result);
+                        }
+                    });
+                    break;
+                }
+                case SERVICE_CLOSE_REQUEST: {
+                    final int serviceId = buffer.getInt();
+                    final Handle<RequestHandlerSource> handle = connection.removeForwardedService(serviceId);
+                    if (handle == null) {
+                        log.trace("Received service close request on unknown ID %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
+                    break;
+                }
+                default: {
+                    log.error("Malformed packet received (invalid message type %s)", msgType);
+                    break;
+                }
+                case CONNECTION_CLOSE:
+                    // no action is taken for this message type here
+                    break;
+            }
+        } catch (BufferUnderflowException e) {
+            // the message was shorter than its type requires; drop it and continue with the next one
+            log.error(e, "Malformed packet received (buffer underflow)");
+        }
+    }
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexReplyHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.ByteOutput;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Reply handler which sends the outcome of one locally-received request back over a
+ * {@link MultiplexConnection}.  Every message carries the message type followed by the ID of the
+ * request being answered.
+ */
+final class MultiplexReplyHandler implements ReplyHandler {
+
+    private final int requestId;
+    private final MultiplexConnection connection;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param requestId the ID of the request being answered
+     * @param connection the connection to write the answer to
+     */
+    MultiplexReplyHandler(final int requestId, final MultiplexConnection connection) {
+        this.requestId = requestId;
+        this.connection = connection;
+    }
+
+    /**
+     * Send a {@code REPLY} message containing the marshalled reply.
+     *
+     * @param reply the reply object
+     * @throws IOException if marshalling or writing fails
+     */
+    public void handleReply(final Object reply) throws IOException {
+        sendMarshalled(MessageType.REPLY.getId(), reply);
+    }
+
+    /**
+     * Send a {@code REQUEST_FAILED} message containing the marshalled exception.
+     *
+     * @param exception the failure to report
+     * @throws IOException if marshalling or writing fails
+     */
+    public void handleException(final IOException exception) throws IOException {
+        sendMarshalled(MessageType.REQUEST_FAILED.getId(), exception);
+    }
+
+    /**
+     * Send a {@code CANCEL_ACK} message; it consists only of the message type and the request ID.
+     *
+     * @throws IOException if writing fails
+     */
+    public void handleCancellation() throws IOException {
+        final ByteBuffer buffer = ByteBuffer.allocate(5);
+        buffer.put((byte) MessageType.CANCEL_ACK.getId());
+        buffer.putInt(requestId);
+        buffer.flip();
+        connection.doBlockingWrite(buffer);
+    }
+
+    /**
+     * Marshal a message made of the message type, the request ID and a payload object, then write it.
+     *
+     * @param messageTypeId the ID of the message type to send
+     * @param payload the object to marshal after the request ID
+     * @throws IOException if marshalling or writing fails
+     */
+    private void sendMarshalled(final int messageTypeId, final Object payload) throws IOException {
+        final MultiplexConnection connection = this.connection;
+        final Marshaller marshaller = connection.getMarshallerFactory().createMarshaller(connection.getMarshallingConfiguration());
+        try {
+            final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+            final ByteOutput output = new BufferByteOutput(connection.getAllocator(), bufferList);
+            try {
+                marshaller.start(output);
+                marshaller.write(messageTypeId);
+                marshaller.writeInt(requestId);
+                marshaller.writeObject(payload);
+                // close the marshaller and the output before the collected buffers are written
+                marshaller.close();
+                output.close();
+                connection.doBlockingWrite(bufferList);
+            } finally {
+                IoUtils.safeClose(output);
+            }
+        } finally {
+            IoUtils.safeClose(marshaller);
+        }
+    }
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,136 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.RemoteRequestContext;
+import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.ByteOutput;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.IOException;
+
+/**
+ * Request handler that marshals each request and writes it to a {@link MultiplexConnection}, tagged with this handler's identifier.
+ */
+final class MultiplexRequestHandler extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex.request-handler");
+
+    private final int identifier; // written into every REQUEST and into the CLIENT_CLOSE message
+    private final BufferAllocator<ByteBuffer> allocator; // taken from the connection in the constructor
+    private final MultiplexConnection connection;
+
+    public MultiplexRequestHandler(final int identifier, final MultiplexConnection connection) {
+        super(connection.getExecutor());
+        this.connection = connection;
+        this.identifier = identifier;
+        allocator = connection.getAllocator();
+    }
+
+    @Override
+    protected void closeAction() throws IOException { // unregisters this client from the connection, then writes CLIENT_CLOSE
+        connection.removeRemoteClient(identifier);
+        ByteBuffer buffer = allocator.allocate(); // NOTE(review): never handed back to the allocator in this method - confirm doBlockingWrite releases it
+        buffer.put((byte) MessageType.CLIENT_CLOSE.getId()); // message layout: 1 byte type, then the int identifier
+        buffer.putInt(identifier);
+        buffer.flip();
+        connection.doBlockingWrite(buffer);
+    }
+
+    public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) { // marshals and writes one REQUEST; never throws IOException
+        log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
+        final List<ByteBuffer> bufferList;
+        final MultiplexConnection connection = this.connection;
+        try {
+            final Marshaller marshaller = connection.getMarshallerFactory().createMarshaller(connection.getMarshallingConfiguration());
+            try {
+                bufferList = new ArrayList<ByteBuffer>();
+                final ByteOutput output = new BufferByteOutput(allocator, bufferList);
+                try {
+                    marshaller.start(output);
+                    marshaller.write(MessageType.REQUEST.getId()); // message layout: type id, client identifier, request id, request object
+                    marshaller.writeInt(identifier);
+                    final int id = connection.nextRequest();
+                    connection.addRemoteRequest(id, handler); // NOTE(review): registered before the write and not unregistered here on failure - confirm cleanup elsewhere
+                    marshaller.writeInt(id);
+                    marshaller.writeObject(request);
+                    marshaller.close(); // closed explicitly before the write; the finally blocks repeat the close on every path
+                    output.close();
+                    connection.doBlockingWrite(bufferList);
+                    log.trace("Sent request %s", request);
+                    return new RemoteRequestContextImpl(id, connection); // context that can later write a CANCEL_REQUEST for this id
+                } finally {
+                    IoUtils.safeClose(output);
+                }
+            } finally {
+                IoUtils.safeClose(marshaller);
+            }
+        } catch (final IOException t) { // failure is passed to the reply handler and a blank context is returned instead of throwing
+            log.trace(t, "receiveRequest failed with an exception");
+            SpiUtils.safeHandleException(handler, t);
+            return SpiUtils.getBlankRemoteRequestContext();
+        }
+    }
+
+    public String toString() {
+        return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+    }
+}
+
+final class RemoteRequestContextImpl implements RemoteRequestContext { // per-request context whose only operation is sending a cancel message
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex.requesthandler.context");
+
+    private final int id; // request id written into the CANCEL_REQUEST message
+    private final MultiplexConnection connection;
+    private final AtomicBoolean cancelSent = new AtomicBoolean(); // set by the first cancel() call so later calls do nothing
+
+    public RemoteRequestContextImpl(final int id, final MultiplexConnection connection) {
+        this.id = id;
+        this.connection = connection;
+    }
+
+    public void cancel() {
+        if (! cancelSent.getAndSet(true)) try { // only the caller that flips the flag from false to true proceeds
+            final ByteBuffer buffer = ByteBuffer.allocate(5); // 1 byte message type + 4 byte request id
+            buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
+            buffer.putInt(id);
+            buffer.flip();
+            connection.doBlockingWrite(buffer);
+        } catch (Throwable t) { // best effort: a failed cancel is logged and never propagated; the flag stays set, so it is not retried
+            log.warn("Sending cancel request failed: %s", t);
+        }
+    }
+
+    public String toString() {
+        return "remote request context (multiplex) <" + Integer.toString(hashCode(), 16) + "> (id = " + id + ")";
+    }
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexRequestHandlerSource.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.RequestHandler;
+import org.jboss.remoting.spi.Handle;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import java.nio.ByteBuffer;
+import java.io.IOException;
+
+/**
+ * Request handler source bound to one multiplex connection: opens remote clients with CLIENT_OPEN, closes itself with SERVICE_CLOSE_REQUEST.
+ */
+final class MultiplexRequestHandlerSource extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.multiplex.request-handler-source");
+
+    private final int identifier; // service identifier written into CLIENT_OPEN and SERVICE_CLOSE_REQUEST messages
+    private final MultiplexConnection connection;
+
+    MultiplexRequestHandlerSource(final int identifier, final MultiplexConnection connection) {
+        super(connection.getExecutor());
+        this.connection = connection;
+        this.identifier = identifier;
+    }
+
+    @Override
+    protected void closeAction() throws IOException { // writes a SERVICE_CLOSE_REQUEST for this service identifier
+        ByteBuffer buffer = ByteBuffer.allocate(5); // 1 byte message type + 4 byte identifier
+        buffer.put((byte) MessageType.SERVICE_CLOSE_REQUEST.getId());
+        buffer.putInt(identifier);
+        buffer.flip();
+        connection.doBlockingWrite(buffer);
+    }
+
+    public Handle<RequestHandler> createRequestHandler() throws IOException { // registers a new remote client, announces it, returns its handle
+        final int id = connection.nextRemoteClient();
+        final MultiplexRequestHandler requestHandler = new MultiplexRequestHandler(id, connection); // concrete type so getHandle() resolves as before
+        boolean ok = false; // stays false on any failure so both finally blocks undo their step
+        try {
+            connection.addRemoteClient(id, requestHandler);
+            try {
+                final ByteBuffer buffer = ByteBuffer.allocate(9); // 1 byte message type + 4 byte service identifier + 4 byte client id
+                buffer.put((byte) MessageType.CLIENT_OPEN.getId());
+                buffer.putInt(identifier);
+                buffer.putInt(id);
+                buffer.flip();
+                connection.doBlockingWrite(buffer);
+                final Handle<RequestHandler> handlerHandle = requestHandler.getHandle(); // handle of the REGISTERED handler, not of a second instance
+                ok = true;
+                return handlerHandle;
+            } finally {
+                if (! ok) {
+                    connection.removeRemoteClient(id);
+                }
+            }
+        } finally {
+            if (! ok) {
+                IoUtils.safeClose(requestHandler);
+            }
+        }
+    }
+
+    public String toString() {
+        return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+    }
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleMultiplexHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.DelegatingIoHandler;
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.remoting.Endpoint;
+
+/**
+ * I/O handler that creates a {@link MultiplexConnection} when its channel opens and publishes it through an {@link IoFuture}.
+ */
+public final class SimpleMultiplexHandler extends DelegatingIoHandler<AllocatedMessageChannel> {
+
+    private volatile MultiplexConnection connection; // assigned in handleOpened; null until then
+    private final Endpoint endpoint;
+    private final MultiplexConfiguration configuration;
+    private final FutureConnection futureConnection = new FutureConnection(); // completed in handleOpened, returned by getConnection()
+
+    public SimpleMultiplexHandler(final Endpoint endpoint, final MultiplexConfiguration configuration) {
+        this.endpoint = endpoint;
+        this.configuration = configuration;
+    }
+
+    public void handleOpened(final AllocatedMessageChannel channel) {
+        connection = new MultiplexConnection(endpoint, channel, configuration);
+        futureConnection.setResult(connection); // NOTE(review): the future completes before the read handler is installed - confirm this ordering is intended
+        setReadHandler(new MultiplexReadHandler(connection));
+        channel.resumeReads();
+    }
+
+    public void handleClosed(final AllocatedMessageChannel channel) {
+        IoUtils.safeClose(connection); // NOTE(review): connection is still null if handleOpened never ran - presumably safeClose tolerates null, confirm
+    }
+
+    public IoFuture<MultiplexConnection> getConnection() {
+        return futureConnection;
+    }
+
+    public static final class FutureConnection extends AbstractIoFuture<MultiplexConnection> {
+        public IoFuture<MultiplexConnection> cancel() {
+            return this; // cancellation changes nothing here: the same future is returned
+        }
+
+        protected boolean setResult(final MultiplexConnection result) { // presumably redeclared so the enclosing handler can call it - confirm
+            return super.setResult(result);
+        }
+    }
+}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -1,84 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-final class SimpleWriteHandler implements WriteHandler {
-    private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
-
-    private final BufferAllocator<ByteBuffer> allocator;
-    private final ByteBuffer[] buffers;
-
-    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
-        this.allocator = allocator;
-        this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
-        logBufferSize();
-    }
-
-    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
-        this.allocator = allocator;
-        this.buffers = buffers;
-        logBufferSize();
-    }
-
-    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
-        this.allocator = allocator;
-        buffers = new ByteBuffer[] { buffer };
-        logBufferSize();
-    }
-
-    private void logBufferSize() {
-        if (log.isTrace()) {
-            long t = 0L;
-            for (ByteBuffer buf : buffers) {
-                t += (long)buf.remaining();
-            }
-            log.trace("Writing a message of size %d", Long.valueOf(t));
-        }
-    }
-
-    public boolean handleWrite(final WritableMessageChannel channel) {
-        boolean done = true;
-        try {
-            return (done = channel.send(buffers));
-        } catch (IOException e) {
-            log.trace(e, "Write failed");
-            return true;
-        } finally {
-            if (done) {
-                for (ByteBuffer buffer : buffers) {
-                    allocator.free(buffer);
-                }
-            }
-        }
-    }
-}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-interface WriteHandler {
-    boolean handleWrite(WritableMessageChannel channel);
-}

Modified: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -29,12 +29,10 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.CountDownLatch;
-import java.net.URI;
 import java.io.IOException;
 import junit.framework.TestCase;
 import org.jboss.remoting.core.EndpointImpl;
 import org.jboss.remoting.test.support.LoggingHelper;
-import org.jboss.remoting.SimpleCloseable;
 import org.jboss.remoting.LocalServiceConfiguration;
 import org.jboss.remoting.RequestListener;
 import org.jboss.remoting.ClientContext;
@@ -43,6 +41,10 @@
 import org.jboss.remoting.RemoteExecutionException;
 import org.jboss.remoting.ClientSource;
 import org.jboss.remoting.Client;
+import org.jboss.remoting.util.QualifiedName;
+import org.jboss.remoting.spi.NamedServiceRegistry;
+import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.Handle;
 import org.jboss.xnio.BufferAllocator;
 import org.jboss.xnio.IoUtils;
 import org.jboss.xnio.Xnio;
@@ -91,12 +93,10 @@
                         configuration.setExecutor(closeableExecutor);
                         configuration.setLinkMetric(10);
                         configuration.setMarshallerFactory(new RiverMarshallerFactory());
+                        final NamedServiceRegistry registry = new NamedServiceRegistry();
+                        configuration.setNamedServiceRegistry(registry);
                         final MarshallingConfiguration marshallingConfiguration = new MarshallingConfiguration();
                         configuration.setMarshallingConfiguration(marshallingConfiguration);
-                        final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = MultiplexProtocol.createServer(remoteEndpoint, configuration);
-                        final ChannelSource<AllocatedMessageChannel> channelSource = Channels.convertStreamToAllocatedMessage(xnio.createPipeServer(Channels.convertStreamToAllocatedMessage(handlerFactory, 16384, 16384)), 16384, 16384);
-                        final IoFuture<SimpleCloseable> future = MultiplexProtocol.connect(endpoint, configuration, channelSource);
-                        future.get();
                         final LocalServiceConfiguration<Object, Object> localServiceConfiguration = new LocalServiceConfiguration<Object, Object>(new RequestListener<Object, Object>() {
                             public void handleClientOpen(final ClientContext context) {
                                 log.debug("Client open");
@@ -125,15 +125,43 @@
                         localServiceConfiguration.setServiceType("connection.test");
                         localServiceConfiguration.setGroupName("testgroup");
                         localServiceConfiguration.setMetric(10);
-                        remoteEndpoint.registerService(localServiceConfiguration);
-                        final IoFuture<ClientSource<Object,Object>> futureClientSource = endpoint.locateService(new URI("jrs:connection.test::"), Object.class, Object.class);
-                        assertEquals(IoFuture.Status.DONE, futureClientSource.await(1L, TimeUnit.SECONDS));
-                        final ClientSource<Object, Object> clientSource = futureClientSource.get();
-                        final Client<Object,Object> client = clientSource.createClient();
-                        final IoFuture<Object> futureReply = client.send(REQUEST);
-                        assertEquals(IoFuture.Status.DONE, futureReply.await(1L, TimeUnit.SECONDS));
-                        assertEquals(REPLY, futureReply.get());
-                        assertTrue(latch.await(1L, TimeUnit.SECONDS));
+                        final Handle<RequestHandlerSource> requestHandlerSourceHandle = remoteEndpoint.registerService(localServiceConfiguration);
+                        try {
+                            registry.registerService(QualifiedName.parse("/test/connectiontest"), requestHandlerSourceHandle.getResource());
+                            final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = MultiplexProtocol.createServer(remoteEndpoint, configuration);
+                            final ChannelSource<AllocatedMessageChannel> channelSource = Channels.convertStreamToAllocatedMessage(xnio.createPipeServer(Channels.convertStreamToAllocatedMessage(handlerFactory, 16384, 16384)), 16384, 16384);
+                            final IoFuture<MultiplexConnection> future = MultiplexProtocol.connect(endpoint, configuration, channelSource);
+                            final MultiplexConnection connection = future.get();
+                            try {
+                                final Handle<RequestHandlerSource> remoteHandlerSource = connection.openRemoteService(QualifiedName.parse("/test/connectiontest"));
+                                try {
+                                    final ClientSource<Object, Object> clientSource = endpoint.createClientSource(remoteHandlerSource.getResource(), Object.class, Object.class);
+                                    try {
+                                        final Client<Object,Object> client = clientSource.createClient();
+                                        try {
+                                            final IoFuture<Object> futureReply = client.send(REQUEST);
+                                            assertEquals(IoFuture.Status.DONE, futureReply.await(1L, TimeUnit.SECONDS));
+                                            assertEquals(REPLY, futureReply.get());
+                                            client.close();
+                                            clientSource.close();
+                                            remoteHandlerSource.close();
+                                            connection.close();
+                                            assertTrue(latch.await(1L, TimeUnit.SECONDS));
+                                        } finally {
+                                            IoUtils.safeClose(client);
+                                        }
+                                    } finally {
+                                        IoUtils.safeClose(clientSource);
+                                    }
+                                } finally {
+                                    IoUtils.safeClose(remoteHandlerSource);
+                                }
+                            } finally {
+                                IoUtils.safeClose(connection);
+                            }
+                        } finally {
+                            IoUtils.safeClose(requestHandlerSourceHandle);
+                        }
                     } finally {
                         endpoint.stop();
                     }

Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -363,7 +363,7 @@
                                 return subject.substring(position, nextDelim);
                             }
                         } finally {
-                            position = nextDelim;
+                            position = nextDelim == -1 ? -1 : nextDelim + 1;
                         }
                     }
 

Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/QualifiedName.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -100,9 +100,18 @@
 
     public static QualifiedName parse(String path) {
         List<String> decoded = new ArrayList<String>();
+        boolean first = true;
         for (String segment : CollectionUtil.split("/", path)) {
-            if (segment.length() == 0) {
-                throw new IllegalArgumentException("Empty segment in path");
+            if (first) {
+                if (segment.length() > 0) {
+                    throw new IllegalArgumentException("Relative paths are not allowed");
+                }
+                first = false;
+                continue;
+            } else {
+                if (segment.length() == 0) {
+                    throw new IllegalArgumentException("Empty segment in path");
+                }
             }
             try {
                 decoded.add(URLDecoder.decode(segment, "utf-8"));

Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedCollection.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -15,7 +15,7 @@
         monitor = this;
     }
 
-    protected SynchronizedCollection(final Collection<V> delegate, final Object monitor) {
+    public SynchronizedCollection(final Collection<V> delegate, final Object monitor) {
         this.delegate = delegate;
         this.monitor = monitor;
     }

Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java	2008-11-14 04:08:51 UTC (rev 4678)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/SynchronizedSet.java	2008-11-14 06:41:47 UTC (rev 4679)
@@ -11,7 +11,7 @@
         super(delegate);
     }
 
-    protected SynchronizedSet(final Set<K> delegate, final Object monitor) {
+    public SynchronizedSet(final Set<K> delegate, final Object monitor) {
         super(delegate, monitor);
     }
 }




More information about the jboss-remoting-commits mailing list