[jboss-remoting-commits] JBoss Remoting SVN: r4601 - in remoting3/trunk: api/src/main/java/org/jboss/remoting/spi and 11 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Mon Oct 20 22:39:25 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-10-20 22:39:24 -0400 (Mon, 20 Oct 2008)
New Revision: 4601

Added:
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java
   remoting3/trunk/protocol/multiplex/
   remoting3/trunk/protocol/multiplex/src/
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
   remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/
   remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
Removed:
   remoting3/trunk/protocol/basic/
   remoting3/trunk/protocol/multiplex/src/
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/basic/
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
   remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
   remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/basic/
   remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java
   remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java
   remoting3/trunk/build.properties
   remoting3/trunk/build.xml
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
   remoting3/trunk/testing-support/src/main/resources/testing.policy
   remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java
Log:
The basic protocol is really a multiplex protocol.  Make room for a *real* basic protocol.

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/SpiUtils.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -28,6 +28,7 @@
 import org.jboss.remoting.RequestContext;
 import org.jboss.remoting.CloseHandler;
 import org.jboss.xnio.log.Logger;
+import java.io.IOException;
 
 /**
  * Utility methods for Remoting service providers.
@@ -41,12 +42,11 @@
      * Safely notify a reply handler of an exception.
      *
      * @param replyHandler the reply handler
-     * @param msg the message
-     * @param cause the cause
+     * @param exception the exception to pass to the reply handler
      */
-    public static void safeHandleException(final ReplyHandler replyHandler, final String msg, final Throwable cause) {
+    public static void safeHandleException(final ReplyHandler replyHandler, final IOException exception) {
         try {
-            replyHandler.handleException(msg, cause);
+            replyHandler.handleException(exception);
         } catch (Throwable t) {
             log.error(t, "Failed to properly handle exception");
         }

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/ReplyHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -22,6 +22,8 @@
 
 package org.jboss.remoting.spi.remote;
 
+import java.io.IOException;
+
 /**
  * A handler for replies from a request.  The handler should respect the first invocation made on it, and ignore
  * any subsequent invocations.
@@ -36,13 +38,11 @@
     void handleReply(Object reply);
 
     /**
-     * Handle a remote exception.
+     * Handle an exception.
      *
-     * @param msg the message
-     * @param cause the cause
+     * @param exception an exception which describes the problem
      */
-    // TODO - change to accept a RemotingException instead?
-    void handleException(final String msg, Throwable cause);
+    void handleException(IOException exception);
 
     /**
      * Handle a cancellation request.

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/remote/RequestHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -44,7 +44,7 @@
     /**
      * Receive a request from a remote system.  This method is intended to be called by protocol handlers.  If the
      * request cannot be accepted for some reason, the
-     * {@link ReplyHandler#handleException(String, Throwable)}
+     * {@link ReplyHandler#handleException(java.io.IOException)}
      * method is called immediately.
      *
      * @param request the request

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamContext.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -23,7 +23,9 @@
 package org.jboss.remoting.spi.stream;
 
 import java.util.concurrent.Executor;
-import org.jboss.marshalling.MarshallerFactory;
+import java.io.IOException;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
 
 /**
  * A context for stream serialization.
@@ -31,16 +33,25 @@
 public interface StreamContext {
 
     /**
-     * Get an executor which may be used for various asynchronous tasks.
+     * Get an executor which may be used by a stream serializer for various asynchronous tasks.
      *
      * @return an executor
      */
     Executor getExecutor();
 
     /**
-     * Get a marshaller factory which is configured compatibly with the channel.
+     * Create a marshaller which is configured compatibly with the channel.
      *
-     * @return the marshaller factory
+     * @return a marshaller
      */
-    MarshallerFactory getMarshallerFactory();
+    Marshaller createMarshaller() throws IOException;
+
+    /**
+     * Create an unmarshaller which is configured compatibly with the channel.
+     *
+     * @return an unmarshaller
+     */
+    Unmarshaller createUnmarshaller() throws IOException;
+
+    // todo - getter & setter for child ObjectTable, ClassTable, ExternalizerFactory, etc. for marshaller and unmarshaller
 }

Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/stream/StreamProvider.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.spi.stream;
+
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.Acceptor;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+
+/**
+ * A provider for streams.
+ *
+ * @param <A> the address type
+ */
+public interface StreamProvider<A> {
+    Connector<A, StreamChannel> getStreamChannelConnector();
+
+    Connector<A, AllocatedMessageChannel> getMessageChannelConnector();
+
+    Acceptor<A, StreamChannel> getStreamChannelAcceptor();
+
+    Acceptor<A, AllocatedMessageChannel> getMessageChannelAcceptor();
+}

Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/build.properties	2008-10-21 02:39:24 UTC (rev 4601)
@@ -115,7 +115,7 @@
 lib.jboss-managed.local=${local.repository}/${lib.jboss-managed.local-path}
 lib.jboss-managed.remote=${remote.repository}/${lib.jboss-managed.remote-path}
 
-lib.marshalling-api.version=1.0.0.Beta1
+lib.marshalling-api.version=1.0.0.Beta2
 lib.marshalling-api.name=marshalling-api.jar
 lib.marshalling-api.license=lgpl
 lib.marshalling-api.dir=jboss/marshalling/${lib.marshalling-api.version}/lib
@@ -179,7 +179,7 @@
 lib.trove.local=${local.repository}/${lib.trove.path}
 lib.trove.remote=${remote.repository}/${lib.trove.path}
 
-lib.xnio.version=1.1.0.CR1
+lib.xnio.version=1.2.0.Alpha2008101601
 
 lib.xnio-api.name=xnio-api-${lib.xnio.version}.jar
 lib.xnio-api.license=lgpl
@@ -188,10 +188,10 @@
 lib.xnio-api.local=${local.repository}/${lib.xnio-api.path}
 lib.xnio-api.remote=${remote.repository}/${lib.xnio-api.path}
 
-lib.xnio-standalone.name=xnio-standalone-${lib.xnio.version}.jar
-lib.xnio-standalone.license=lgpl
-lib.xnio-standalone.dir=maven2/org/jboss/xnio/xnio-standalone/${lib.xnio.version}
-lib.xnio-standalone.path=${lib.xnio-standalone.dir}/${lib.xnio-standalone.name}
-lib.xnio-standalone.local=${local.repository}/${lib.xnio-standalone.path}
-lib.xnio-standalone.remote=${remote.repository}/${lib.xnio-standalone.path}
+lib.xnio-nio.name=xnio-nio-${lib.xnio.version}.jar
+lib.xnio-nio.license=lgpl
+lib.xnio-nio.dir=maven2/org/jboss/xnio/xnio-nio/${lib.xnio.version}
+lib.xnio-nio.path=${lib.xnio-nio.dir}/${lib.xnio-nio.name}
+lib.xnio-nio.local=${local.repository}/${lib.xnio-nio.path}
+lib.xnio-nio.remote=${remote.repository}/${lib.xnio-nio.path}
 

Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/build.xml	2008-10-21 02:39:24 UTC (rev 4601)
@@ -211,16 +211,16 @@
         <get src="${remote.license.dir}/${lib.xnio-api.license}.txt" dest="${lib.xnio-api.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
     </target>
 
-    <!-- External library: XNIO standalone -->
+    <!-- External library: XNIO nio -->
 
-    <target name="lib.xnio-standalone-check">
-        <available property="lib.xnio-standalone.exists" file="${lib.xnio-standalone.local}"/>
+    <target name="lib.xnio-nio-check">
+        <available property="lib.xnio-nio.exists" file="${lib.xnio-nio.local}"/>
     </target>
 
-    <target name="lib.xnio-standalone" depends="lib.xnio-standalone-check" unless="lib.xnio-standalone.exists">
-        <mkdir dir="${local.repository}/${lib.xnio-standalone.dir}"/>
-        <get src="${lib.xnio-standalone.remote}" dest="${lib.xnio-standalone.local}" usetimestamp="true" ignoreerrors="false"/>
-        <get src="${remote.license.dir}/${lib.xnio-standalone.license}.txt" dest="${lib.xnio-standalone.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
+    <target name="lib.xnio-nio" depends="lib.xnio-nio-check" unless="lib.xnio-nio.exists">
+        <mkdir dir="${local.repository}/${lib.xnio-nio.dir}"/>
+        <get src="${lib.xnio-nio.remote}" dest="${lib.xnio-nio.local}" usetimestamp="true" ignoreerrors="false"/>
+        <get src="${remote.license.dir}/${lib.xnio-nio.license}.txt" dest="${lib.xnio-nio.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
     </target>
 
     <!-- ============================================== -->
@@ -800,7 +800,8 @@
                 <pathelement location="${lib.junit.local}"/>
                 <pathelement location="${lib.marshalling-api.local}"/>
                 <pathelement location="${lib.river.local}"/>
-                <pathelement location="${lib.xnio-standalone.local}"/>
+                <pathelement location="${lib.xnio-api.local}"/>
+                <pathelement location="${lib.xnio-nio.local}"/>
             </classpath>
         </javac>
         <touch file="protocol/basic/target/test/.lastcompile" verbose="false"/>
@@ -816,7 +817,8 @@
             <sysproperty key="ant.library.dir" value="${ant.home}/lib"/>
             <sysproperty key="lib.junit.local" value="${lib.junit.local}"/>
             <sysproperty key="lib.marshalling-api.local" value="${lib.marshalling-api.local}"/>
-            <sysproperty key="lib.xnio-standalone.local" value="${lib.xnio-standalone.local}"/>
+            <sysproperty key="lib.xnio-api.local" value="${lib.xnio-api.local}"/>
+            <sysproperty key="lib.xnio-nio.local" value="${lib.xnio-nio.local}"/>
             <jvmarg line="${test.jvmargs}"/>
             <formatter type="plain" extension="${extension}"/>
             <classpath>
@@ -829,7 +831,8 @@
                 <pathelement location="${lib.junit.local}"/>
                 <pathelement location="${lib.marshalling-api.local}"/>
                 <pathelement location="${lib.river.local}"/>
-                <pathelement location="${lib.xnio-standalone.local}"/>
+                <pathelement location="${lib.xnio-api.local}"/>
+                <pathelement location="${lib.xnio-nio.local}"/>
             </classpath>
             <batchtest fork="yes" todir="protocol/basic/target/test-results"
                        haltonfailure="no">
@@ -840,7 +843,7 @@
         </junit>
     </target>
 
-    <target name="protocol.basic.test" depends="lib.xnio-standalone,api,core,protocol.basic,testing-support,util,protocol.basic.test.compile">
+    <target name="protocol.basic.test" depends="lib.xnio-nio,api,core,protocol.basic,testing-support,util,protocol.basic.test.compile">
         <antcall inheritall="true" inheritrefs="true" target="protocol.basic.test.pseudotarget">
             <param name="extension" value=".txt"/>
             <param name="message" value="Running with no security manager"/>

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -22,12 +22,12 @@
 
 package org.jboss.remoting.core;
 
-import org.jboss.remoting.RemotingException;
 import org.jboss.remoting.spi.remote.ReplyHandler;
 import org.jboss.remoting.spi.remote.RemoteRequestContext;
 import org.jboss.xnio.AbstractIoFuture;
 import org.jboss.xnio.IoFuture;
 import java.util.concurrent.Executor;
+import java.io.IOException;
 
 /**
  *
@@ -67,8 +67,8 @@
             setResult((O) reply);
         }
 
-        public void handleException(final String exMsg, final Throwable exCause) {
-            setException(new RemotingException(exMsg, exCause));
+        public void handleException(final IOException exception) {
+            setException(exception);
         }
 
         public void handleCancellation() {

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -82,9 +82,9 @@
                 try {
                     requestListener.handleRequest(context, (I) request);
                 } catch (RemoteExecutionException e) {
-                    SpiUtils.safeHandleException(replyHandler, e.getMessage(), e.getCause());
+                    SpiUtils.safeHandleException(replyHandler, e);
                 } catch (Throwable t) {
-                    SpiUtils.safeHandleException(replyHandler, "Unexpected exception in request listener", t);
+                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Request handler threw an exception", t));
                 }
             }
         });

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -26,6 +26,7 @@
 import org.jboss.remoting.ClientContext;
 import org.jboss.remoting.RemotingException;
 import org.jboss.remoting.RequestCancelHandler;
+import org.jboss.remoting.RemoteExecutionException;
 import org.jboss.remoting.core.util.TaggingExecutor;
 import org.jboss.remoting.spi.remote.ReplyHandler;
 import org.jboss.remoting.spi.SpiUtils;
@@ -81,7 +82,7 @@
     public void sendFailure(final String msg, final Throwable cause) throws RemotingException, IllegalStateException {
         if (! closed.getAndSet(true)) {
             if (replyHandler != null) {
-                replyHandler.handleException(msg, cause);
+                replyHandler.handleException(new RemoteExecutionException(msg, cause));
             }
         } else {
             throw new IllegalStateException("Reply already sent");

Copied: remoting3/trunk/protocol/multiplex (from rev 4573, remoting3/trunk/protocol/basic)

Copied: remoting3/trunk/protocol/multiplex/src (from rev 4600, remoting3/trunk/protocol/basic/src)

Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex (from rev 4573, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic)

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/AbstractConnection.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,44 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.spi.AbstractSimpleCloseable;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public abstract class AbstractConnection extends AbstractSimpleCloseable {
-    /**
-     * Basic constructor.
-     *
-     * @param executor the executor used to execute the close notification handlers
-     */
-    protected AbstractConnection(final Executor executor) {
-        super(executor);
-    }
-
-    public String toString() {
-        return "connection <" + Integer.toString(hashCode()) + ">";
-    }
-}

Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/AbstractConnection.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/AbstractConnection.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.AbstractSimpleCloseable;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public abstract class AbstractConnection extends AbstractSimpleCloseable {
+    /**
+     * Basic constructor.
+     *
+     * @param executor the executor used to execute the close notification handlers
+     */
+    protected AbstractConnection(final Executor executor) {
+        super(executor);
+    }
+
+    public String toString() {
+        return "connection <" + Integer.toString(hashCode()) + ">";
+    }
+}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicHandler.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,915 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.log.Logger;
-import org.jboss.remoting.spi.remote.RequestHandler;
-import org.jboss.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.remoting.spi.remote.ReplyHandler;
-import org.jboss.remoting.spi.remote.RemoteRequestContext;
-import org.jboss.remoting.spi.remote.Handle;
-import org.jboss.remoting.spi.SpiUtils;
-import org.jboss.remoting.spi.AbstractAutoCloseable;
-import static org.jboss.remoting.util.CollectionUtil.concurrentIntegerMap;
-import org.jboss.remoting.util.CollectionUtil;
-import org.jboss.remoting.util.ConcurrentIntegerMap;
-import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.ByteInput;
-import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.ByteOutput;
-import org.jboss.marshalling.Marshaller;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.nio.ByteBuffer;
-import java.nio.BufferUnderflowException;
-import java.io.IOException;
-
-/**
- *
- */
-public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
-
-    private static final Logger log = Logger.getLogger(BasicHandler.class);
-
-    //--== Connection configuration items ==--
-    private final MarshallerFactory marshallerFactory;
-    private final int linkMetric;
-    private final Executor executor;
-    private final ClassLoader classLoader;
-    // buffer allocator for outbound message assembly
-    private final BufferAllocator<ByteBuffer> allocator;
-
-    // running on remote node
-    private final ConcurrentIntegerMap<ReplyHandler> remoteRequests = concurrentIntegerMap();
-    // running on local node
-    private final ConcurrentIntegerMap<RemoteRequestContext> localRequests = concurrentIntegerMap();
-    // sequence for remote requests
-    private final AtomicInteger requestSequence = new AtomicInteger();
-
-    // clients whose requests get forwarded to the remote side
-    // even #s were opened from services forwarded to us (our sequence)
-    // odd #s were forwarded directly to us (remote sequence)
-    private final ConcurrentIntegerMap<RequestHandler> remoteClients = concurrentIntegerMap();
-    // forwarded to remote side (handled on this side)
-    private final ConcurrentIntegerMap<Handle<RequestHandler>> forwardedClients = concurrentIntegerMap();
-    // sequence for forwarded clients (unsigned; shift left one bit, add one)
-    private final AtomicInteger forwardedClientSequence = new AtomicInteger();
-    // sequence for clients created from services forwarded to us (unsigned; shift left one bit)
-    private final AtomicInteger remoteClientSequence = new AtomicInteger();
-
-    // services forwarded to us
-    private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices = concurrentIntegerMap();
-    // forwarded to remote side (handled on this side)
-    private final ConcurrentIntegerMap<Handle<RequestHandlerSource>> forwardedServices = concurrentIntegerMap();
-    // sequence for forwarded services
-    private final AtomicInteger serviceSequence = new AtomicInteger();
-
-    private volatile AllocatedMessageChannel channel;
-
-    public BasicHandler(final RemotingChannelConfiguration configuration) {
-        allocator = configuration.getAllocator();
-        executor = configuration.getExecutor();
-        classLoader = configuration.getClassLoader();
-        marshallerFactory = configuration.getMarshallerFactory();
-        linkMetric = configuration.getLinkMetric();
-    }
-
-    public void handleOpened(final AllocatedMessageChannel channel) {
-        channel.resumeReads();
-    }
-
-    public void handleReadable(final AllocatedMessageChannel channel) {
-        for (;;) try {
-            final ByteBuffer buffer;
-            try {
-                buffer = channel.receive();
-            } catch (IOException e) {
-                log.error(e, "I/O error in protocol channel; closing channel");
-                IoUtils.safeClose(channel);
-                return;
-            }
-            if (buffer == null) {
-                // todo release all handles...
-                // todo what if the write queue is not empty?
-                IoUtils.safeClose(channel);
-                return;
-            }
-            if (! buffer.hasRemaining()) {
-                // would block
-                channel.resumeReads();
-                return;
-            }
-            final MessageType msgType;
-            try {
-                msgType = MessageType.getMessageType(buffer.get() & 0xff);
-            } catch (IllegalArgumentException ex) {
-                log.trace("Received invalid message type");
-                return;
-            }
-            log.trace("Received message %s, type %s", buffer, msgType);
-            switch (msgType) {
-                case REQUEST_ONEWAY: {
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = forwardedClients.get(clientId);
-                    if (handle == null) {
-                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
-                        return;
-                    }
-                    final Object payload;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
-                        try {
-                            unmarshaller.start(createByteInput(buffer, true));
-                            try {
-                                payload = unmarshaller.readObject();
-                            } catch (ClassNotFoundException e) {
-                                log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException ex) {
-                        log.error(ex, "Failed to unmarshal a one-way request");
-                        break;
-                    }
-                    final RequestHandler requestHandler = handle.getResource();
-                    try {
-                        requestHandler.receiveRequest(payload);
-                    } catch (Throwable t) {
-                        log.error(t, "One-way request handler unexpectedly threw an exception");
-                    }
-                    break;
-                }
-                case REQUEST: {
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = forwardedClients.get(clientId);
-                    if (handle == null) {
-                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
-                        break;
-                    }
-                    final int requestId = buffer.getInt();
-                    final Object payload;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
-                        try {
-                            unmarshaller.start(createByteInput(buffer, true));
-                            try {
-                                payload = unmarshaller.readObject();
-                            } catch (ClassNotFoundException e) {
-                                log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
-                                // todo - send request receive failed message
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException ex) {
-                        log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
-                        // todo send a request failure message
-                        break;
-                    }
-                    final RequestHandler requestHandler = handle.getResource();
-                    requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
-                    break;
-                }
-                case REPLY: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final Object payload;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
-                        try {
-                            unmarshaller.start(createByteInput(buffer, true));
-                            try {
-                                payload = unmarshaller.readObject();
-                            } catch (ClassNotFoundException e) {
-                                replyHandler.handleException("Reply unmarshalling failed", e);
-                                log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException ex) {
-                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException");
-                        // todo
-                        SpiUtils.safeHandleException(replyHandler, null, null);
-                        break;
-                    }
-                    SpiUtils.safeHandleReply(replyHandler, payload);
-                    break;
-                }
-                case CANCEL_REQUEST: {
-                    final int requestId = buffer.getInt();
-                    final RemoteRequestContext context = localRequests.get(requestId);
-                    if (context != null) {
-                        context.cancel();
-                    }
-                    break;
-                }
-                case CANCEL_ACK: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.get(requestId);
-                    if (replyHandler != null) {
-                        replyHandler.handleCancellation();
-                    }
-                    break;
-                }
-                case REQUEST_RECEIVE_FAILED: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final String reason = readUTFZ(buffer);
-                    // todo - throw a new ReplyException
-                    break;
-                }
-                case REQUEST_FAILED: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final Throwable cause;
-                    try {
-                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
-                        try {
-                            unmarshaller.start(createByteInput(buffer, true));
-                            try {
-                                cause = (Throwable) unmarshaller.readObject();
-                            } catch (ClassNotFoundException e) {
-                                replyHandler.handleException("Exception reply unmarshalling failed", e);
-                                log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
-                                break;
-                            } catch (ClassCastException e) {
-                                // todo - report a generic exception
-                                SpiUtils.safeHandleException(replyHandler, null, null);
-                                break;
-                            }
-                        } finally {
-                            IoUtils.safeClose(unmarshaller);
-                        }
-                    } catch (IOException ex) {
-                        log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception");
-                        // todo
-                        SpiUtils.safeHandleException(replyHandler, null, null);
-                        break;
-                    }
-                    // todo - wrap with REE
-                    SpiUtils.safeHandleException(replyHandler, null, cause);
-                    break;
-                }
-                case REQUEST_OUTCOME_UNKNOWN: {
-                    final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
-                    if (replyHandler == null) {
-                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
-                        break;
-                    }
-                    final String reason = readUTFZ(buffer);
-                    // todo - throw a new IndetermOutcomeEx
-                    break;
-                }
-                case CLIENT_CLOSE: {
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = forwardedClients.remove(clientId);
-                    if (handle == null) {
-                        log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
-                        break;
-                    }
-                    IoUtils.safeClose(handle);
-                    break;
-                }
-                case CLIENT_OPEN: {
-                    final int serviceId = buffer.getInt();
-                    final int clientId = buffer.getInt();
-                    final Handle<RequestHandlerSource> handle = forwardedServices.get(serviceId);
-                    if (handle == null) {
-                        log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
-                        break;
-                    }
-                    try {
-                        final RequestHandlerSource requestHandlerSource = handle.getResource();
-                        final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
-                        // todo check for duplicate
-                        // todo validate the client ID
-                        log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
-                        forwardedClients.put(clientId, clientHandle);
-                    } catch (IOException ex) {
-                        log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
-                        break;
-                    } finally {
-                        IoUtils.safeClose(handle);
-                    }
-                    break;
-                }
-                case SERVICE_CLOSE: {
-                    final Handle<RequestHandlerSource> handle = forwardedServices.remove(buffer.getInt());
-                    if (handle == null) {
-                        break;
-                    }
-                    IoUtils.safeClose(handle);
-                    break;
-                }
-                case SERVICE_ADVERTISE: {
-                    final int serviceId = buffer.getInt();
-                    final String serviceType = readUTFZ(buffer);
-                    final String groupName = readUTFZ(buffer);
-                    final String endpointName = readUTFZ(buffer);
-                    final int baseMetric = buffer.getInt();
-                    Endpoint endpoint = null;
-                    int id = -1;
-                    final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, id);
-                    final int calcMetric = baseMetric + linkMetric;
-                    if (calcMetric > 0) {
-                        try {
-                            final SimpleCloseable closeable = endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
-                            // todo - something with that closeable
-                        } catch (IOException e) {
-                            log.error(e, "Unable to register remote service");
-                        }
-                    }
-                    break;
-                }
-                case SERVICE_UNADVERTISE: {
-                    final int serviceId = buffer.getInt();
-                    IoUtils.safeClose(remoteServices.get(serviceId));
-                    break;
-                }
-                default: {
-                    log.trace("Received invalid message type %s", msgType);
-                }
-            }
-        } catch (BufferUnderflowException e) {
-            log.error(e, "Malformed packet");
-        }
-    }
-
-    public void handleWritable(final AllocatedMessageChannel channel) {
-        for (;;) {
-            final WriteHandler handler = outputQueue.peek();
-            if (handler == null) {
-                return;
-            }
-            try {
-                if (handler.handleWrite(channel)) {
-                    log.trace("Handled write with handler %s", handler);
-                    pending.decrementAndGet();
-                    outputQueue.remove();
-                } else {
-                    channel.resumeWrites();
-                    return;
-                }
-            } catch (Throwable t) {
-                pending.decrementAndGet();
-                outputQueue.remove();
-            }
-        }
-    }
-
-    public void handleClosed(final AllocatedMessageChannel channel) {
-    }
-
-    RequestHandlerSource getRemoteService(final int id) {
-        return new RequestHandlerSourceImpl(allocator, id);
-    }
-
-    private final class ReplyHandlerImpl implements ReplyHandler {
-
-        private final AllocatedMessageChannel channel;
-        private final int requestId;
-        private final BufferAllocator<ByteBuffer> allocator;
-
-        private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
-            if (channel == null) {
-                throw new NullPointerException("channel is null");
-            }
-            if (allocator == null) {
-                throw new NullPointerException("allocator is null");
-            }
-            this.channel = channel;
-            this.requestId = requestId;
-            this.allocator = allocator;
-        }
-
-        public void handleReply(final Object reply) {
-            ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.REPLY.getId());
-            buffer.putInt(requestId);
-            try {
-                final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
-                try {
-                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        marshaller.start(output);
-                        marshaller.writeObject(reply);
-                        marshaller.close();
-                        output.close();
-                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (IOException e) {
-                log.error(e, "Failed to send a reply to the remote side");
-            } catch (InterruptedException e) {
-                log.error(e, "Reply handler thread interrupted before a reply could be sent");
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        public void handleException(final String msg, final Throwable cause) {
-            ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.REQUEST_FAILED.getId());
-            buffer.putInt(requestId);
-            try {
-                final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
-                try {
-                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        marshaller.start(output);
-                        marshaller.writeObject(cause);
-                        marshaller.close();
-                        output.close();
-                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (IOException e) {
-                log.error(e, "Failed to send an exception to the remote side");
-            } catch (InterruptedException e) {
-                log.error(e, "Reply handler thread interrupted before an exception could be sent");
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        public void handleCancellation() {
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CANCEL_ACK.getId());
-            buffer.putInt(requestId);
-            buffer.flip();
-            try {
-                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-            } catch (InterruptedException e) {
-                // todo log
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    // Writer members
-
-    private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
-    private final AtomicInteger pending = new AtomicInteger();
-
-    private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
-        outputQueue.put(writeHandler);
-        if (pending.getAndIncrement() == 0) {
-            channel.resumeWrites();
-        }
-    }
-
-    private int writeUTFZ(ByteBuffer buffer, CharSequence s) {
-        final int len = s.length();
-        for (int i = 0; i < len; i++) {
-            char c = s.charAt(i);
-            if (1 <= c && c < 0x80) {
-                if (buffer.hasRemaining()) {
-                    buffer.put((byte) c);
-                } else {
-                    return i;
-                }
-            } else if (c < 0x0800) {
-                if (buffer.remaining() >= 2) {
-                    buffer.put((byte) (0xc0 | (c >> 6)));
-                    buffer.put((byte) (0x80 | (c & 0x3f)));
-                } else {
-                    return i;
-                }
-            } else {
-                if (buffer.remaining() >= 3) {
-                    buffer.put((byte) (0xe0 | (c >> 12)));
-                    buffer.put((byte) (0x80 | ((c >> 6) & 0x3f)));
-                    buffer.put((byte) (0x80 | (c & 0x3f)));
-                } else {
-                    return i;
-                }
-            }
-        }
-        if (buffer.hasRemaining()) {
-            buffer.put((byte) 0);
-            return -1;
-        } else {
-            return len;
-        }
-    }
-
-    // Reader utils
-
-    private String readUTFZ(ByteBuffer buffer) {
-        StringBuilder builder = new StringBuilder();
-        int state = 0, a = 0;
-        while (buffer.hasRemaining()) {
-            final int v = buffer.get() & 0xff;
-            switch (state) {
-                case 0: {
-                    if (v == 0) {
-                        return builder.toString();
-                    } else if (v < 128) {
-                        builder.append((char) v);
-                    } else if (192 <= v && v < 224) {
-                        a = v << 6;
-                        state = 1;
-                    } else if (224 <= v && v < 232) {
-                        a = v << 12;
-                        state = 2;
-                    } else {
-                        builder.append('?');
-                    }
-                    break;
-                }
-                case 1: {
-                    if (v == 0) {
-                        builder.append('?');
-                        return builder.toString();
-                    } else if (128 <= v && v < 192) {
-                        a |= v & 0x3f;
-                        builder.append((char) a);
-                    } else {
-                        builder.append('?');
-                    }
-                    state = 0;
-                    break;
-                }
-                case 2: {
-                    if (v == 0) {
-                        builder.append('?');
-                        return builder.toString();
-                    } else if (128 <= v && v < 192) {
-                        a |= (v & 0x3f) << 6;
-                        state = 1;
-                    } else {
-                        builder.append('?');
-                        state = 0;
-                    }
-                    break;
-                }
-                default:
-                    throw new IllegalStateException("wrong state");
-            }
-        }
-        return builder.toString();
-    }
-
-    // client endpoint
-
-    private final class RequestHandlerImpl extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
-
-        private final int identifier;
-        private final BufferAllocator<ByteBuffer> allocator;
-
-        public RequestHandlerImpl(final int identifier, final BufferAllocator<ByteBuffer> allocator) {
-            super(executor);
-            if (allocator == null) {
-                throw new NullPointerException("allocator is null");
-            }
-            this.identifier = identifier;
-            this.allocator = allocator;
-            addCloseHandler(new CloseHandler<RequestHandler>() {
-                public void handleClose(final RequestHandler closed) {
-                    remoteClients.remove(identifier, this);
-                    ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
-                    buffer.putInt(identifier);
-                    buffer.flip();
-                    try {
-                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    } catch (InterruptedException e) {
-                        log.warn("Client close notification was interrupted before it could be sent");
-                    }
-                }
-            });
-        }
-
-        public void receiveRequest(final Object request) {
-            log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
-            try {
-                final List<ByteBuffer> bufferList;
-                final Marshaller marshaller = marshallerFactory.createMarshaller();
-                try {
-                    bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        marshaller.write(MessageType.REQUEST_ONEWAY.getId());
-                        marshaller.writeInt(identifier);
-                        marshaller.writeObject(request);
-                        marshaller.close();
-                        output.close();
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-                try {
-                    registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                } catch (InterruptedException e) {
-                    log.trace(e, "receiveRequest was interrupted");
-                    Thread.currentThread().interrupt();
-                    return;
-                }
-            } catch (Throwable t) {
-                // ignore
-                log.trace(t, "receiveRequest failed with an exception");
-                return;
-            }
-        }
-
-        public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
-            log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
-            try {
-                final List<ByteBuffer> bufferList;
-                final Marshaller marshaller = marshallerFactory.createMarshaller();
-                try {
-                    bufferList = new ArrayList<ByteBuffer>();
-                    final ByteOutput output = createByteOutput(allocator, bufferList);
-                    try {
-                        marshaller.write(MessageType.REQUEST.getId());
-                        marshaller.writeInt(identifier);
-
-                        int id;
-                        do {
-                            id = requestSequence.getAndIncrement();
-                        } while (remoteRequests.putIfAbsent(id, handler) != null);
-                        marshaller.writeInt(id);
-                        marshaller.writeObject(request);
-                        marshaller.close();
-                        output.close();
-                        try {
-                            registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            executor.execute(new Runnable() {
-                                public void run() {
-                                    SpiUtils.safeHandleCancellation(handler);
-                                }
-                            });
-                            return SpiUtils.getBlankRemoteRequestContext();
-                        }
-                        log.trace("Sent request %s", request);
-                        return new RemoteRequestContextImpl(id, allocator, channel);
-                    } finally {
-                        IoUtils.safeClose(output);
-                    }
-                } finally {
-                    IoUtils.safeClose(marshaller);
-                }
-            } catch (final IOException t) {
-                log.trace(t, "receiveRequest failed with an exception");
-                executor.execute(new Runnable() {
-                    public void run() {
-                        SpiUtils.safeHandleException(handler, "Failed to build request", t);
-                    }
-                });
-                return SpiUtils.getBlankRemoteRequestContext();
-            }
-        }
-
-        public String toString() {
-            return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
-        }
-    }
-
-    public final class RemoteRequestContextImpl implements RemoteRequestContext {
-
-        private final BufferAllocator<ByteBuffer> allocator;
-        private final int id;
-        private final AllocatedMessageChannel channel;
-
-        public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
-            this.id = id;
-            this.allocator = allocator;
-            this.channel = channel;
-        }
-
-        public void cancel() {
-            try {
-                final ByteBuffer buffer = allocator.allocate();
-                buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
-                buffer.putInt(id);
-                buffer.flip();
-                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-            } catch (InterruptedException e) {
-                // todo log that cancel attempt failed
-                Thread.currentThread().interrupt();
-            } catch (Throwable t) {
-                // todo log that cancel attempt failed
-            }
-        }
-    }
-
-    public final class RequestHandlerSourceImpl extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
-
-        private final BufferAllocator<ByteBuffer> allocator;
-        private final int identifier;
-
-        protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer> allocator, final int identifier) {
-            super(executor);
-            this.allocator = allocator;
-            this.identifier = identifier;
-            addCloseHandler(new CloseHandler<RequestHandlerSource>() {
-                public void handleClose(final RequestHandlerSource closed) {
-                    ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
-                    buffer.putInt(identifier);
-                    buffer.flip();
-                    try {
-                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    } catch (InterruptedException e) {
-                        log.warn("Service close notification was interrupted before it could be sent");
-                    }
-                }
-            });
-        }
-
-        public Handle<RequestHandler> createRequestHandler() throws IOException {
-            int id;
-            do {
-                id = remoteClientSequence.getAndIncrement() << 1;
-            } while (remoteClients.putIfAbsent(id, new RequestHandlerImpl(id, BasicHandler.this.allocator)) != null);
-            final int clientId = id;
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CLIENT_OPEN.getId());
-            buffer.putInt(identifier);
-            buffer.putInt(clientId);
-            buffer.flip();
-            // todo - probably should bail out if we're interrupted?
-            boolean intr = false;
-            for (;;) {
-                try {
-                    registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
-                    try {
-                        return new RequestHandlerImpl(clientId, allocator).getHandle();
-                    } finally {
-                        if (intr) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-                } catch (InterruptedException e) {
-                    intr = true;
-                }
-            }
-        }
-
-        public String toString() {
-            return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
-        }
-    }
-
-    public static ByteInput createByteInput(final ByteBuffer buffer, final boolean eof) {
-        return new ByteInput() {
-            public int read() throws IOException {
-                if (buffer.hasRemaining()) {
-                    return buffer.get() & 0xff;
-                } else {
-                    return eof ? -1 : 0;
-                }
-            }
-
-            public int read(final byte[] b) throws IOException {
-                return read(b, 0, b.length);
-            }
-
-            public int read(final byte[] b, final int off, final int len) throws IOException {
-                int r = Math.min(buffer.remaining(), len);
-                if (r > 0) {
-                    buffer.get(b, off, r);
-                    return r;
-                } else {
-                    return eof ? -1 : 0;
-                }
-            }
-
-            public int available() throws IOException {
-                return buffer.remaining();
-            }
-
-            public long skip(final long n) throws IOException {
-                final int cnt = n > (long) Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n;
-                int r = Math.min(buffer.remaining(), cnt);
-                if (r > 0) {
-                    final int oldPos = buffer.position();
-                    final int newPos = oldPos + r;
-                    if (newPos < 0) {
-                        final int lim = buffer.limit();
-                        buffer.position(lim);
-                        return lim - oldPos;
-                    }
-                }
-                return r;
-            }
-
-            public void close() {
-            }
-        };
-    }
-
-    public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
-        return new ByteOutput() {
-            private ByteBuffer current;
-
-            private ByteBuffer getCurrent() {
-                final ByteBuffer buffer = current;
-                return buffer == null ? (current = allocator.allocate()) : buffer;
-            }
-
-            public void write(final int i) throws IOException {
-                final ByteBuffer buffer = getCurrent();
-                buffer.put((byte) i);
-                if (! buffer.hasRemaining()) {
-                    buffer.flip();
-                    target.add(buffer);
-                    current = null;
-                }
-            }
-
-            public void write(final byte[] bytes) throws IOException {
-                write(bytes, 0, bytes.length);
-            }
-
-            public void write(final byte[] bytes, int offs, int len) throws IOException {
-                while (len > 0) {
-                    final ByteBuffer buffer = getCurrent();
-                    final int c = Math.min(len, buffer.remaining());
-                    buffer.put(bytes, offs, c);
-                    offs += c;
-                    len -= c;
-                    if (! buffer.hasRemaining()) {
-                        buffer.flip();
-                        target.add(buffer);
-                        current = null;
-                    }
-                }
-            }
-
-            public void close() throws IOException {
-                flush();
-            }
-
-            public void flush() throws IOException {
-                final ByteBuffer buffer = current;
-                if (buffer != null) {
-                    buffer.flip();
-                    target.add(buffer);
-                    current = null;
-                }
-            }
-        };
-    }
-}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/BasicProtocol.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,96 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.RemotingException;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.remoting.spi.remote.RequestHandlerSource;
-import org.jboss.remoting.spi.remote.Handle;
-import org.jboss.xnio.IoHandlerFactory;
-import org.jboss.xnio.ChannelSource;
-import org.jboss.xnio.IoFuture;
-import org.jboss.xnio.AbstractConvertingIoFuture;
-import org.jboss.xnio.IoHandler;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public final class BasicProtocol {
-
-    private static final Logger log = Logger.getLogger(BasicProtocol.class);
-
-    private BasicProtocol() {
-    }
-
-    /**
-     * Create a request server for the basic protocol.
-     *
-     * @param executor the executor to use for invocations
-     * @param allocator the buffer allocator to use
-     * @return a handler factory for passing to an XNIO server
-     */
-    public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final BufferAllocator<ByteBuffer> allocator) {
-        return new IoHandlerFactory<AllocatedMessageChannel>() {
-            public IoHandler<? super AllocatedMessageChannel> createHandler() {
-                final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
-                configuration.setAllocator(allocator);
-                configuration.setExecutor(executor);
-                // todo marshaller factory... etc
-                return new BasicHandler(configuration);
-            }
-        };
-    }
-
-    /**
-     * Create a request client for the basic protocol.
-     *
-     * @param executor the executor to use for invocations
-     * @param channelSource the XNIO channel source to use to establish the connection
-     * @param allocator the buffer allocator to use
-     * @return a handle which may be used to close the connection
-     * @throws IOException if an error occurs
-     */
-    public static IoFuture<SimpleCloseable> connect(final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
-        final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
-        configuration.setAllocator(allocator);
-        configuration.setExecutor(executor);
-        // todo marshaller factory... etc
-        final BasicHandler basicHandler = new BasicHandler(configuration);
-        final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
-        return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
-            protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
-                return new AbstractConnection(executor) {
-                    public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
-                        return basicHandler.getRemoteService(id).getHandle();
-                    }
-                };
-            }
-        };
-    }
-}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConfigValue.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,67 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-/**
- *
- */
-public enum ConfigValue {
-
-    /**
-     * The protocol version to use.  Value type is {@code int}.
-     */
-    PROTOCOL_VERSION(0),
-    /**
-     * The name of the marshaller to use.  Value type is {@code String}.
-     */
-    MARSHALLER_NAME(1),
-    ;
-    private final int id;
-
-    private ConfigValue(final int id) {
-        this.id = id;
-    }
-
-    /**
-     * Get the integer ID for this config value.
-     *
-     * @return the integer ID
-     */
-    public int getId() {
-        return id;
-    }
-
-    /**
-     * Get the config value for an integer ID.
-     *
-     * @param id the integer ID
-     * @return the config value instance
-     */
-    public static ConfigValue getConfigValue(final int id) {
-        switch (id) {
-            case 0: return PROTOCOL_VERSION;
-            case 1: return MARSHALLER_NAME;
-            default: throw new IllegalArgumentException("Invalid config value ID");
-        }
-    }
-}

Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConfigValue.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConfigValue.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+/**
+ * Configuration values, each identified by an integer ID (see {@link #getConfigValue(int)}).
+ */
+public enum ConfigValue {
+
+    /**
+     * The protocol version to use.  Value type is {@code int}.
+     */
+    PROTOCOL_VERSION(0),
+    /**
+     * The name of the marshaller to use.  Value type is {@code String}.
+     */
+    MARSHALLER_NAME(1),
+    ;
+    private final int id;
+
+    private ConfigValue(final int id) {
+        this.id = id;
+    }
+
+    /**
+     * Get the integer ID for this config value.
+     *
+     * @return the integer ID
+     */
+    public int getId() {
+        return id;
+    }
+
+    /**
+     * Get the config value for an integer ID.
+     *
+     * @param id the integer ID
+     * @return the config value instance
+     */
+    public static ConfigValue getConfigValue(final int id) {
+        switch (id) {
+            case 0: return PROTOCOL_VERSION;
+            case 1: return MARSHALLER_NAME;
+            default: throw new IllegalArgumentException("Invalid config value ID");
+        }
+    }
+}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConnectionListener.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.remoting.SimpleCloseable;
-
-/**
- *
- */
-public interface ConnectionListener {
-    void handleOpened(SimpleCloseable connection);
-}

Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/ConnectionListener.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/ConnectionListener.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.SimpleCloseable;
+
+/**
+ * A listener which is handed a connection; presumably called once it has been opened (confirm at the call site).
+ */
+public interface ConnectionListener {
+    void handleOpened(SimpleCloseable connection);
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,209 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.util.Arrays;
+
+/**
+ * A map from object keys, compared by reference identity, to primitive {@code int} values.
+ * <p>
+ * The table is open-addressed with linear probing and always has a power-of-two length; it is doubled once the
+ * number of entries exceeds the load-factor threshold.  Entries cannot be removed individually, only all at once
+ * via {@link #clear()}.  This class performs no synchronization.
+ *
+ * @param <T> the key type
+ */
+public final class IdentityIntMap<T> {
+
+    /** The largest supported table length. */
+    private static final int MAX_CAPACITY = 0x40000000;
+
+    private int[] values;
+    private Object[] keys;
+    private int count;
+    private int resizeCount;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param initialCapacity the requested capacity; raised to at least 16 and rounded up to a power of two
+     * @param loadFactor the fraction of the table that may be occupied before the table is doubled
+     * @throws IllegalArgumentException if {@code initialCapacity} is less than 1 or greater than 2^30, or if
+     *      {@code loadFactor} is not strictly between 0.0 and 1.0
+     */
+    public IdentityIntMap(int initialCapacity, final float loadFactor) {
+        if (initialCapacity < 1) {
+            throw new IllegalArgumentException("initialCapacity must be > 0");
+        }
+        if (initialCapacity > MAX_CAPACITY) {
+            // rounding such a value up to a power of two would overflow into a negative array length
+            throw new IllegalArgumentException("initialCapacity must be <= " + MAX_CAPACITY);
+        }
+        // written as a negated conjunction so that NaN is rejected as well
+        if (! (loadFactor > 0.0f && loadFactor < 1.0f)) {
+            throw new IllegalArgumentException("loadFactor must be > 0.0 and < 1.0");
+        }
+        if (initialCapacity < 16) {
+            initialCapacity = 16;
+        } else {
+            // round up to the next power of two
+            final int c = Integer.highestOneBit(initialCapacity) - 1;
+            initialCapacity = Integer.highestOneBit(initialCapacity + c);
+        }
+        keys = new Object[initialCapacity];
+        values = new int[initialCapacity];
+        // keep the threshold at 1 or more: resize() doubles it, and a zero threshold could never grow
+        resizeCount = Math.max(1, (int) ((double) initialCapacity * (double) loadFactor));
+    }
+
+    /**
+     * Construct a new instance with an initial capacity of 64.
+     *
+     * @param loadFactor the fraction of the table that may be occupied before the table is doubled
+     */
+    public IdentityIntMap(final float loadFactor) {
+        this(64, loadFactor);
+    }
+
+    /**
+     * Construct a new instance with a load factor of 0.5.
+     *
+     * @param initialCapacity the requested capacity; raised to at least 16 and rounded up to a power of two
+     */
+    public IdentityIntMap(final int initialCapacity) {
+        this(initialCapacity, 0.5f);
+    }
+
+    /**
+     * Construct a new instance with an initial capacity of 64 and a load factor of 0.5.
+     */
+    public IdentityIntMap() {
+        this(0.5f);
+    }
+
+    /**
+     * Get the value stored for a key.
+     *
+     * @param key the key, compared by reference
+     * @param defVal the value to return if the key is not present
+     * @return the stored value, or {@code defVal} if there is none
+     * @throws NullPointerException if {@code key} is {@code null}
+     */
+    public int get(T key, int defVal) {
+        if (key == null) {
+            throw new NullPointerException("key is null");
+        }
+        final Object[] keys = this.keys;
+        final int mask = keys.length - 1;
+        int idx = System.identityHashCode(key) & mask;
+        for (;;) {
+            final Object v = keys[idx];
+            if (v == key) {
+                return values[idx];
+            }
+            if (v == null) {
+                // an empty slot ends the probe chain: not found
+                return defVal;
+            }
+            idx = (idx + 1) & mask;
+        }
+    }
+
+    /**
+     * Store a value for a key, replacing any value previously stored for it.
+     *
+     * @param key the key, compared by reference
+     * @param value the value to store
+     * @throws NullPointerException if {@code key} is {@code null}
+     * @throws IllegalStateException if the table has to grow but is already at its maximum size
+     */
+    public void put(T key, int value) {
+        if (key == null) {
+            throw new NullPointerException("key is null");
+        }
+        final Object[] keys = this.keys;
+        final int mask = keys.length - 1;
+        final int[] values = this.values;
+        int idx = System.identityHashCode(key) & mask;
+        for (;;) {
+            final Object v = keys[idx];
+            if (v == null) {
+                keys[idx] = key;
+                values[idx] = value;
+                if (++count > resizeCount) {
+                    resize();
+                }
+                return;
+            }
+            if (v == key) {
+                values[idx] = value;
+                return;
+            }
+            // advance exactly one slot, as get() and resize() do; skipping slots here would leave the
+            // entry somewhere a lookup stops short of
+            idx = (idx + 1) & mask;
+        }
+    }
+
+    /**
+     * Double the table length and reinsert every entry.
+     *
+     * @throws IllegalStateException if the table is already at its maximum size
+     */
+    private void resize() {
+        final Object[] oldKeys = keys;
+        final int oldsize = oldKeys.length;
+        final int[] oldValues = values;
+        if (oldsize >= MAX_CAPACITY) {
+            throw new IllegalStateException("Table full");
+        }
+        final int newsize = oldsize << 1;
+        final int mask = newsize - 1;
+        final Object[] newKeys = new Object[newsize];
+        final int[] newValues = new int[newsize];
+        keys = newKeys;
+        values = newValues;
+        // the threshold is at least 1 and below the old length, so doubling it cannot overflow
+        resizeCount <<= 1;
+        for (int oi = 0; oi < oldsize; oi ++) {
+            final Object key = oldKeys[oi];
+            if (key != null) {
+                int ni = System.identityHashCode(key) & mask;
+                // linear probe for the first free slot in the new table
+                while (newKeys[ni] != null) {
+                    ni = (ni + 1) & mask;
+                }
+                newKeys[ni] = key;
+                newValues[ni] = oldValues[oi];
+            }
+        }
+    }
+
+    /**
+     * Remove all entries.  The table keeps its current length.
+     */
+    public void clear() {
+        Arrays.fill(keys, null);
+        count = 0;
+    }
+}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/MessageType.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,91 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-/**
- * The type of a protocol message.
- */
-public enum MessageType {
-
-    // One-way request, no return value may be sent
-    REQUEST_ONEWAY(1),
-    // Two-way request, return value is expected
-    REQUEST(2),
-    // Reply
-    REPLY(3),
-    // Attempt to cancel a request
-    CANCEL_REQUEST(4),
-    // Acknowledge that a request was cancelled
-    CANCEL_ACK(5),
-    // Request failed due to protocol or unmarshalling problem
-    REQUEST_RECEIVE_FAILED(6),
-    // Request failed due to exception
-    REQUEST_FAILED(7),
-    // Request completed but no reply or exception was sent
-    REQUEST_OUTCOME_UNKNOWN(8),
-    // Remote side called .close() on a forwarded RequestHandler
-    CLIENT_CLOSE(9),
-    // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
-    CLIENT_OPEN(10),
-    // Remote side called .close() on a forwarded RequestHandlerSource
-    SERVICE_CLOSE(11),
-    // Remote side brought a new service online
-    SERVICE_ADVERTISE(12),
-    // Remote side's service is no longer available
-    SERVICE_UNADVERTISE(13),
-    ;
-    private final int id;
-
-    private MessageType(int id) {
-        this.id = id;
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    /**
-     * Get the message type for an integer ID.
-     *
-     * @param id the integer ID
-     * @return the message type instance
-     */
-    public static MessageType getMessageType(final int id) {
-        switch (id) {
-            case 1: return REQUEST_ONEWAY;
-            case 2: return REQUEST;
-            case 3: return REPLY;
-            case 4: return CANCEL_REQUEST;
-            case 5: return CANCEL_ACK;
-            case 6: return REQUEST_RECEIVE_FAILED;
-            case 7: return REQUEST_FAILED;
-            case 8: return REQUEST_OUTCOME_UNKNOWN;
-            case 9: return CLIENT_CLOSE;
-            case 10: return CLIENT_OPEN;
-            case 11: return SERVICE_CLOSE;
-            case 12: return SERVICE_ADVERTISE;
-            case 13: return SERVICE_UNADVERTISE;
-            default: throw new IllegalArgumentException("Invalid message type ID");
-        }
-    }
-}

Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/MessageType.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+/**
+ * The type of a protocol message.
+ */
+public enum MessageType {
+
+    // One-way request, no return value may be sent
+    REQUEST_ONEWAY(1),
+    // Two-way request, return value is expected
+    REQUEST(2),
+    // Reply
+    REPLY(3),
+    // Attempt to cancel a request
+    CANCEL_REQUEST(4),
+    // Acknowledge that a request was cancelled
+    CANCEL_ACK(5),
+    // Request failed due to protocol or unmarshalling problem
+    REQUEST_RECEIVE_FAILED(6),
+    // Request failed due to exception
+    REQUEST_FAILED(7),
+    // Request completed but no reply or exception was sent
+    REQUEST_OUTCOME_UNKNOWN(8),
+    // Remote side called .close() on a forwarded RequestHandler
+    CLIENT_CLOSE(9),
+    // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
+    CLIENT_OPEN(10),
+    // Remote side called .close() on a forwarded RequestHandlerSource
+    SERVICE_CLOSE(11),
+    // Remote side brought a new service online
+    SERVICE_ADVERTISE(12),
+    // Remote side's service is no longer available
+    SERVICE_UNADVERTISE(13),
+    ;
+    private final int id;
+
+    private MessageType(int id) {
+        this.id = id;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    /**
+     * Get the message type for an integer ID.
+     *
+     * @param id the integer ID
+     * @return the message type instance
+     */
+    public static MessageType getMessageType(final int id) {
+        switch (id) {
+            case 1: return REQUEST_ONEWAY;
+            case 2: return REQUEST;
+            case 3: return REPLY;
+            case 4: return CANCEL_REQUEST;
+            case 5: return CANCEL_ACK;
+            case 6: return REQUEST_RECEIVE_FAILED;
+            case 7: return REQUEST_FAILED;
+            case 8: return REQUEST_OUTCOME_UNKNOWN;
+            case 9: return CLIENT_CLOSE;
+            case 10: return CLIENT_OPEN;
+            case 11: return SERVICE_CLOSE;
+            case 12: return SERVICE_ADVERTISE;
+            case 13: return SERVICE_UNADVERTISE;
+            default: throw new IllegalArgumentException("Invalid message type ID");
+        }
+    }
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,907 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.Connector;
+import org.jboss.xnio.Acceptor;
+import org.jboss.xnio.log.Logger;
+import org.jboss.remoting.spi.remote.RequestHandler;
+import org.jboss.remoting.spi.remote.RequestHandlerSource;
+import org.jboss.remoting.spi.remote.ReplyHandler;
+import org.jboss.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.remoting.spi.SpiUtils;
+import org.jboss.remoting.spi.AbstractAutoCloseable;
+import org.jboss.remoting.spi.stream.StreamDetector;
+import org.jboss.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.remoting.spi.stream.StreamProvider;
+import static org.jboss.remoting.util.CollectionUtil.concurrentIntegerMap;
+import org.jboss.remoting.util.CollectionUtil;
+import org.jboss.remoting.util.ConcurrentIntegerMap;
+import org.jboss.remoting.CloseHandler;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.RemoteExecutionException;
+import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.ObjectTable;
+import org.jboss.marshalling.Marshalling;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.nio.ByteBuffer;
+import java.nio.BufferUnderflowException;
+import java.io.IOException;
+import java.io.InvalidClassException;
+
+/**
+ * Protocol handler for the basic message-oriented Remoting protocol.
+ *
+ * @param <A> stream channel address type (Void if streams are not supported)
+ */
+public final class MultiplexHandler<A> implements IoHandler<AllocatedMessageChannel> {
+
+    private static final Logger log = Logger.getLogger(MultiplexHandler.class);
+
+    //--== Connection configuration items ==--
+    private final MarshallerFactory marshallerFactory;
+    private final Configuration marshallingConfiguration;
+    private final int linkMetric;
+    private final Executor executor;
+    // buffer allocator for outbound message assembly
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final StreamDetector streamDetector;
+    private final Connector<A, AllocatedMessageChannel> messageConnector;
+    private final Acceptor<A, AllocatedMessageChannel> messageAcceptor;
+    private final Connector<A, StreamChannel> streamConnector;
+    private final Acceptor<A, StreamChannel> streamAcceptor;
+
+    // running on remote node
+    private final ConcurrentIntegerMap<ReplyHandler> remoteRequests = concurrentIntegerMap();
+    // running on local node
+    private final ConcurrentIntegerMap<RemoteRequestContext> localRequests = concurrentIntegerMap();
+    // sequence for remote requests
+    private final AtomicInteger requestSequence = new AtomicInteger();
+
+    // clients whose requests get forwarded to the remote side
+    // even #s were opened from services forwarded to us (our sequence)
+    // odd #s were forwarded directly to us (remote sequence)
+    private final ConcurrentIntegerMap<RequestHandler> remoteClients = concurrentIntegerMap();
+    // forwarded to remote side (handled on this side)
+    private final ConcurrentIntegerMap<Handle<RequestHandler>> forwardedClients = concurrentIntegerMap();
+    // sequence for forwarded clients (unsigned; shift left one bit, add one)
+    private final AtomicInteger forwardedClientSequence = new AtomicInteger();
+    // sequence for clients created from services forwarded to us (unsigned; shift left one bit)
+    private final AtomicInteger remoteClientSequence = new AtomicInteger();
+
+    // services forwarded to us
+    private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices = concurrentIntegerMap();
+    // forwarded to remote side (handled on this side)
+    private final ConcurrentIntegerMap<Handle<RequestHandlerSource>> forwardedServices = concurrentIntegerMap();
+    // sequence for forwarded services
+    private final AtomicInteger serviceSequence = new AtomicInteger();
+
+    private final Endpoint endpoint;
+
+    private volatile AllocatedMessageChannel channel;
+
+    /**
+     * Construct a new handler, capturing the connection settings from the given configuration and the
+     * connectors and acceptors from the given stream provider.
+     *
+     * @param endpoint the endpoint this handler belongs to
+     * @param configuration the source of the allocator, executor, marshalling settings, link metric and stream detector
+     * @param streamProvider the source of the message and stream channel connectors and acceptors
+     */
+    public MultiplexHandler(final Endpoint endpoint, final RemotingChannelConfiguration configuration, final StreamProvider<A> streamProvider) {
+        this.endpoint = endpoint;
+        // channel factories supplied by the stream provider
+        this.messageConnector = streamProvider.getMessageChannelConnector();
+        this.messageAcceptor = streamProvider.getMessageChannelAcceptor();
+        this.streamConnector = streamProvider.getStreamChannelConnector();
+        this.streamAcceptor = streamProvider.getStreamChannelAcceptor();
+        // per-connection settings supplied by the configuration
+        this.allocator = configuration.getAllocator();
+        this.executor = configuration.getExecutor();
+        this.marshallerFactory = configuration.getMarshallerFactory();
+        this.marshallingConfiguration = configuration.getMarshallingConfiguration();
+        this.linkMetric = configuration.getLinkMetric();
+        this.streamDetector = configuration.getStreamDetector();
+    }
+
+    /**
+     * Handle the opening of the protocol channel by calling {@code resumeReads()} on it.
+     *
+     * @param channel the channel that was opened
+     */
+    public void handleOpened(final AllocatedMessageChannel channel) {
+        // NOTE(review): the volatile "channel" field of this handler is not assigned here - confirm it is set elsewhere
+        channel.resumeReads();
+    }
+
+    /**
+     * Handle readability of the protocol channel.  Messages are received and dispatched by message type
+     * in a loop.  The loop ends when an empty buffer is received (reads are resumed and the method returns),
+     * or when a {@code null} buffer is received or an I/O error occurs (the channel is closed).  A message that
+     * is malformed or refers to an unknown ID is logged and skipped.
+     *
+     * @param channel the channel to read protocol messages from
+     */
+    public void handleReadable(final AllocatedMessageChannel channel) {
+        for (;;) try {
+            final ByteBuffer buffer;
+            try {
+                buffer = channel.receive();
+            } catch (IOException e) {
+                log.error(e, "I/O error in protocol channel; closing channel");
+                IoUtils.safeClose(channel);
+                return;
+            }
+            if (buffer == null) {
+                // todo release all handles...
+                // todo what if the write queue is not empty?
+                IoUtils.safeClose(channel);
+                return;
+            }
+            if (! buffer.hasRemaining()) {
+                // would block
+                channel.resumeReads();
+                return;
+            }
+            final MessageType msgType;
+            try {
+                msgType = MessageType.getMessageType(buffer.get() & 0xff);
+            } catch (IllegalArgumentException ex) {
+                log.trace("Received invalid message type");
+                // skip this message and keep reading; returning here would exit without calling resumeReads()
+                continue;
+            }
+            log.trace("Received message %s, type %s", buffer, msgType);
+            switch (msgType) {
+                case REQUEST_ONEWAY: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandler> handle = forwardedClients.get(clientId);
+                    if (handle == null) {
+                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                        // skip this message only (same as the REQUEST case) rather than abandoning the read loop
+                        break;
+                    }
+                    final Object payload;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            try {
+                                payload = unmarshaller.readObject();
+                                unmarshaller.finish();
+                            } catch (ClassNotFoundException e) {
+                                log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        log.error(ex, "Failed to unmarshal a one-way request");
+                        break;
+                    }
+                    final RequestHandler requestHandler = handle.getResource();
+                    try {
+                        requestHandler.receiveRequest(payload);
+                    } catch (Throwable t) {
+                        log.error(t, "One-way request handler unexpectedly threw an exception");
+                    }
+                    break;
+                }
+                case REQUEST: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandler> handle = forwardedClients.get(clientId);
+                    if (handle == null) {
+                        log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    final int requestId = buffer.getInt();
+                    final Object payload;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            try {
+                                payload = unmarshaller.readObject();
+                                unmarshaller.finish();
+                            } catch (ClassNotFoundException e) {
+                                log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+                                // todo - send request receive failed message
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
+                        // todo send a request failure message
+                        break;
+                    }
+                    final RequestHandler requestHandler = handle.getResource();
+                    // NOTE(review): the result of receiveRequest (if any) is discarded and nothing is added to
+                    // localRequests in this method, so CANCEL_REQUEST cannot find this request - confirm intended
+                    requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
+                    break;
+                }
+                case REPLY: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final Object payload;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            try {
+                                payload = unmarshaller.readObject();
+                                unmarshaller.finish();
+                            } catch (ClassNotFoundException e) {
+                                // use the safe variant, as everywhere else in this method, so a misbehaving handler cannot abort the read loop
+                                SpiUtils.safeHandleException(replyHandler, new InvalidClassException("Class not found: " + e.toString()));
+                                log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        // the format string has a %s placeholder, so the exception must be supplied as its argument
+                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
+                        // todo
+                        SpiUtils.safeHandleException(replyHandler, ex);
+                        break;
+                    }
+                    SpiUtils.safeHandleReply(replyHandler, payload);
+                    break;
+                }
+                case CANCEL_REQUEST: {
+                    final int requestId = buffer.getInt();
+                    final RemoteRequestContext context = localRequests.get(requestId);
+                    if (context != null) {
+                        context.cancel();
+                    }
+                    break;
+                }
+                case CANCEL_ACK: {
+                    final int requestId = buffer.getInt();
+                    // NOTE(review): the handler is looked up but not removed from remoteRequests - confirm intended
+                    final ReplyHandler replyHandler = remoteRequests.get(requestId);
+                    if (replyHandler != null) {
+                        replyHandler.handleCancellation();
+                    }
+                    break;
+                }
+                case REQUEST_RECEIVE_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final String reason = readUTFZ(buffer);
+                    // the handler has been removed from the map, so it must be told about the failure here
+                    // todo - throw a new ReplyException
+                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote side failed to receive the request: " + reason));
+                    break;
+                }
+                case REQUEST_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final Throwable cause;
+                    try {
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+                        try {
+                            unmarshaller.start(Marshalling.createByteInput(buffer));
+                            try {
+                                cause = (Throwable) unmarshaller.readObject();
+                            } catch (ClassNotFoundException e) {
+                                // use the safe variant, as everywhere else in this method, so a misbehaving handler cannot abort the read loop
+                                SpiUtils.safeHandleException(replyHandler, new InvalidClassException("Class not found: " + e.toString()));
+                                log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                                break;
+                            } catch (ClassCastException e) {
+                                SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to unmarshal the cause)"));
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        // the format string has a %s placeholder, so the exception must be supplied as its argument
+                        log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception", ex);
+                        SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote request failed (and an unexpected I/O error occurred when attempting to read the cause)"));
+                        break;
+                    }
+                    SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote execution failed", cause));
+                    break;
+                }
+                case REQUEST_OUTCOME_UNKNOWN: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
+                        break;
+                    }
+                    final String reason = readUTFZ(buffer);
+                    SpiUtils.safeHandleException(replyHandler, new IndeterminateOutcomeException(reason));
+                    break;
+                }
+                case CLIENT_CLOSE: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandler> handle = forwardedClients.remove(clientId);
+                    if (handle == null) {
+                        log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
+                    break;
+                }
+                case CLIENT_OPEN: {
+                    final int serviceId = buffer.getInt();
+                    final int clientId = buffer.getInt();
+                    final Handle<RequestHandlerSource> handle = forwardedServices.get(serviceId);
+                    if (handle == null) {
+                        log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    try {
+                        final RequestHandlerSource requestHandlerSource = handle.getResource();
+                        final Handle<RequestHandler> clientHandle = requestHandlerSource.createRequestHandler();
+                        // todo check for duplicate
+                        // todo validate the client ID
+                        log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
+                        forwardedClients.put(clientId, clientHandle);
+                    } catch (IOException ex) {
+                        log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
+                        break;
+                    } finally {
+                        // NOTE(review): this closes a handle that is still registered in forwardedServices -
+                        // confirm that closing it after every client open is intended
+                        IoUtils.safeClose(handle);
+                    }
+                    break;
+                }
+                case SERVICE_CLOSE: {
+                    final Handle<RequestHandlerSource> handle = forwardedServices.remove(buffer.getInt());
+                    if (handle == null) {
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
+                    break;
+                }
+                case SERVICE_ADVERTISE: {
+                    final int serviceId = buffer.getInt();
+                    final String serviceType = readUTFZ(buffer);
+                    final String groupName = readUTFZ(buffer);
+                    final String endpointName = readUTFZ(buffer);
+                    final int baseMetric = buffer.getInt();
+                    // NOTE(review): the handler source is created with ID -1 rather than the received serviceId,
+                    // and nothing is added to remoteServices here - confirm intended
+                    int id = -1;
+                    final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, id);
+                    final int calcMetric = baseMetric + linkMetric;
+                    if (calcMetric > 0) {
+                        try {
+                            final SimpleCloseable closeable = endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
+                            // todo - something with that closeable
+                        } catch (IOException e) {
+                            log.error(e, "Unable to register remote service");
+                        }
+                    }
+                    break;
+                }
+                case SERVICE_UNADVERTISE: {
+                    final int serviceId = buffer.getInt();
+                    IoUtils.safeClose(remoteServices.get(serviceId));
+                    break;
+                }
+                default: {
+                    log.error("Malformed packet received (invalid message type %s)", msgType);
+                }
+            }
+        } catch (BufferUnderflowException e) {
+            // the message was shorter than its type requires; drop it and continue with the next one
+            log.error("Malformed packet received (buffer underflow)");
+        }
+    }
+
+    /**
+     * Handle writability of the protocol channel by running the queued write handlers in order.
+     * A handler is removed from the queue once its {@code handleWrite} call returns {@code true} or throws;
+     * if it returns {@code false}, writes are resumed on the channel and the handler stays at the head of the queue.
+     *
+     * @param channel the channel to write to
+     */
+    public void handleWritable(final AllocatedMessageChannel channel) {
+        for (;;) {
+            final WriteHandler handler = outputQueue.peek();
+            if (handler == null) {
+                return;
+            }
+            try {
+                if (handler.handleWrite(channel)) {
+                    log.trace("Handled write with handler %s", handler);
+                    pending.decrementAndGet();
+                    outputQueue.remove();
+                } else {
+                    channel.resumeWrites();
+                    return;
+                }
+            } catch (Throwable t) {
+                // the failed handler is dropped so the queue can make progress; log it so the failure is not silent
+                log.error(t, "Write handler %s failed; discarding it", handler);
+                pending.decrementAndGet();
+                outputQueue.remove();
+            }
+        }
+    }
+
+    /**
+     * Handle the closing of the protocol channel.  Currently nothing is done here.
+     *
+     * @param channel the channel that was closed
+     */
+    public void handleClosed(final AllocatedMessageChannel channel) {
+    }
+
+    /**
+     * Create a new request handler source for the given service ID, using this handler's buffer allocator.
+     * A new instance is created on every call.
+     *
+     * @param id the service ID
+     * @return the new request handler source
+     */
+    RequestHandlerSource getRemoteService(final int id) {
+        return new RequestHandlerSourceImpl(allocator, id);
+    }
+
+    /**
+     * Reply handler that queues the outcome of a request (reply, exception or cancellation acknowledgement)
+     * for writing to the protocol channel.
+     */
+    private final class ReplyHandlerImpl implements ReplyHandler {
+
+        private final AllocatedMessageChannel channel;
+        private final int requestId;
+        private final BufferAllocator<ByteBuffer> allocator;
+
+        /**
+         * Construct a new instance.
+         *
+         * @param channel the channel to send the outcome on (must not be {@code null})
+         * @param requestId the ID of the request being answered
+         * @param allocator the allocator for outbound buffers (must not be {@code null})
+         * @throws NullPointerException if {@code channel} or {@code allocator} is {@code null}
+         */
+        private ReplyHandlerImpl(final AllocatedMessageChannel channel, final int requestId, final BufferAllocator<ByteBuffer> allocator) {
+            if (channel == null) {
+                throw new NullPointerException("channel is null");
+            }
+            if (allocator == null) {
+                throw new NullPointerException("allocator is null");
+            }
+            this.channel = channel;
+            this.requestId = requestId;
+            this.allocator = allocator;
+        }
+
+        /**
+         * Marshal the reply and queue it for writing.  Failures are logged rather than thrown.
+         *
+         * @param reply the reply object to send
+         */
+        public void handleReply(final Object reply) {
+            // NOTE(review): this header buffer is filled but never flipped or passed on in this method; only the
+            // buffers collected in bufferList reach the write handler - confirm how the header is meant to be sent
+            ByteBuffer buffer = allocator.allocate();
+            buffer.put((byte) MessageType.REPLY.getId());
+            buffer.putInt(requestId);
+            try {
+                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+                try {
+                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                    final ByteOutput output = createByteOutput(allocator, bufferList);
+                    try {
+                        marshaller.start(output);
+                        marshaller.writeObject(reply);
+                        marshaller.close();
+                        output.close();
+                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                    } finally {
+                        IoUtils.safeClose(output);
+                    }
+                } finally {
+                    IoUtils.safeClose(marshaller);
+                }
+            } catch (IOException e) {
+                log.error(e, "Failed to send a reply to the remote side");
+            } catch (InterruptedException e) {
+                log.error(e, "Reply handler thread interrupted before a reply could be sent");
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Marshal the exception and queue it for writing.  Failures are logged rather than thrown.
+         *
+         * @param exception the exception to send
+         */
+        public void handleException(final IOException exception) {
+            // NOTE(review): this header buffer is filled but never flipped or passed on in this method; only the
+            // buffers collected in bufferList reach the write handler - confirm how the header is meant to be sent
+            ByteBuffer buffer = allocator.allocate();
+            buffer.put((byte) MessageType.REQUEST_FAILED.getId());
+            buffer.putInt(requestId);
+            try {
+                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+                try {
+                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                    final ByteOutput output = createByteOutput(allocator, bufferList);
+                    try {
+                        marshaller.start(output);
+                        marshaller.writeObject(exception);
+                        marshaller.close();
+                        output.close();
+                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                    } finally {
+                        IoUtils.safeClose(output);
+                    }
+                } finally {
+                    IoUtils.safeClose(marshaller);
+                }
+            } catch (IOException e) {
+                log.error(e, "Failed to send an exception to the remote side");
+            } catch (InterruptedException e) {
+                log.error(e, "Reply handler thread interrupted before an exception could be sent");
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Queue a cancellation acknowledgement (message type and request ID) for writing.
+         */
+        public void handleCancellation() {
+            final ByteBuffer buffer = allocator.allocate();
+            buffer.put((byte) MessageType.CANCEL_ACK.getId());
+            buffer.putInt(requestId);
+            buffer.flip();
+            try {
+                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+            } catch (InterruptedException e) {
+                // log like the sibling methods do, then restore the interrupt status for the caller
+                log.error(e, "Reply handler thread interrupted before a cancellation acknowledgement could be sent");
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    // Writer members
+
+    private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
+    private final AtomicInteger pending = new AtomicInteger();
+
+    private void registerWriter(final AllocatedMessageChannel channel, final WriteHandler writeHandler) throws InterruptedException {
+        outputQueue.put(writeHandler);
+        if (pending.getAndIncrement() == 0) {
+            channel.resumeWrites();
+        }
+    }
+
+    // Reader utils
+
+    /**
+     * Read a zero-terminated string made of one-, two- and three-byte UTF-8 sequences from a buffer.
+     * Reading stops after the terminating zero byte, or at the end of the buffer if there is no terminator.
+     * A byte that does not fit the expected sequence is replaced with {@code '?'}.
+     *
+     * @param buffer the buffer to read from; its position is advanced past the bytes consumed
+     * @return the decoded string, without the terminator
+     */
+    private String readUTFZ(ByteBuffer buffer) {
+        StringBuilder builder = new StringBuilder();
+        // state = number of continuation bytes still expected; a = character being assembled
+        int state = 0, a = 0;
+        while (buffer.hasRemaining()) {
+            final int v = buffer.get() & 0xff;
+            switch (state) {
+                case 0: {
+                    if (v == 0) {
+                        return builder.toString();
+                    } else if (v < 128) {
+                        builder.append((char) v);
+                    } else if (192 <= v && v < 224) {
+                        // two-byte lead (110xxxxx): keep only the five payload bits, not the marker bits
+                        a = (v & 0x1f) << 6;
+                        state = 1;
+                    } else if (224 <= v && v < 240) {
+                        // three-byte lead (1110xxxx): covers the whole 0xE0..0xEF range; keep the four payload bits
+                        a = (v & 0x0f) << 12;
+                        state = 2;
+                    } else {
+                        builder.append('?');
+                    }
+                    break;
+                }
+                case 1: {
+                    if (v == 0) {
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (128 <= v && v < 192) {
+                        a |= v & 0x3f;
+                        builder.append((char) a);
+                    } else {
+                        builder.append('?');
+                    }
+                    state = 0;
+                    break;
+                }
+                case 2: {
+                    if (v == 0) {
+                        builder.append('?');
+                        return builder.toString();
+                    } else if (128 <= v && v < 192) {
+                        a |= (v & 0x3f) << 6;
+                        state = 1;
+                    } else {
+                        builder.append('?');
+                        state = 0;
+                    }
+                    break;
+                }
+                default:
+                    throw new IllegalStateException("wrong state");
+            }
+        }
+        return builder.toString();
+    }
+
+    // client endpoint
+
+    /**
+     * Request handler which forwards requests over the multiplexed channel, tagging each message with
+     * this handler's client {@code identifier}.  When closed, it removes itself from {@code remoteClients}
+     * and queues a {@code CLIENT_CLOSE} message for the peer.
+     */
+    private final class RequestHandlerImpl extends AbstractAutoCloseable<RequestHandler> implements RequestHandler {
+
+        private final int identifier;
+        private final BufferAllocator<ByteBuffer> allocator;
+
+        /**
+         * @param identifier the client ID written into every outbound message
+         * @param allocator the allocator used for outbound message buffers; must not be {@code null}
+         * @throws NullPointerException if {@code allocator} is {@code null}
+         */
+        public RequestHandlerImpl(final int identifier, final BufferAllocator<ByteBuffer> allocator) {
+            super(executor);
+            if (allocator == null) {
+                throw new NullPointerException("allocator is null");
+            }
+            this.identifier = identifier;
+            this.allocator = allocator;
+            addCloseHandler(new CloseHandler<RequestHandler>() {
+                public void handleClose(final RequestHandler closed) {
+                    // A bare "this" here is the anonymous CloseHandler, which is never a value in the
+                    // map; the conditional remove has to name the enclosing request handler.
+                    remoteClients.remove(identifier, RequestHandlerImpl.this);
+                    ByteBuffer buffer = allocator.allocate();
+                    buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
+                    buffer.putInt(identifier);
+                    buffer.flip();
+                    try {
+                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                    } catch (InterruptedException e) {
+                        log.warn("Client close notification was interrupted before it could be sent");
+                        // keep the interrupt visible to whoever runs the close handlers
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            });
+        }
+
+        /**
+         * Marshal and queue a one-way request.  Failures are logged at trace level and otherwise ignored.
+         *
+         * @param request the request object to send
+         */
+        public void receiveRequest(final Object request) {
+            log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
+            try {
+                final List<ByteBuffer> bufferList;
+                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+                try {
+                    bufferList = new ArrayList<ByteBuffer>();
+                    // NOTE(review): nothing visible here attaches "output" to the marshaller, so it is
+                    // unclear how marshalled bytes reach bufferList - confirm against the Marshaller API.
+                    final ByteOutput output = createByteOutput(allocator, bufferList);
+                    try {
+                        marshaller.write(MessageType.REQUEST_ONEWAY.getId());
+                        marshaller.writeInt(identifier);
+                        marshaller.writeObject(request);
+                        marshaller.close();
+                        output.close();
+                    } finally {
+                        IoUtils.safeClose(output);
+                    }
+                } finally {
+                    IoUtils.safeClose(marshaller);
+                }
+                try {
+                    registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                } catch (InterruptedException e) {
+                    log.trace(e, "receiveRequest was interrupted");
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            } catch (Throwable t) {
+                // ignore
+                log.trace(t, "receiveRequest failed with an exception");
+                return;
+            }
+        }
+
+        /**
+         * Marshal and queue a request for which a reply is expected.
+         * <p>
+         * The reply handler is registered in {@code remoteRequests} under a fresh request ID before the
+         * request is queued.  If the request cannot be queued, the registration is removed again and the
+         * handler is notified (cancellation on interrupt, exception on I/O failure) via the executor.
+         *
+         * @param request the request object to send
+         * @param handler the handler to notify of the outcome
+         * @return a context which can cancel the request, or a blank context if the request was not queued
+         */
+        public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
+            log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
+            int id = 0;
+            boolean registered = false;
+            boolean queued = false;
+            try {
+                final List<ByteBuffer> bufferList;
+                final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+                try {
+                    bufferList = new ArrayList<ByteBuffer>();
+                    // NOTE(review): nothing visible here attaches "output" to the marshaller, so it is
+                    // unclear how marshalled bytes reach bufferList - confirm against the Marshaller API.
+                    final ByteOutput output = createByteOutput(allocator, bufferList);
+                    try {
+                        marshaller.write(MessageType.REQUEST.getId());
+                        marshaller.writeInt(identifier);
+
+                        // claim an unused request ID for this reply handler
+                        do {
+                            id = requestSequence.getAndIncrement();
+                        } while (remoteRequests.putIfAbsent(id, handler) != null);
+                        registered = true;
+                        marshaller.writeInt(id);
+                        marshaller.writeObject(request);
+                        marshaller.close();
+                        output.close();
+                        try {
+                            registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            executor.execute(new Runnable() {
+                                public void run() {
+                                    SpiUtils.safeHandleCancellation(handler);
+                                }
+                            });
+                            return SpiUtils.getBlankRemoteRequestContext();
+                        }
+                        queued = true;
+                        log.trace("Sent request %s", request);
+                        return new RemoteRequestContextImpl(id, allocator, channel);
+                    } finally {
+                        IoUtils.safeClose(output);
+                    }
+                } finally {
+                    IoUtils.safeClose(marshaller);
+                }
+            } catch (final IOException t) {
+                log.trace(t, "receiveRequest failed with an exception");
+                executor.execute(new Runnable() {
+                    public void run() {
+                        SpiUtils.safeHandleException(handler, t);
+                    }
+                });
+                return SpiUtils.getBlankRemoteRequestContext();
+            } finally {
+                // A request that was never queued can never be answered; drop the registration so the
+                // handler (and its request ID) do not stay in the map forever.
+                if (registered && ! queued) {
+                    remoteRequests.remove(id, handler);
+                }
+            }
+        }
+
+        public String toString() {
+            return "forwarding request handler <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+        }
+    }
+
+    /**
+     * Context for an outstanding request, allowing the caller to ask the peer to cancel it.
+     */
+    public final class RemoteRequestContextImpl implements RemoteRequestContext {
+
+        private final BufferAllocator<ByteBuffer> allocator;
+        private final int id;
+        private final AllocatedMessageChannel channel;
+
+        /**
+         * @param id the request ID written into the cancel message
+         * @param allocator the allocator used for the cancel message buffer
+         * @param channel the channel on which the cancel message is queued
+         */
+        public RemoteRequestContextImpl(final int id, final BufferAllocator<ByteBuffer> allocator, final AllocatedMessageChannel channel) {
+            this.id = id;
+            this.allocator = allocator;
+            this.channel = channel;
+        }
+
+        /**
+         * Queue a {@code CANCEL_REQUEST} message for this request.  This is best-effort: a failure to
+         * queue the message is logged at trace level and not propagated to the caller.
+         */
+        public void cancel() {
+            try {
+                final ByteBuffer buffer = allocator.allocate();
+                buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
+                buffer.putInt(id);
+                buffer.flip();
+                registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+            } catch (InterruptedException e) {
+                log.trace(e, "Request cancellation was interrupted before it could be sent");
+                Thread.currentThread().interrupt();
+            } catch (Throwable t) {
+                log.trace(t, "Request cancellation failed with an exception");
+            }
+        }
+    }
+
+    public final class RequestHandlerSourceImpl extends AbstractAutoCloseable<RequestHandlerSource> implements RequestHandlerSource {
+
+        private final BufferAllocator<ByteBuffer> allocator;
+        private final int identifier;
+
+        protected RequestHandlerSourceImpl(final BufferAllocator<ByteBuffer> allocator, final int identifier) {
+            super(executor);
+            this.allocator = allocator;
+            this.identifier = identifier;
+            addCloseHandler(new CloseHandler<RequestHandlerSource>() {
+                public void handleClose(final RequestHandlerSource closed) {
+                    ByteBuffer buffer = allocator.allocate();
+                    buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
+                    buffer.putInt(identifier);
+                    buffer.flip();
+                    try {
+                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                    } catch (InterruptedException e) {
+                        log.warn("Service close notification was interrupted before it could be sent");
+                    }
+                }
+            });
+        }
+
+        public Handle<RequestHandler> createRequestHandler() throws IOException {
+            int id;
+            do {
+                id = remoteClientSequence.getAndIncrement() << 1;
+            } while (remoteClients.putIfAbsent(id, new RequestHandlerImpl(id, MultiplexHandler.this.allocator)) != null);
+            final int clientId = id;
+            final ByteBuffer buffer = allocator.allocate();
+            buffer.put((byte) MessageType.CLIENT_OPEN.getId());
+            buffer.putInt(identifier);
+            buffer.putInt(clientId);
+            buffer.flip();
+            // todo - probably should bail out if we're interrupted?
+            boolean intr = false;
+            for (;;) {
+                try {
+                    registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                    try {
+                        return new RequestHandlerImpl(clientId, allocator).getHandle();
+                    } finally {
+                        if (intr) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    intr = true;
+                }
+            }
+        }
+
+        public String toString() {
+            return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
+        }
+    }
+
+    public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
+        return new ByteOutput() {
+            private ByteBuffer current;
+
+            private ByteBuffer getCurrent() {
+                final ByteBuffer buffer = current;
+                return buffer == null ? (current = allocator.allocate()) : buffer;
+            }
+
+            public void write(final int i) {
+                final ByteBuffer buffer = getCurrent();
+                buffer.put((byte) i);
+                if (! buffer.hasRemaining()) {
+                    buffer.flip();
+                    target.add(buffer);
+                    current = null;
+                }
+            }
+
+            public void write(final byte[] bytes) {
+                write(bytes, 0, bytes.length);
+            }
+
+            public void write(final byte[] bytes, int offs, int len) {
+                while (len > 0) {
+                    final ByteBuffer buffer = getCurrent();
+                    final int c = Math.min(len, buffer.remaining());
+                    buffer.put(bytes, offs, c);
+                    offs += c;
+                    len -= c;
+                    if (! buffer.hasRemaining()) {
+                        buffer.flip();
+                        target.add(buffer);
+                        current = null;
+                    }
+                }
+            }
+
+            public void close() {
+                flush();
+            }
+
+            public void flush() {
+                final ByteBuffer buffer = current;
+                if (buffer != null) {
+                    buffer.flip();
+                    target.add(buffer);
+                    current = null;
+                }
+            }
+        };
+    }
+
+    /**
+     * Object table writer for protocol-level objects.
+     * <p>
+     * Unfinished: {@link #writeObject} currently writes nothing.
+     */
+    public class ProtocolObjectTableWriter implements ObjectTable.Writer {
+
+        // todo - not implemented yet; nothing is written for the object
+        public void writeObject(final Marshaller marshaller, final Object o) throws IOException {
+            
+        }
+    }
+
+    /**
+     * Object table for protocol-level objects (request handlers, request handler sources and streams).
+     * <p>
+     * Unfinished: both methods currently return {@code null} on every path.
+     */
+    public class ProtocolObjectTable implements ObjectTable {
+
+        /**
+         * Look up a writer for the given object.
+         * <p>
+         * The request handler and request handler source branches are empty placeholders.  For any other
+         * object the stream detector is consulted and, when it reports a stream, the local-side handler is
+         * created and a connection attempt is started; the resulting future is not used yet.
+         *
+         * @param o the object about to be marshalled
+         * @return always {@code null} at present
+         * @throws IOException declared by the signature; see the inline note on the declaration
+         */
+        public Writer getObjectWriter(final Object o) throws IOException /* fixed in beta2 */ {
+            if (o instanceof RequestHandler) {
+                // todo - not implemented; the cast result is unused
+                final RequestHandler requestHandler = (RequestHandler) o;
+
+            } else if (o instanceof RequestHandlerSource) {
+                // todo - not implemented; the cast result is unused
+                final RequestHandlerSource requestHandlerSource = (RequestHandlerSource) o;
+                
+            } else {
+                final StreamSerializerFactory ssf = streamDetector.detectStream(o);
+                if (ssf != null) {
+                    final IoHandler<? super AllocatedMessageChannel> streamHandler = ssf.getLocalSide(o, new StreamContextImpl(executor, marshallerFactory, marshallingConfiguration));
+                    // todo - this should really be the "server" side
+                    // NOTE(review): connectTo is invoked with a null first argument and its future is discarded
+                    final IoFuture<AllocatedMessageChannel> futureChannel = messageConnector.connectTo(null, streamHandler);
+                    
+                }
+            }
+            return null;
+        }
+
+        /**
+         * Read a protocol-level object.
+         * <p>
+         * One type byte is read from the unmarshaller; for type 1 an additional int is read.  No object is
+         * constructed yet.
+         *
+         * @param unmarshaller the unmarshaller to read from
+         * @return always {@code null} at present
+         */
+        public Object readObject(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
+            // NOTE(review): the cases have no break statements, so each falls through to the ones below;
+            // harmless while the bodies are empty, but needs attention once they are filled in.
+            switch (unmarshaller.readByte()) {
+                case 1: {
+                    // remote client
+                    final int id = unmarshaller.readInt();
+                }
+                case 2: {
+                    // remote client source
+                }
+                case 3: {
+                    // stream
+                }
+                default: {
+                    // invalid
+                }
+            }
+            return null;
+        }
+    }
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.RemotingException;
+import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.Endpoint;
+import org.jboss.remoting.spi.remote.RequestHandlerSource;
+import org.jboss.remoting.spi.remote.Handle;
+import org.jboss.remoting.spi.stream.StreamProvider;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.AbstractConvertingIoFuture;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+/**
+ * Static factory methods for setting up the server and client sides of the multiplex protocol.
+ */
+public final class MultiplexProtocol {
+
+    private static final Logger log = Logger.getLogger(MultiplexProtocol.class);
+
+    private MultiplexProtocol() {
+    }
+
+    /**
+     * Create a request server for the multiplex protocol.  A new channel configuration and a new
+     * {@code MultiplexHandler} are created for every handler requested from the returned factory.
+     *
+     * @param endpoint the endpoint passed to each created handler
+     * @param executor the executor to use for invocations
+     * @param allocator the buffer allocator to use
+     * @param streamProvider the stream provider passed to each created handler
+     * @param <A> stream channel address type
+     * @return a handler factory for passing to an XNIO server
+     */
+    public static <A> IoHandlerFactory<AllocatedMessageChannel> createServer(final Endpoint endpoint, final Executor executor, final BufferAllocator<ByteBuffer> allocator, final StreamProvider<A> streamProvider) {
+        return new IoHandlerFactory<AllocatedMessageChannel>() {
+            public IoHandler<? super AllocatedMessageChannel> createHandler() {
+                return new MultiplexHandler<A>(endpoint, createConfiguration(executor, allocator), streamProvider);
+            }
+        };
+    }
+
+    /**
+     * Create a request client for the multiplex protocol.
+     *
+     * @param endpoint the endpoint passed to the connection's handler
+     * @param executor the executor to use for invocations
+     * @param channelSource the XNIO channel source to use to establish the connection
+     * @param allocator the buffer allocator to use
+     * @param streamProvider the stream provider passed to the connection's handler
+     * @param <A> stream channel address type
+     * @return a future handle which may be used to close the connection
+     * @throws IOException if an error occurs
+     */
+    public static <A> IoFuture<SimpleCloseable> connect(final Endpoint endpoint, final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator, final StreamProvider<A> streamProvider) throws IOException {
+        final MultiplexHandler<A> multiplexHandler = new MultiplexHandler<A>(endpoint, createConfiguration(executor, allocator), streamProvider);
+        final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(multiplexHandler);
+        return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
+            protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {
+                return new AbstractConnection(executor) {
+                    public Handle<RequestHandlerSource> getServiceForId(final int id) throws IOException {
+                        return multiplexHandler.getRemoteService(id).getHandle();
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Build the channel configuration shared by the server and client factory methods.
+     *
+     * @param executor the executor to use for invocations
+     * @param allocator the buffer allocator to use
+     * @return a new configuration with the allocator and executor set
+     */
+    private static RemotingChannelConfiguration createConfiguration(final Executor executor, final BufferAllocator<ByteBuffer> allocator) {
+        final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
+        configuration.setAllocator(allocator);
+        configuration.setExecutor(executor);
+        // todo marshaller factory... etc
+        return configuration;
+    }
+}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,82 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.util.concurrent.Executor;
-import java.nio.ByteBuffer;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.marshalling.MarshallerFactory;
-
-/**
- *
- */
-public final class RemotingChannelConfiguration {
-    private MarshallerFactory marshallerFactory;
-    private int linkMetric;
-    private Executor executor;
-    private ClassLoader classLoader;
-    private BufferAllocator<ByteBuffer> allocator;
-
-    public RemotingChannelConfiguration() {
-    }
-
-    public MarshallerFactory getMarshallerFactory() {
-        return marshallerFactory;
-    }
-
-    public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
-        this.marshallerFactory = marshallerFactory;
-    }
-
-    public int getLinkMetric() {
-        return linkMetric;
-    }
-
-    public void setLinkMetric(final int linkMetric) {
-        this.linkMetric = linkMetric;
-    }
-
-    public Executor getExecutor() {
-        return executor;
-    }
-
-    public void setExecutor(final Executor executor) {
-        this.executor = executor;
-    }
-
-    public ClassLoader getClassLoader() {
-        return classLoader;
-    }
-
-    public void setClassLoader(final ClassLoader classLoader) {
-        this.classLoader = classLoader;
-    }
-
-    public BufferAllocator<ByteBuffer> getAllocator() {
-        return allocator;
-    }
-
-    public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
-        this.allocator = allocator;
-    }
-}

Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/RemotingChannelConfiguration.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/RemotingChannelConfiguration.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.util.concurrent.Executor;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Configuration;
+import org.jboss.remoting.spi.stream.StreamDetector;
+
+/**
+ * Mutable holder for the settings of a remoting channel: marshaller factory, marshalling
+ * configuration, link metric, executor, buffer allocator and stream detector.  Every property starts
+ * out unset ({@code null}, or zero for the link metric) and is a plain field behind a getter/setter pair.
+ */
+public final class RemotingChannelConfiguration {
+    private Executor executor;
+    private BufferAllocator<ByteBuffer> allocator;
+    private MarshallerFactory marshallerFactory;
+    private Configuration marshallingConfiguration;
+    private StreamDetector streamDetector;
+    private int linkMetric;
+
+    /**
+     * Construct a configuration with no properties set.
+     */
+    public RemotingChannelConfiguration() {
+    }
+
+    /** @return the marshaller factory, or {@code null} if none was set */
+    public MarshallerFactory getMarshallerFactory() {
+        return this.marshallerFactory;
+    }
+
+    /** @param marshallerFactory the marshaller factory to store */
+    public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+        this.marshallerFactory = marshallerFactory;
+    }
+
+    /** @return the marshalling configuration, or {@code null} if none was set */
+    public Configuration getMarshallingConfiguration() {
+        return this.marshallingConfiguration;
+    }
+
+    /** @param marshallingConfiguration the marshalling configuration to store */
+    public void setMarshallingConfiguration(final Configuration marshallingConfiguration) {
+        this.marshallingConfiguration = marshallingConfiguration;
+    }
+
+    /** @return the link metric (zero unless set) */
+    public int getLinkMetric() {
+        return this.linkMetric;
+    }
+
+    /** @param linkMetric the link metric to store */
+    public void setLinkMetric(final int linkMetric) {
+        this.linkMetric = linkMetric;
+    }
+
+    /** @return the executor, or {@code null} if none was set */
+    public Executor getExecutor() {
+        return this.executor;
+    }
+
+    /** @param executor the executor to store */
+    public void setExecutor(final Executor executor) {
+        this.executor = executor;
+    }
+
+    /** @return the buffer allocator, or {@code null} if none was set */
+    public BufferAllocator<ByteBuffer> getAllocator() {
+        return this.allocator;
+    }
+
+    /** @param allocator the buffer allocator to store */
+    public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+        this.allocator = allocator;
+    }
+
+    /** @return the stream detector, or {@code null} if none was set */
+    public StreamDetector getStreamDetector() {
+        return this.streamDetector;
+    }
+
+    /** @param streamDetector the stream detector to store */
+    public void setStreamDetector(final StreamDetector streamDetector) {
+        this.streamDetector = streamDetector;
+    }
+}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/SimpleWriteHandler.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,84 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.log.Logger;
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-public final class SimpleWriteHandler implements WriteHandler {
-    private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
-
-    private final BufferAllocator<ByteBuffer> allocator;
-    private final ByteBuffer[] buffers;
-
-    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
-        this.allocator = allocator;
-        this.buffers = buffers.toArray(new ByteBuffer[buffers.size()]);
-        logBufferSize();
-    }
-
-    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
-        this.allocator = allocator;
-        this.buffers = buffers;
-        logBufferSize();
-    }
-
-    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
-        this.allocator = allocator;
-        buffers = new ByteBuffer[] { buffer };
-        logBufferSize();
-    }
-
-    private void logBufferSize() {
-        if (log.isTrace()) {
-            long t = 0L;
-            for (ByteBuffer buf : buffers) {
-                t += (long)buf.remaining();
-            }
-            log.trace("Writing a message of size %d", Long.valueOf(t));
-        }
-    }
-
-    public boolean handleWrite(final WritableMessageChannel channel) {
-        boolean done = true;
-        try {
-            return (done = channel.send(buffers));
-        } catch (IOException e) {
-            log.trace(e, "Write failed");
-            return true;
-        } finally {
-            if (done) {
-                for (ByteBuffer buffer : buffers) {
-                    allocator.free(buffer);
-                }
-            }
-        }
-    }
-}

Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/SimpleWriteHandler.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/SimpleWriteHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * Write handler which sends a fixed set of buffers as a single message and returns the buffers to
+ * the allocator once the send has completed or failed.
+ */
+public final class SimpleWriteHandler implements WriteHandler {
+    private static final Logger log = Logger.getLogger(SimpleWriteHandler.class);
+
+    private final BufferAllocator<ByteBuffer> allocator;
+    private final ByteBuffer[] buffers;
+
+    /**
+     * @param allocator the allocator to which the buffers are returned
+     * @param buffers the buffers making up the message; copied into an array
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final List<ByteBuffer> buffers) {
+        this(allocator, buffers.toArray(new ByteBuffer[buffers.size()]));
+    }
+
+    /**
+     * @param allocator the allocator to which the buffers are returned
+     * @param buffers the buffers making up the message; the array is used as-is, not copied
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer[] buffers) {
+        this.allocator = allocator;
+        this.buffers = buffers;
+        logBufferSize();
+    }
+
+    /**
+     * @param allocator the allocator to which the buffer is returned
+     * @param buffer the single buffer making up the message
+     */
+    public SimpleWriteHandler(final BufferAllocator<ByteBuffer> allocator, final ByteBuffer buffer) {
+        this(allocator, new ByteBuffer[] { buffer });
+    }
+
+    /** Trace-log the total number of remaining bytes across all buffers. */
+    private void logBufferSize() {
+        if (! log.isTrace()) {
+            return;
+        }
+        long total = 0L;
+        for (int i = 0; i < buffers.length; i++) {
+            total += (long) buffers[i].remaining();
+        }
+        log.trace("Writing a message of size %d", Long.valueOf(total));
+    }
+
+    /** Return every buffer of this message to the allocator. */
+    private void releaseBuffers() {
+        for (ByteBuffer buffer : buffers) {
+            allocator.free(buffer);
+        }
+    }
+
+    /**
+     * Attempt to send the message on the given channel.
+     * <p>
+     * The buffers are released when the channel reports the send as done, and also when the send
+     * throws; they are kept only when the channel returns {@code false}.
+     *
+     * @param channel the channel to send on
+     * @return the result of the channel's send, or {@code true} if the send failed with an I/O error
+     */
+    public boolean handleWrite(final WritableMessageChannel channel) {
+        // stays true unless the channel explicitly reports that the message was not sent
+        boolean release = true;
+        try {
+            final boolean sent = channel.send(buffers);
+            release = sent;
+            return sent;
+        } catch (IOException e) {
+            log.trace(e, "Write failed");
+            return true;
+        } finally {
+            if (release) {
+                releaseBuffers();
+            }
+        }
+    }
+}

Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/StreamContextImpl.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.stream.StreamContext;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallerFactory;
+import java.util.concurrent.Executor;
+import java.io.IOException;
+
+/**
+ * A {@link StreamContext} backed by a fixed executor, marshaller factory and marshalling
+ * configuration.  Marshaller and unmarshaller creation is delegated to the factory, always
+ * using the same configuration.
+ */
+public final class StreamContextImpl implements StreamContext {
+
+    private final MarshallerFactory marshallerFactory;
+    private final Configuration marshallerConfiguration;
+    private final Executor executor;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param executor the executor returned by {@link #getExecutor()}
+     * @param marshallerFactory the factory used to create marshallers and unmarshallers
+     * @param marshallerConfiguration the configuration passed to the factory on every creation
+     */
+    StreamContextImpl(final Executor executor, final MarshallerFactory marshallerFactory, final Configuration marshallerConfiguration) {
+        this.marshallerConfiguration = marshallerConfiguration;
+        this.marshallerFactory = marshallerFactory;
+        this.executor = executor;
+    }
+
+    /**
+     * Get the executor given at construction.
+     *
+     * @return the executor
+     */
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    /**
+     * Create a marshaller from the factory using the configured marshalling configuration.
+     *
+     * @return the marshaller returned by the factory
+     * @throws IOException if the factory fails to create the marshaller
+     */
+    public Marshaller createMarshaller() throws IOException {
+        final Marshaller marshaller = marshallerFactory.createMarshaller(marshallerConfiguration);
+        return marshaller;
+    }
+
+    /**
+     * Create an unmarshaller from the factory using the configured marshalling configuration.
+     *
+     * @return the unmarshaller returned by the factory
+     * @throws IOException if the factory fails to create the unmarshaller
+     */
+    public Unmarshaller createUnmarshaller() throws IOException {
+        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallerConfiguration);
+        return unmarshaller;
+    }
+}

Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/WriteHandler.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,32 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import org.jboss.xnio.channels.WritableMessageChannel;
-
-/**
- *
- */
-public interface WriteHandler {
-    boolean handleWrite(WritableMessageChannel channel);
-}

Copied: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java (from rev 4600, remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/WriteHandler.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/WriteHandler.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.xnio.channels.WritableMessageChannel;
+
+/**
+ * A handler that is given a writable message channel on which to perform a write.
+ * <p>
+ * NOTE(review): the meaning of the {@code handleWrite} return value is not stated here.  The
+ * {@code SimpleWriteHandler} implementation in this package returns the result of
+ * {@code channel.send()}, or {@code true} if the send failed with an {@code IOException}, which
+ * suggests that {@code true} means "finished with this handler" - confirm against the caller.
+ */
+public interface WriteHandler {
+    boolean handleWrite(WritableMessageChannel channel);
+}

Copied: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex (from rev 4573, remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic)

Deleted: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/ConnectionTestCase.java	2008-09-18 03:06:39 UTC (rev 4573)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -1,81 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.basic;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
-import org.jboss.remoting.core.EndpointImpl;
-import org.jboss.remoting.test.support.LoggingHelper;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.Xnio;
-import org.jboss.xnio.CloseableExecutor;
-import org.jboss.xnio.nio.NioXnio;
-
-/**
- *
- */
-public final class ConnectionTestCase extends TestCase {
-    static {
-        LoggingHelper.init();
-    }
-
-    public void testConnection() throws Throwable {
-        final String REQUEST = "request";
-        final String REPLY = "reply";
-        final List<Throwable> problems = Collections.synchronizedList(new LinkedList<Throwable>());
-        final CloseableExecutor closeableExecutor = IoUtils.closeableExecutor(Executors.newCachedThreadPool(), 500L, TimeUnit.MILLISECONDS);
-        try {
-            final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() {
-                public ByteBuffer allocate() {
-                    return ByteBuffer.allocate(1024);
-                }
-
-                public void free(final ByteBuffer buffer) {
-                }
-            };
-            final Xnio xnio = NioXnio.create();
-            try {
-                final EndpointImpl endpoint = new EndpointImpl();
-                endpoint.setExecutor(closeableExecutor);
-                endpoint.start();
-                try {
-                } finally {
-                    endpoint.stop();
-                }
-            } finally {
-                IoUtils.safeClose(xnio);
-            }
-        } finally {
-            IoUtils.safeClose(closeableExecutor);
-        }
-        for (Throwable t : problems) {
-            throw t;
-        }
-    }
-}

Copied: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java (from rev 4600, remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/ConnectionTestCase.java)
===================================================================
--- remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	                        (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -0,0 +1,81 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import junit.framework.TestCase;
+import org.jboss.remoting.core.EndpointImpl;
+import org.jboss.remoting.test.support.LoggingHelper;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.nio.NioXnio;
+
+/**
+ * Connection test for the multiplex protocol.  At present the test only starts and stops an
+ * endpoint; the body that would exercise a connection is still empty.
+ */
+public final class ConnectionTestCase extends TestCase {
+    static {
+        LoggingHelper.init();
+    }
+
+    /**
+     * Start an endpoint on a closeable cached thread pool, then stop it again and close the
+     * XNIO instance and the executor.  The request/reply strings and the buffer allocator are
+     * set up but not used yet.
+     *
+     * @throws Throwable if setup or teardown fails, or the first recorded problem, if any
+     */
+    public void testConnection() throws Throwable {
+        final String REQUEST = "request";
+        final String REPLY = "reply";
+        final List<Throwable> failures = Collections.synchronizedList(new LinkedList<Throwable>());
+        final CloseableExecutor executor = IoUtils.closeableExecutor(Executors.newCachedThreadPool(), 500L, TimeUnit.MILLISECONDS);
+        try {
+            final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() {
+                public void free(final ByteBuffer buffer) {
+                    // nothing to release for this allocator
+                }
+
+                public ByteBuffer allocate() {
+                    return ByteBuffer.allocate(1024);
+                }
+            };
+            final Xnio xnio = NioXnio.create();
+            try {
+                final EndpointImpl endpoint = new EndpointImpl();
+                endpoint.setExecutor(executor);
+                endpoint.start();
+                try {
+                    // no test body yet
+                } finally {
+                    endpoint.stop();
+                }
+            } finally {
+                IoUtils.safeClose(xnio);
+            }
+        } finally {
+            IoUtils.safeClose(executor);
+        }
+        // Rethrow the first recorded problem, if there is one.
+        if (! failures.isEmpty()) {
+            throw failures.get(0);
+        }
+    }
+}

Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy	2008-10-21 02:39:24 UTC (rev 4601)
@@ -15,7 +15,6 @@
 {
     permission java.lang.RuntimePermission "modifyThread"; // for executor control
     permission java.net.SocketPermission "*:*", "accept, connect, resolve";
-    permission java.util.PropertyPermission "xnio.provider", "read"; // todo - fixed in XNIO trunk...
 };
 
 // Permissions for Remoting itself
@@ -27,14 +26,11 @@
 
 grant codeBase "file:${build.home}/core/target/main/classes/-"
 {
-    // TODO: this is for the marshallers, which ought to be in their own module/module set
-    permission java.io.SerializablePermission "enableSubstitution";
     permission java.util.PropertyPermission "jboss.remoting.*", "read";
 };
 
 grant codeBase "file:${build.home}/protocol/basic/target/main/classes/-"
 {
-    permission java.net.SocketPermission "*:*", "accept, connect, resolve"; // todo - need a better solution
     permission java.util.PropertyPermission "jboss.remoting.*", "read";
 };
 
@@ -66,7 +62,7 @@
     permission java.security.AllPermission;
 };
 
-grant codeBase "file:${lib.xnio-standalone.local}"
+grant codeBase "file:${lib.xnio-nio.local}"
 {
     permission java.security.AllPermission;
 };

Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java	2008-10-20 13:51:33 UTC (rev 4600)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/OrderedExecutor.java	2008-10-21 02:39:24 UTC (rev 4601)
@@ -24,6 +24,7 @@
 
 import java.util.concurrent.Executor;
 import java.util.LinkedList;
+import org.jboss.xnio.log.Logger;
 
 /**
  * An executor that always runs all tasks in order, using a delegate executor to run the tasks.
@@ -32,6 +33,8 @@
  * same method, will result in B's task running after A's.
  */
 public final class OrderedExecutor implements Executor {
+    private static final Logger log = Logger.getLogger(OrderedExecutor.class);
+
     // @protectedby tasks
     private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
     // @protectedby tasks
@@ -60,7 +63,7 @@
                     try {
                         task.run();
                     } catch (Throwable t) {
-                        // eat it!
+                        log.error(t, "Runnable task %s failed", task);
                     }
                 }
             }
@@ -77,7 +80,15 @@
             tasks.add(command);
             if (! running) {
                 running = true;
-                parent.execute(runner);
+                boolean ok = false;
+                try {
+                    parent.execute(runner);
+                    ok = true;
+                } finally {
+                    if (! ok) {
+                        running = false;
+                    }
+                }
             }
         }
     }




More information about the jboss-remoting-commits mailing list