[jboss-remoting-commits] JBoss Remoting SVN: r4565 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting and 8 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Sep 9 10:05:03 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-09-09 10:05:02 -0400 (Tue, 09 Sep 2008)
New Revision: 4565

Added:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ReplyException.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConfigValue.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/RemotingChannelConfiguration.java
   remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentIntegerMap.java
   remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/EmulatedConcurrentIntegerHashMap.java
Removed:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/
   remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/marshal/
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/marshal/
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
   remoting3/trunk/build.properties
   remoting3/trunk/build.xml
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
   remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
Log:
Convert to new marshalling API

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ReplyException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ReplyException.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ReplyException.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting;
+
+/**
+ *
+ */
+public class ReplyException extends RemotingException {
+
+    private static final long serialVersionUID = 5562116026829381932L;
+
+    public ReplyException() {
+    }
+
+    public ReplyException(final String msg) {
+        super(msg);
+    }
+
+    public ReplyException(final Throwable cause) {
+        super(cause);
+    }
+
+    public ReplyException(final String msg, final Throwable cause) {
+        super(msg, cause);
+    }
+}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -41,6 +41,7 @@
      * @param msg the message
      * @param cause the cause
      */
+    // TODO - change to accept a RemotingException instead?
     void handleException(final String msg, Throwable cause);
 
     /**

Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/build.properties	2008-09-09 14:05:02 UTC (rev 4565)
@@ -115,6 +115,14 @@
 lib.jboss-managed.local=${local.repository}/${lib.jboss-managed.local-path}
 lib.jboss-managed.remote=${remote.repository}/${lib.jboss-managed.remote-path}
 
+lib.jboss-marshalling-api.version=1.0.0.Alpha.20080903-314
+lib.jboss-marshalling-api.name=marshalling-api.jar
+lib.jboss-marshalling-api.license=lgpl
+lib.jboss-marshalling-api.dir=jboss/marshalling/${lib.jboss-marshalling-api.version}/lib
+lib.jboss-marshalling-api.path=${lib.jboss-marshalling-api.dir}/${lib.jboss-marshalling-api.name}
+lib.jboss-marshalling-api.local=${local.repository}/${lib.jboss-marshalling-api.path}
+lib.jboss-marshalling-api.remote=${remote.repository}/${lib.jboss-marshalling-api.path}
+
 lib.jboss-serialization.version=1.1.0-snapshot-r358
 lib.jboss-serialization.name=jboss-serialization.jar
 lib.jboss-serialization.license=lgpl

Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/build.xml	2008-09-09 14:05:02 UTC (rev 4565)
@@ -127,6 +127,18 @@
         <get src="${remote.license.dir}/${lib.jboss-managed.license}.txt" dest="${lib.jboss-managed.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
     </target>
 
+    <!-- External library: JBoss Marshalling -->
+
+    <target name="lib.jboss-marshalling-api-check">
+        <available property="lib.jboss-marshalling-api.exists" file="${lib.jboss-marshalling-api.local}"/>
+    </target>
+
+    <target name="lib.jboss-marshalling-api" depends="lib.jboss-marshalling-api-check" unless="lib.jboss-marshalling-api.exists">
+        <mkdir dir="${local.repository}/${lib.jboss-marshalling-api.dir}"/>
+        <get src="${lib.jboss-marshalling-api.remote}" dest="${lib.jboss-marshalling-api.local}" usetimestamp="true" ignoreerrors="false"/>
+        <get src="${remote.license.dir}/${lib.jboss-marshalling-api.license}.txt" dest="${lib.jboss-marshalling-api.local}.license.txt" usetimestamp="true" ignoreerrors="false"/>
+    </target>
+
     <!-- External library: JBoss Serialization -->
 
     <target name="lib.jboss-serialization-check">
@@ -236,6 +248,7 @@
                 debug="true">
             <compilerarg value="-Xlint:unchecked"/>
             <classpath>
+                <pathelement location="${lib.jboss-marshalling-api.local}"/>
                 <pathelement location="${lib.xnio-api.local}"/>
                 <path refid="util.classpath"/>
             </classpath>
@@ -267,6 +280,7 @@
                 <path refid="api.classpath"/>
                 <path refid="testing-support.classpath"/>
                 <pathelement location="${lib.junit.local}"/>
+                <pathelement location="${lib.jboss-marshalling-api.local}"/>
                 <pathelement location="${lib.xnio-api.local}"/>
             </classpath>
         </javac>
@@ -319,7 +333,7 @@
         <delete dir="api/target"/>
     </target>
 
-    <target name="api" description="Build the API module" depends="lib.xnio-api,util,api.compile">
+    <target name="api" description="Build the API module" depends="lib.jboss-marshalling-api-check,lib.xnio-api,util,api.compile">
         <path id="api.classpath">
             <pathelement location="api/target/main/classes"/>
         </path>
@@ -390,7 +404,7 @@
                 <path refid="api.classpath"/>
                 <path refid="util.classpath"/>
                 <path refid="version.classpath"/>
-                <pathelement location="${lib.jboss-serialization.local}"/>
+                <pathelement location="${lib.jboss-marshalling-api.local}"/>
                 <pathelement location="${lib.xnio-api.local}"/>
             </classpath>
         </javac>
@@ -422,6 +436,7 @@
                 <path refid="core.classpath"/>
                 <path refid="util.classpath"/>
                 <path refid="testing-support.classpath"/>
+                <pathelement location="${lib.jboss-marshalling-api.local}"/>
                 <pathelement location="${lib.junit.local}"/>
                 <pathelement location="${lib.xnio-api.local}"/>
             </classpath>
@@ -476,7 +491,7 @@
         <delete dir="core/target"/>
     </target>
 
-    <target name="core" description="Build the core module" depends="lib.jboss-serialization,api,util,version,core.compile">
+    <target name="core" description="Build the core module" depends="api,util,version,core.compile">
         <path id="core.classpath">
             <pathelement location="core/target/main/classes"/>
         </path>
@@ -743,6 +758,7 @@
                 <!-- TODO: marshallers should be moved to their own module -->
                 <path refid="core.classpath"/>
                 <path refid="util.classpath"/>
+                <pathelement location="${lib.jboss-marshalling-api.local}"/>
                 <pathelement location="${lib.xnio-api.local}"/>
             </classpath>
         </javac>
@@ -776,6 +792,7 @@
                 <path refid="util.classpath"/>
                 <path refid="testing-support.classpath"/>
                 <pathelement location="${lib.junit.local}"/>
+                <pathelement location="${lib.jboss-marshalling-api.local}"/>
                 <pathelement location="${lib.xnio-standalone.local}"/>
             </classpath>
         </javac>
@@ -802,6 +819,7 @@
                 <path refid="util.classpath"/>
                 <pathelement location="protocol/basic/target/test/classes"/>
                 <pathelement location="${lib.junit.local}"/>
+                <pathelement location="${lib.jboss-marshalling-api.local}"/>
                 <pathelement location="${lib.xnio-standalone.local}"/>
             </classpath>
             <batchtest fork="yes" todir="protocol/basic/target/test-results"
@@ -830,7 +848,7 @@
         <delete dir="protocol.basic/target"/>
     </target>
 
-    <target name="protocol.basic" description="Build the protocol.basic module" depends="lib.xnio-api,api,core,util,protocol.basic.compile">
+    <target name="protocol.basic" description="Build the protocol.basic module" depends="lib.jboss-marshalling-api,lib.xnio-api,api,core,util,protocol.basic.compile">
         <path id="protocol.basic.classpath">
             <pathelement location="protocol/basic/target/main/classes"/>
         </path>

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -2,7 +2,6 @@
 
 import java.io.IOException;
 import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
 import org.jboss.cx.remoting.stream.ObjectSource;
 import org.jboss.xnio.channels.StreamChannel;
 import org.jboss.xnio.channels.StreamSourceChannel;
@@ -12,6 +11,7 @@
 import org.jboss.xnio.ChannelSource;
 import org.jboss.xnio.IoUtils;
 import org.jboss.xnio.log.Logger;
+import org.jboss.marshalling.MarshallerFactory;
 
 /**
  *

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -27,43 +27,30 @@
 import org.jboss.xnio.BufferAllocator;
 import org.jboss.xnio.IoUtils;
 import org.jboss.xnio.log.Logger;
-import static org.jboss.xnio.Buffers.*;
 import org.jboss.cx.remoting.spi.remote.RequestHandler;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.ReplyHandler;
 import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
 import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
-import org.jboss.cx.remoting.spi.marshal.Marshaller;
-import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
-import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
-import org.jboss.cx.remoting.spi.marshal.IdentityResolver;
 import org.jboss.cx.remoting.spi.SpiUtils;
 import org.jboss.cx.remoting.spi.AbstractAutoCloseable;
-import static org.jboss.cx.remoting.util.CollectionUtil.concurrentMap;
-import static org.jboss.cx.remoting.util.CollectionUtil.arrayList;
+import static org.jboss.cx.remoting.util.CollectionUtil.concurrentIntegerMap;
 import org.jboss.cx.remoting.util.CollectionUtil;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_ONEWAY;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.REPLY;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_CLOSE;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.CLIENT_OPEN;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_CLOSE;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.VERSION;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_ADVERTISE;
-import static org.jboss.cx.remoting.protocol.basic.MessageType.SERVICE_UNADVERTISE;
+import org.jboss.cx.remoting.util.ConcurrentIntegerMap;
 import org.jboss.cx.remoting.CloseHandler;
 import org.jboss.cx.remoting.Endpoint;
-import java.util.concurrent.ConcurrentMap;
+import org.jboss.cx.remoting.SimpleCloseable;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.ByteOutput;
+import org.jboss.marshalling.Marshaller;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.nio.ByteBuffer;
 import java.nio.BufferUnderflowException;
 import java.io.IOException;
@@ -74,96 +61,67 @@
 public final class BasicHandler implements IoHandler<AllocatedMessageChannel> {
 
     private static final Logger log = Logger.getLogger(BasicHandler.class);
-    private static final int LOCAL_VERSION = 0x00000100;
 
+    //--== Connection configuration items ==--
+    private final MarshallerFactory marshallerFactory;
+    private final int linkMetric;
+    private final Executor executor;
+    private final ClassLoader classLoader;
+    // buffer allocator for outbound message assembly
+    private final BufferAllocator<ByteBuffer> allocator;
+
     // running on remote node
-    private final ConcurrentMap<Integer, ReplyHandler> outstandingRequests = concurrentMap();
+    private final ConcurrentIntegerMap<ReplyHandler> remoteRequests = concurrentIntegerMap();
+    // running on local node
+    private final ConcurrentIntegerMap<RemoteRequestContext> localRequests = concurrentIntegerMap();
+    // sequence for remote requests
+    private final AtomicInteger requestSequence = new AtomicInteger();
 
     // clients whose requests get forwarded to the remote side
-    private final ConcurrentMap<Integer, RequestHandler> remoteClients = concurrentMap();
+    // even #s were opened from services forwarded to us (our sequence)
+    // odd #s were forwarded directly to us (remote sequence)
+    private final ConcurrentIntegerMap<RequestHandler> remoteClients = concurrentIntegerMap();
     // forwarded to remote side (handled on this side)
-    private final ConcurrentMap<Integer, Handle<RequestHandler>> forwardedClients = concurrentMap();
+    private final ConcurrentIntegerMap<Handle<RequestHandler>> forwardedClients = concurrentIntegerMap();
+    // sequence for forwarded clients
+    private final AtomicInteger forwardedClientSequence = new AtomicInteger();
+    // sequence for clients created from services forwarded to us
+    private final AtomicInteger remoteClientSequence = new AtomicInteger();
 
     // services forwarded to us
-    private final ConcurrentMap<Integer, RequestHandlerSource> remoteServices = concurrentMap();
+    private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices = concurrentIntegerMap();
     // forwarded to remote side (handled on this side)
-    private final ConcurrentMap<Integer, Handle<RequestHandlerSource>> forwardedServices = concurrentMap();
+    private final ConcurrentIntegerMap<Handle<RequestHandlerSource>> forwardedServices = concurrentIntegerMap();
+    // sequence for forwarded services
+    private final AtomicInteger serviceSequence = new AtomicInteger();
 
-    private final boolean server;
-    private final BufferAllocator<ByteBuffer> allocator;
-
-    private final AtomicBoolean isnew = new AtomicBoolean(true);
     private volatile AllocatedMessageChannel channel;
-    private volatile int remoteVersion;
-    private final Executor executor;
-    private final MarshallerFactory<ByteBuffer> marshallerFactory;
-    private final ObjectResolver resolver;
-    private final ClassLoader classLoader;
-    private List<String> localMarshallerList = Collections.singletonList("java-serialization");
-    private volatile String marshallerType;
-    private volatile int metric;
-    private final AtomicBoolean initialized = new AtomicBoolean(false);
 
-    public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final Executor executor, final MarshallerFactory<ByteBuffer> marshallerFactory) {
-        this.server = server;
-        this.allocator = allocator;
-        this.executor = executor;
-        final RequestHandlerImpl endpoint = new RequestHandlerImpl(0, allocator);
-        remoteClients.put(Integer.valueOf(0), endpoint);
-        this.marshallerFactory = marshallerFactory;
-        // todo
-        resolver = IdentityResolver.getInstance();
-        classLoader = getClass().getClassLoader();
+    public BasicHandler(final RemotingChannelConfiguration configuration) {
+        allocator = configuration.getAllocator();
+        executor = configuration.getExecutor();
+        classLoader = configuration.getClassLoader();
+        marshallerFactory = configuration.getMarshallerFactory();
+        linkMetric = configuration.getLinkMetric();
     }
 
-    /**
-     * Sequence number of requests originating locally.
-     */
-    private final AtomicInteger localRequestIdSeq = new AtomicInteger();
-    /**
-     * Sequence number of local clients forwarded to the remote side.
-     */
-    private final AtomicInteger localClientIdSeq = new AtomicInteger();
-    /**
-     * Sequence number of remote clients opened locally from services from the remote side.
-     */
-    private final AtomicInteger remoteClientIdSeq = new AtomicInteger();
-
     public void handleOpened(final AllocatedMessageChannel channel) {
-        if (isnew.getAndSet(false)) {
-            this.channel = channel;
-        }
-        final List<ByteBuffer> bufferList = arrayList();
-        ByteBuffer buffer = allocator.allocate();
-        buffer.put((byte) VERSION);
-        buffer.putInt(LOCAL_VERSION);
-        String joinedList = CollectionUtil.join(",", localMarshallerList);
-        for (;;) {
-            int i = writeUTFZ(buffer, joinedList);
-            if (i == -1) {
-                break;
-            }
-            bufferList.add(flip(buffer));
-            joinedList = joinedList.substring(i);
-            buffer = allocator.allocate();
-        }
-        bufferList.add(flip(buffer));
-        try {
-            registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-        } catch (InterruptedException e) {
-            log.error("Interrupted while sending intial version message");
-            IoUtils.safeClose(channel);
-            Thread.currentThread().interrupt();
-            return;
-        }
         channel.resumeReads();
     }
 
     public void handleReadable(final AllocatedMessageChannel channel) {
         for (;;) try {
-            final ByteBuffer buffer = channel.receive();
+            final ByteBuffer buffer;
+            try {
+                buffer = channel.receive();
+            } catch (IOException e) {
+                log.error(e, "I/O error in protocol channel; closing channel");
+                IoUtils.safeClose(channel);
+                return;
+            }
             if (buffer == null) {
                 // todo release all handles...
+                // todo what if the write queue is not empty?
                 IoUtils.safeClose(channel);
                 return;
             }
@@ -172,145 +130,188 @@
                 channel.resumeReads();
                 return;
             }
-            int msgType = buffer.get() & 0xff;
-            if (initialized.getAndSet(true) != (msgType != 0)) {
-                log.error("Expected a version message; closing connection");
-                IoUtils.safeClose(channel);
+            final MessageType msgType;
+            try {
+                msgType = MessageType.getMessageType(buffer.get() & 0xff);
+            } catch (IllegalArgumentException ex) {
+                log.trace("Received invalid message type");
                 return;
             }
-            log.trace("Received message %s, type %d", buffer, Integer.valueOf(msgType));
+            log.trace("Received message %s, type %s", buffer, msgType);
             switch (msgType) {
-                case VERSION: {
-                    // participants always choose the lowest version number
-                    // since we only support one version (0), we don't do anything with the value
-                    buffer.getInt();
-                    // Select the client's most preferred marshaling method that the server supports
-                    final String marshallerList = readUTFZ(buffer);
-                    final Iterable<String> remoteMarshallerList = CollectionUtil.split(",", marshallerList);
-                    final Iterable<String> clientList = server ? remoteMarshallerList : localMarshallerList;
-                    final Iterable<String> serverList = server ? localMarshallerList : remoteMarshallerList;
-                    for (final String clientSuggestion : clientList) {
-                        for (final String serverSuggestion : serverList) {
-                            if (clientSuggestion.equals(serverSuggestion)) {
-                                marshallerType = clientSuggestion;
-                                log.trace("Chose marshaller type '%s'", marshallerType);
-                            }
-                        }
-                    }
-                    if (marshallerType == null) {
-                        log.error("Could not agree on a marshaller type; closing connection");
-                        IoUtils.safeClose(channel);
-                        return;
-                    }
-                    break;
-                }
                 case REQUEST_ONEWAY: {
                     final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = getForwardedClient(clientId);
+                    final Handle<RequestHandler> handle = forwardedClients.get(clientId);
                     if (handle == null) {
                         log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
                         return;
                     }
-                    final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
-                    if (! unmarshaller.unmarshal(buffer)) {
-                        log.trace("Incomplete one-way request for client ID %d", Integer.valueOf(clientId));
-                        break;
-                    }
                     final Object payload;
                     try {
-                        payload = unmarshaller.get();
-                    } catch (ClassNotFoundException e) {
-                        log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
+                        try {
+                            unmarshaller.start(createByteInput(buffer, true));
+                            try {
+                                payload = unmarshaller.readObject();
+                            } catch (ClassNotFoundException e) {
+                                log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        log.error(ex, "Failed to unmarshal a one-way request");
                         break;
                     }
                     final RequestHandler requestHandler = handle.getResource();
-                    requestHandler.receiveRequest(payload);
+                    try {
+                        requestHandler.receiveRequest(payload);
+                    } catch (Throwable t) {
+                        log.error(t, "One-way request handler unexpectedly threw an exception");
+                    }
                     break;
                 }
                 case REQUEST: {
                     final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = getForwardedClient(clientId);
+                    final Handle<RequestHandler> handle = forwardedClients.get(clientId);
                     if (handle == null) {
                         log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
                         break;
                     }
                     final int requestId = buffer.getInt();
-                    final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
-                    if (! unmarshaller.unmarshal(buffer)) {
-                        log.trace("Incomplete request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
-                        new ReplyHandlerImpl(channel, requestId, allocator).handleException("Incomplete request", null);
-                        break;
-                    }
                     final Object payload;
                     try {
-                        payload = unmarshaller.get();
-                    } catch (ClassNotFoundException e) {
-                        log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
+                        try {
+                            unmarshaller.start(createByteInput(buffer, true));
+                            try {
+                                payload = unmarshaller.readObject();
+                            } catch (ClassNotFoundException e) {
+                                log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
+                                // todo - send request receive failed message
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
+                        // todo send a request failure message
                         break;
                     }
                     final RequestHandler requestHandler = handle.getResource();
-                    requestHandler.receiveRequest(payload, (ReplyHandler) new ReplyHandlerImpl(channel, requestId, allocator));
+                    requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
                     break;
                 }
                 case REPLY: {
                     final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
+                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
                     if (replyHandler == null) {
                         log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
                         break;
                     }
-                    final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
-                    if (! unmarshaller.unmarshal(buffer)) {
-                        replyHandler.handleException("Incomplete reply", null);
-                        log.trace("Incomplete reply to request ID %d", Integer.valueOf(requestId));
-                        break;
-                    }
                     final Object payload;
                     try {
-                        payload = unmarshaller.get();
-                    } catch (ClassNotFoundException e) {
-                        replyHandler.handleException("Reply unmarshalling failed", e);
-                        log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
+                        try {
+                            unmarshaller.start(createByteInput(buffer, true));
+                            try {
+                                payload = unmarshaller.readObject();
+                            } catch (ClassNotFoundException e) {
+                                replyHandler.handleException("Reply unmarshalling failed", e);
+                                log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        log.trace("Failed to unmarshal a reply (%s), sending a ReplyException");
+                        // todo
+                        SpiUtils.safeHandleException(replyHandler, null, null);
                         break;
                     }
                     SpiUtils.safeHandleReply(replyHandler, payload);
                     break;
                 }
-                case REQUEST_FAILED: {
+                case CANCEL_REQUEST: {
                     final int requestId = buffer.getInt();
-                    final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
+                    final RemoteRequestContext context = localRequests.get(requestId);
+                    if (context != null) {
+                        context.cancel();
+                    }
+                    break;
+                }
+                case CANCEL_ACK: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = remoteRequests.get(requestId);
+                    if (replyHandler != null) {
+                        replyHandler.handleCancellation();
+                    }
+                    break;
+                }
+                case REQUEST_RECEIVE_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
                     if (replyHandler == null) {
                         log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
                         break;
                     }
-                    final Unmarshaller<ByteBuffer> unmarshaller = marshallerFactory.createUnmarshaller(resolver, classLoader);
-                    if (! unmarshaller.unmarshal(buffer)) {
-                        replyHandler.handleException("Incomplete exception reply", null);
-                        log.trace("Incomplete exception reply to request ID %d", Integer.valueOf(requestId));
+                    final String reason = readUTFZ(buffer);
+                    // todo - throw a new ReplyException
+                    break;
+                }
+                case REQUEST_FAILED: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
                         break;
                     }
-                    final Object message;
+                    final Throwable cause;
                     try {
-                        message = unmarshaller.get();
-                    } catch (ClassNotFoundException e) {
-                        replyHandler.handleException("Exception reply unmarshalling failed", e);
-                        log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                        final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller();
+                        try {
+                            unmarshaller.start(createByteInput(buffer, true));
+                            try {
+                                cause = (Throwable) unmarshaller.readObject();
+                            } catch (ClassNotFoundException e) {
+                                replyHandler.handleException("Exception reply unmarshalling failed", e);
+                                log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                                break;
+                            } catch (ClassCastException e) {
+                                // todo - report a generic exception
+                                SpiUtils.safeHandleException(replyHandler, null, null);
+                                break;
+                            }
+                        } finally {
+                            IoUtils.safeClose(unmarshaller);
+                        }
+                    } catch (IOException ex) {
+                        log.trace("Failed to unmarshal an exception reply (%s), sending a generic execution exception");
+                        // todo
+                        SpiUtils.safeHandleException(replyHandler, null, null);
                         break;
                     }
-                    final Object cause;
-                    try {
-                        cause = unmarshaller.get();
-                    } catch (ClassNotFoundException e) {
-                        replyHandler.handleException("Exception reply unmarshalling failed", e);
-                        log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
+                    // todo - wrap with REE
+                    SpiUtils.safeHandleException(replyHandler, null, cause);
+                    break;
+                }
+                case REQUEST_OUTCOME_UNKNOWN: {
+                    final int requestId = buffer.getInt();
+                    final ReplyHandler replyHandler = remoteRequests.remove(requestId);
+                    if (replyHandler == null) {
+                        log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
                         break;
                     }
-                    SpiUtils.safeHandleException(replyHandler, message == null ? null : message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
+                    final String reason = readUTFZ(buffer);
+                    // todo - throw a new IndetermOutcomeEx
                     break;
                 }
                 case CLIENT_CLOSE: {
                     final int clientId = buffer.getInt();
-                    final Handle<RequestHandler> handle = takeForwardedClient(clientId);
+                    final Handle<RequestHandler> handle = forwardedClients.remove(clientId);
                     if (handle == null) {
                         log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
                         break;
@@ -321,7 +322,7 @@
                 case CLIENT_OPEN: {
                     final int serviceId = buffer.getInt();
                     final int clientId = buffer.getInt();
-                    final Handle<RequestHandlerSource> handle = registry.lookup(serviceId);
+                    final Handle<RequestHandlerSource> handle = forwardedServices.get(serviceId);
                     if (handle == null) {
                         log.warn("Received client open message for unknown service %d", Integer.valueOf(serviceId));
                         break;
@@ -332,14 +333,21 @@
                         // todo check for duplicate
                         // todo validate the client ID
                         log.trace("Opening client %d from service %d", Integer.valueOf(clientId), Integer.valueOf(serviceId));
-                        forwardedClients.put(Integer.valueOf(clientId), clientHandle);
+                        forwardedClients.put(clientId, clientHandle);
+                    } catch (IOException ex) {
+                        log.error(ex, "Failed to create a request handler for client ID %d", Integer.valueOf(clientId));
+                        break;
                     } finally {
                         IoUtils.safeClose(handle);
                     }
                     break;
                 }
                 case SERVICE_CLOSE: {
-                    registry.unbind(buffer.getInt());
+                    final Handle<RequestHandlerSource> handle = forwardedServices.remove(buffer.getInt());
+                    if (handle == null) {
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
                     break;
                 }
                 case SERVICE_ADVERTISE: {
@@ -348,37 +356,31 @@
                     final String groupName = readUTFZ(buffer);
                     final String endpointName = readUTFZ(buffer);
                     final int baseMetric = buffer.getInt();
-                    Endpoint endpoint;
-                    int id;
+                    Endpoint endpoint = null;
+                    int id = -1;
                     final RequestHandlerSource handlerSource = new RequestHandlerSourceImpl(allocator, id);
-                    final int calcMetric = baseMetric + metric;
+                    final int calcMetric = baseMetric + linkMetric;
                     if (calcMetric > 0) {
-                        endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
+                        try {
+                            final SimpleCloseable closeable = endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
+                            // todo - something with that closeable
+                        } catch (IOException e) {
+                            log.error(e, "Unable to register remote service");
+                        }
                     }
                     break;
                 }
                 case SERVICE_UNADVERTISE: {
                     final int serviceId = buffer.getInt();
-                    IoUtils.safeClose(remoteServices.get(Integer.valueOf(serviceId)));
+                    IoUtils.safeClose(remoteServices.get(serviceId));
                     break;
                 }
                 default: {
-                    log.trace("Received invalid message type %d", Integer.valueOf(msgType));
+                    log.trace("Received invalid message type %s", msgType);
                 }
             }
-        } catch (IOException e) {
-            log.error(e, "I/O error in protocol channel");
-            IoUtils.safeClose(channel);
-            return;
         } catch (BufferUnderflowException e) {
             log.error(e, "Malformed packet");
-//        } catch (InterruptedException e) {
-//            log.error(e, "Read thread interrupted, closing channel");
-//            IoUtils.safeClose(channel);
-//            Thread.currentThread().interrupt();
-//            return;
-        } catch (Throwable t) {
-            log.error(t, "Handler failed");
         }
     }
 
@@ -407,10 +409,6 @@
     public void handleClosed(final AllocatedMessageChannel channel) {
     }
 
-    RequestHandler getRemoteClient(final int i) {
-        return remoteClients.get(Integer.valueOf(i));
-    }
-
     RequestHandlerSource getRemoteService(final int id) {
         return new RequestHandlerSourceImpl(allocator, id);
     }
@@ -435,56 +433,65 @@
 
         public void handleReply(final Object reply) {
             ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) REPLY);
+            buffer.put((byte) MessageType.REPLY.getId());
             buffer.putInt(requestId);
             try {
-                final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(resolver);
-                marshaller.start(reply);
-                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                while (! marshaller.marshal(buffer)) {
-                    bufferList.add(flip(buffer));
-                    buffer = allocator.allocate();
+                final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
+                try {
+                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                    final ByteOutput output = createByteOutput(allocator, bufferList);
+                    try {
+                        marshaller.start(output);
+                        marshaller.writeObject(reply);
+                        marshaller.close();
+                        output.close();
+                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                    } finally {
+                        IoUtils.safeClose(output);
+                    }
+                } finally {
+                    IoUtils.safeClose(marshaller);
                 }
-                bufferList.add(flip(buffer));
-                registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
             } catch (IOException e) {
-                // todo log
+                log.error(e, "Failed to send a reply to the remote side");
             } catch (InterruptedException e) {
-                // todo log
+                log.error(e, "Reply handler thread interrupted before a reply could be sent");
                 Thread.currentThread().interrupt();
             }
         }
 
         public void handleException(final String msg, final Throwable cause) {
             ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) REQUEST_FAILED);
+            buffer.put((byte) MessageType.REQUEST_FAILED.getId());
             buffer.putInt(requestId);
             try {
-                final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(resolver);
-                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                marshaller.start(msg);
-                while (! marshaller.marshal(buffer)) {
-                    bufferList.add(flip(buffer));
-                    buffer = allocator.allocate();
+                final org.jboss.marshalling.Marshaller marshaller = marshallerFactory.createMarshaller();
+                try {
+                    final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
+                    final ByteOutput output = createByteOutput(allocator, bufferList);
+                    try {
+                        marshaller.start(output);
+                        marshaller.writeObject(cause);
+                        marshaller.close();
+                        output.close();
+                        registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                    } finally {
+                        IoUtils.safeClose(output);
+                    }
+                } finally {
+                    IoUtils.safeClose(marshaller);
                 }
-                marshaller.start(cause);
-                while (! marshaller.marshal(buffer)) {
-                    bufferList.add(flip(buffer));
-                    buffer = allocator.allocate();
-                }
-                bufferList.add(flip(buffer));
-                registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
             } catch (IOException e) {
-                // todo log
+                log.error(e, "Failed to send an exception to the remote side");
             } catch (InterruptedException e) {
-                // todo log
+                log.error(e, "Reply handler thread interrupted before an exception could be sent");
                 Thread.currentThread().interrupt();
             }
         }
 
         public void handleCancellation() {
             final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) CANCEL_ACK);
+            buffer.put((byte) MessageType.CANCEL_ACK.getId());
             buffer.putInt(requestId);
             buffer.flip();
             try {
@@ -496,45 +503,6 @@
         }
     }
 
-    // Session mgmt
-
-    public int openRequest(ReplyHandler handler) {
-        int id;
-        do {
-            id = localRequestIdSeq.getAndIncrement();
-        } while (outstandingRequests.putIfAbsent(Integer.valueOf(id), handler) != null);
-        return id;
-    }
-
-    public int openClientFromService() {
-        int id;
-        do {
-            id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
-        } while (remoteClients.putIfAbsent(Integer.valueOf(id), new RequestHandlerImpl(id, allocator)) != null);
-        return id;
-    }
-
-    public void openClientForForwardedService(int id, RequestHandler clientEndpoint) {
-        try {
-            forwardedClients.put(Integer.valueOf(id), clientEndpoint.getHandle());
-        } catch (IOException e) {
-            // TODO fix
-            e.printStackTrace();
-        }
-    }
-
-    public Handle<RequestHandler> getForwardedClient(int id) {
-        return forwardedClients.get(Integer.valueOf(id));
-    }
-
-    private Handle<RequestHandler> takeForwardedClient(final int id) {
-        return forwardedClients.remove(Integer.valueOf(id));
-    }
-
-    public ReplyHandler takeOutstandingReqeust(int id) {
-        return outstandingRequests.remove(Integer.valueOf(id));
-    }
-
     // Writer members
 
     private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
@@ -655,8 +623,9 @@
             this.allocator = allocator;
             addCloseHandler(new CloseHandler<RequestHandler>() {
                 public void handleClose(final RequestHandler closed) {
+                    remoteClients.remove(identifier, this);
                     ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.CLIENT_CLOSE);
+                    buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
                     buffer.putInt(identifier);
                     buffer.flip();
                     try {
@@ -671,17 +640,23 @@
         public void receiveRequest(final Object request) {
             log.trace("Sending outbound one-way request of type %s", request == null ? "null" : request.getClass());
             try {
-                final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
-                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                ByteBuffer buffer = allocator.allocate();
-                buffer.put((byte) MessageType.REQUEST_ONEWAY);
-                buffer.putInt(identifier);
-                marshaller.start(request);
-                while (! marshaller.marshal(buffer)) {
-                    bufferList.add(flip(buffer));
-                    buffer = allocator.allocate();
+                final List<ByteBuffer> bufferList;
+                final Marshaller marshaller = marshallerFactory.createMarshaller();
+                try {
+                    bufferList = new ArrayList<ByteBuffer>();
+                    final ByteOutput output = createByteOutput(allocator, bufferList);
+                    try {
+                        marshaller.write(MessageType.REQUEST_ONEWAY.getId());
+                        marshaller.writeInt(identifier);
+                        marshaller.writeObject(request);
+                        marshaller.close();
+                        output.close();
+                    } finally {
+                        IoUtils.safeClose(output);
+                    }
+                } finally {
+                    IoUtils.safeClose(marshaller);
                 }
-                bufferList.add(flip(buffer));
                 try {
                     registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
                 } catch (InterruptedException e) {
@@ -699,33 +674,43 @@
         public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
             log.trace("Sending outbound request of type %s", request == null ? "null" : request.getClass());
             try {
-                final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
-                final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
-                ByteBuffer buffer = allocator.allocate();
-                buffer.put((byte) MessageType.REQUEST);
-                buffer.putInt(identifier);
-                final int id = openRequest(handler);
-                buffer.putInt(id);
-                marshaller.start(request);
-                while (! marshaller.marshal(buffer)) {
-                    bufferList.add(flip(buffer));
-                    buffer = allocator.allocate();
-                }
-                bufferList.add(flip(buffer));
+                final List<ByteBuffer> bufferList;
+                final Marshaller marshaller = marshallerFactory.createMarshaller();
                 try {
-                    registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    executor.execute(new Runnable() {
-                        public void run() {
-                            SpiUtils.safeHandleCancellation(handler);
+                    bufferList = new ArrayList<ByteBuffer>();
+                    final ByteOutput output = createByteOutput(allocator, bufferList);
+                    try {
+                        marshaller.write(MessageType.REQUEST.getId());
+                        marshaller.writeInt(identifier);
+
+                        int id;
+                        do {
+                            id = requestSequence.getAndIncrement();
+                        } while (remoteRequests.putIfAbsent(id, handler) != null);
+                        marshaller.writeInt(id);
+                        marshaller.writeObject(request);
+                        marshaller.close();
+                        output.close();
+                        try {
+                            registerWriter(channel, new SimpleWriteHandler(allocator, bufferList));
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                            executor.execute(new Runnable() {
+                                public void run() {
+                                    SpiUtils.safeHandleCancellation(handler);
+                                }
+                            });
+                            return SpiUtils.getBlankRemoteRequestContext();
                         }
-                    });
-                    return SpiUtils.getBlankRemoteRequestContext();
+                        log.trace("Sent request %s", request);
+                        return new RemoteRequestContextImpl(id, allocator, channel);
+                    } finally {
+                        IoUtils.safeClose(output);
+                    }
+                } finally {
+                    IoUtils.safeClose(marshaller);
                 }
-                log.trace("Sent request %s", request);
-                return new RemoteRequestContextImpl(id, allocator, channel);
-            } catch (final Throwable t) {
+            } catch (final IOException t) {
                 log.trace(t, "receiveRequest failed with an exception");
                 executor.execute(new Runnable() {
                     public void run() {
@@ -756,7 +741,7 @@
         public void cancel() {
             try {
                 final ByteBuffer buffer = allocator.allocate();
-                buffer.put((byte) MessageType.CANCEL_REQUEST);
+                buffer.put((byte) MessageType.CANCEL_REQUEST.getId());
                 buffer.putInt(id);
                 buffer.flip();
                 registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
@@ -781,7 +766,7 @@
             addCloseHandler(new CloseHandler<RequestHandlerSource>() {
                 public void handleClose(final RequestHandlerSource closed) {
                     ByteBuffer buffer = allocator.allocate();
-                    buffer.put((byte) MessageType.SERVICE_CLOSE);
+                    buffer.put((byte) MessageType.SERVICE_CLOSE.getId());
                     buffer.putInt(identifier);
                     buffer.flip();
                     try {
@@ -794,9 +779,13 @@
         }
 
         public Handle<RequestHandler> createRequestHandler() throws IOException {
-            final int clientId = openClientFromService();
+            int id;
+            do {
+                id = remoteClientSequence.getAndIncrement() << 1;
+            } while (remoteClients.putIfAbsent(id, new RequestHandlerImpl(id, BasicHandler.this.allocator)) != null);
+            final int clientId = id;
             final ByteBuffer buffer = allocator.allocate();
-            buffer.put((byte) MessageType.CLIENT_OPEN);
+            buffer.put((byte) MessageType.CLIENT_OPEN.getId());
             buffer.putInt(identifier);
             buffer.putInt(clientId);
             buffer.flip();
@@ -822,4 +811,105 @@
             return "forwarding request handler source <" + Integer.toString(hashCode(), 16) + "> (id = " + identifier + ")";
         }
     }
+
+    public static ByteInput createByteInput(final ByteBuffer buffer, final boolean eof) {
+        return new ByteInput() {
+            public int read() throws IOException {
+                if (buffer.hasRemaining()) {
+                    return buffer.get() & 0xff;
+                } else {
+                    return eof ? -1 : 0;
+                }
+            }
+
+            public int read(final byte[] b) throws IOException {
+                return read(b, 0, b.length);
+            }
+
+            public int read(final byte[] b, final int off, final int len) throws IOException {
+                int r = Math.min(buffer.remaining(), len);
+                if (r > 0) {
+                    buffer.get(b, off, r);
+                    return r;
+                } else {
+                    return eof ? -1 : 0;
+                }
+            }
+
+            public int available() throws IOException {
+                return buffer.remaining();
+            }
+
+            public long skip(final long n) throws IOException {
+                final int cnt = n > (long) Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) n;
+                int r = Math.min(buffer.remaining(), cnt);
+                if (r > 0) {
+                    final int oldPos = buffer.position();
+                    final int newPos = oldPos + r;
+                    if (newPos < 0) {
+                        final int lim = buffer.limit();
+                        buffer.position(lim);
+                        return lim - oldPos;
+                    }
+                }
+                return r;
+            }
+
+            public void close() {
+            }
+        };
+    }
+
+    public static ByteOutput createByteOutput(final BufferAllocator<ByteBuffer> allocator, final Collection<ByteBuffer> target) {
+        return new ByteOutput() {
+            private ByteBuffer current;
+
+            private ByteBuffer getCurrent() {
+                final ByteBuffer buffer = current;
+                return buffer == null ? (current = allocator.allocate()) : buffer;
+            }
+
+            public void write(final int i) throws IOException {
+                final ByteBuffer buffer = getCurrent();
+                buffer.put((byte) i);
+                if (! buffer.hasRemaining()) {
+                    buffer.flip();
+                    target.add(buffer);
+                    current = null;
+                }
+            }
+
+            public void write(final byte[] bytes) throws IOException {
+                write(bytes, 0, bytes.length);
+            }
+
+            public void write(final byte[] bytes, int offs, int len) throws IOException {
+                while (len > 0) {
+                    final ByteBuffer buffer = getCurrent();
+                    final int c = Math.min(len, buffer.remaining());
+                    buffer.put(bytes, offs, c);
+                    offs += c;
+                    len -= c;
+                    if (! buffer.hasRemaining()) {
+                        buffer.flip();
+                        target.add(buffer);
+                        current = null;
+                    }
+                }
+            }
+
+            public void close() throws IOException {
+                flush();
+            }
+
+            public void flush() throws IOException {
+                final ByteBuffer buffer = current;
+                if (buffer != null) {
+                    buffer.flip();
+                    target.add(buffer);
+                    current = null;
+                }
+            }
+        };
+    }
 }

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -26,7 +26,6 @@
 import org.jboss.cx.remoting.SimpleCloseable;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
 import org.jboss.xnio.IoHandlerFactory;
 import org.jboss.xnio.ChannelSource;
 import org.jboss.xnio.IoFuture;
@@ -59,7 +58,11 @@
     public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final BufferAllocator<ByteBuffer> allocator) {
         return new IoHandlerFactory<AllocatedMessageChannel>() {
             public IoHandler<? super AllocatedMessageChannel> createHandler() {
-                return new BasicHandler(true, allocator, executor, new JavaSerializationMarshallerFactory(executor));
+                final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
+                configuration.setAllocator(allocator);
+                configuration.setExecutor(executor);
+                // todo marshaller factory... etc
+                return new BasicHandler(configuration);
             }
         };
     }
@@ -74,7 +77,11 @@
      * @throws IOException if an error occurs
      */
     public static IoFuture<SimpleCloseable> connect(final Executor executor, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
-        final BasicHandler basicHandler = new BasicHandler(false, allocator, executor, new JavaSerializationMarshallerFactory(executor));
+        final RemotingChannelConfiguration configuration = new RemotingChannelConfiguration();
+        configuration.setAllocator(allocator);
+        configuration.setExecutor(executor);
+        // todo marshaller factory... etc
+        final BasicHandler basicHandler = new BasicHandler(configuration);
         final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
         return new AbstractConvertingIoFuture<SimpleCloseable, AllocatedMessageChannel>(futureChannel) {
             protected SimpleCloseable convert(final AllocatedMessageChannel channel) throws RemotingException {

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConfigValue.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConfigValue.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConfigValue.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+/**
+ *
+ */
+public enum ConfigValue {
+
+    /**
+     * The protocol version to use.  Value type is {@code int}.
+     */
+    PROTOCOL_VERSION(0),
+    /**
+     * The name of the marshaller to use.  Value type is {@code String}.
+     */
+    MARSHALLER_NAME(1),
+    ;
+    private final int id;
+
+    private ConfigValue(final int id) {
+        this.id = id;
+    }
+
+    /**
+     * Get the integer ID for this config value.
+     *
+     * @return the integer ID
+     */
+    public int getId() {
+        return id;
+    }
+
+    /**
+     * Get the config value for an integer ID.
+     *
+     * @param id the integer ID
+     * @return the config value instance
+     */
+    public static ConfigValue getConfigValue(final int id) {
+        switch (id) {
+            case 0: return PROTOCOL_VERSION;
+            case 1: return MARSHALLER_NAME;
+            default: throw new IllegalArgumentException("Invalid config value ID");
+        }
+    }
+}

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -23,34 +23,69 @@
 package org.jboss.cx.remoting.protocol.basic;
 
 /**
- *
+ * The type of a protocol message.
  */
-public final class MessageType {
-    // Initial version & marshaller negotiation
-    public static final int VERSION            = 0;
+public enum MessageType {
+
     // One-way request, no return value may be sent
-    public static final int REQUEST_ONEWAY     = 1;
+    REQUEST_ONEWAY(1),
     // Two-way request, return value is expected
-    public static final int REQUEST            = 2;
+    REQUEST(2),
     // Reply
-    public static final int REPLY              = 3;
+    REPLY(3),
     // Attempt to cancel a request
-    public static final int CANCEL_REQUEST     = 4;
+    CANCEL_REQUEST(4),
     // Acknowledge that a request was cancelled
-    public static final int CANCEL_ACK         = 5;
+    CANCEL_ACK(5),
+    // Request failed due to protocol or unmarshalling problem
+    REQUEST_RECEIVE_FAILED(6),
     // Request failed due to exception
-    public static final int REQUEST_FAILED     = 6;
+    REQUEST_FAILED(7),
+    // Request completed but no reply or exception was sent
+    REQUEST_OUTCOME_UNKNOWN(8),
     // Remote side called .close() on a forwarded RequestHandler
-    public static final int CLIENT_CLOSE       = 7;
+    CLIENT_CLOSE(9),
     // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
-    public static final int CLIENT_OPEN        = 8;
+    CLIENT_OPEN(10),
     // Remote side called .close() on a forwarded RequestHandlerSource
-    public static final int SERVICE_CLOSE      = 9;
+    SERVICE_CLOSE(11),
     // Remote side brought a new service online
-    public static final int SERVICE_ADVERTISE  = 10;
+    SERVICE_ADVERTISE(12),
     // Remote side's service is no longer available
-    public static final int SERVICE_UNADVERTISE= 11;
+    SERVICE_UNADVERTISE(13),
+    ;
+    private final int id;
 
-    private MessageType() {
+    private MessageType(int id) {
+        this.id = id;
     }
+
+    public int getId() {
+        return id;
+    }
+
+    /**
+     * Get the message type for an integer ID.
+     *
+     * @param id the integer ID
+     * @return the message type instance
+     */
+    public static MessageType getMessageType(final int id) {
+        switch (id) {
+            case 1: return REQUEST_ONEWAY;
+            case 2: return REQUEST;
+            case 3: return REPLY;
+            case 4: return CANCEL_REQUEST;
+            case 5: return CANCEL_ACK;
+            case 6: return REQUEST_RECEIVE_FAILED;
+            case 7: return REQUEST_FAILED;
+            case 8: return REQUEST_OUTCOME_UNKNOWN;
+            case 9: return CLIENT_CLOSE;
+            case 10: return CLIENT_OPEN;
+            case 11: return SERVICE_CLOSE;
+            case 12: return SERVICE_ADVERTISE;
+            case 13: return SERVICE_UNADVERTISE;
+            default: throw new IllegalArgumentException("Invalid message type ID");
+        }
+    }
 }

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/RemotingChannelConfiguration.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/RemotingChannelConfiguration.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/RemotingChannelConfiguration.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import java.util.concurrent.Executor;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.marshalling.MarshallerFactory;
+
+/**
+ *
+ */
+public final class RemotingChannelConfiguration {
+    private MarshallerFactory marshallerFactory;
+    private int linkMetric;
+    private Executor executor;
+    private ClassLoader classLoader;
+    private BufferAllocator<ByteBuffer> allocator;
+
+    public RemotingChannelConfiguration() {
+    }
+
+    public MarshallerFactory getMarshallerFactory() {
+        return marshallerFactory;
+    }
+
+    public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+        this.marshallerFactory = marshallerFactory;
+    }
+
+    public int getLinkMetric() {
+        return linkMetric;
+    }
+
+    public void setLinkMetric(final int linkMetric) {
+        this.linkMetric = linkMetric;
+    }
+
+    public Executor getExecutor() {
+        return executor;
+    }
+
+    public void setExecutor(final Executor executor) {
+        this.executor = executor;
+    }
+
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
+    public void setClassLoader(final ClassLoader classLoader) {
+        this.classLoader = classLoader;
+    }
+
+    public BufferAllocator<ByteBuffer> getAllocator() {
+        return allocator;
+    }
+
+    public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+        this.allocator = allocator;
+    }
+}

Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -84,73 +84,6 @@
                 endpoint.setExecutor(closeableExecutor);
                 endpoint.start();
                 try {
-                    try {
-                        final Handle<RequestHandlerSource> requestHandlerSourceHandle = endpoint.createRequestHandlerSource(new AbstractRequestListener<Object, Object>() {
-                            public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
-                                try {
-                                    context.sendReply(REPLY);
-                                } catch (IOException e) {
-                                    problems.add(e);
-                                }
-                            }
-                        }, null, null);
-                        try {
-                            serviceRegistry.bind(requestHandlerSourceHandle.getResource(), 13);
-                            final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = BasicProtocol.createServer(closeableExecutor, allocator);
-                            final IoHandlerFactory<StreamChannel> newHandlerFactory = Channels.convertStreamToAllocatedMessage(handlerFactory, 32768, 32768);
-                            final Closeable tcpServerCloseable = xnio.createTcpServer(newHandlerFactory, new InetSocketAddress(12345)).create();
-                            try {
-                                final CloseableTcpConnector connector = xnio.createTcpConnector().create();
-                                try {
-                                    final TcpClient tcpClient = connector.createChannelSource(new InetSocketAddress("localhost", 12345));
-                                    final ChannelSource<AllocatedMessageChannel> channelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
-                                    final IoFuture<SimpleCloseable> futureCloseable = BasicProtocol.connect(closeableExecutor, channelSource, allocator);
-                                    final SimpleCloseable connection = futureCloseable.get();
-                                    try {
-                                        final Handle<RequestHandlerSource> handleThirteen = connection.getServiceForId(13);
-                                        try {
-                                            final ClientSource<Object,Object> clientSource = endpoint.createClientSource(handleThirteen.getResource());
-                                            try {
-                                                final Client<Object,Object> client = clientSource.createClient();
-                                                try {
-                                                    final IoFuture<Object> future = client.send(REQUEST);
-                                                    assertEquals(IoFuture.Status.DONE, future.await(TimeUnit.MILLISECONDS, 500L));
-                                                    assertEquals(REPLY, future.get());
-                                                    client.close();
-                                                    clientSource.close();
-                                                    handleThirteen.close();
-                                                    connection.close();
-                                                    connector.close();
-                                                    tcpServerCloseable.close();
-                                                    requestHandlerSourceHandle.close();
-                                                    serviceRegistry.clear();
-                                                    endpoint.stop();
-                                                    xnio.close();
-                                                    closeableExecutor.close();
-                                                } finally {
-                                                    IoUtils.safeClose(client);
-                                                }
-                                            } finally {
-                                                IoUtils.safeClose(clientSource);
-                                            }
-                                        } finally {
-                                            IoUtils.safeClose(handleThirteen);
-                                        }
-                                    } finally {
-                                        IoUtils.safeClose(connection);
-                                    }
-                                } finally {
-                                    IoUtils.safeClose(connector);
-                                }
-                            } finally {
-                                IoUtils.safeClose(tcpServerCloseable);
-                            }
-                        } finally {
-                            IoUtils.safeClose(requestHandlerSourceHandle);
-                        }
-                    } finally {
-                        serviceRegistry.clear();
-                    }
                 } finally {
                     endpoint.stop();
                 }

Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java	2008-09-09 08:55:28 UTC (rev 4564)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/CollectionUtil.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -63,6 +63,10 @@
         return new ConcurrentReferenceHashMap<K, V>(16, STRONG, WEAK);
     }
 
+    public static <V> ConcurrentIntegerMap<V> concurrentIntegerMap() {
+        return new EmulatedConcurrentIntegerHashMap<V>(CollectionUtil.<Integer, V>concurrentMap());
+    }
+
     /**
      * Create a synchronized map that obeys the contract for {@code ConcurrentMap}.
      *

Added: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentIntegerMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentIntegerMap.java	                        (rev 0)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ConcurrentIntegerMap.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.util;
+
+import java.util.Set;
+import java.util.Collection;
+
+/**
+ *
+ */
+public interface ConcurrentIntegerMap<V> {
+    boolean containsKey(int key);
+
+    boolean containsValue(Object value);
+
+    V get(int key);
+
+    V put(int key, V value);
+
+    V putIfAbsent(int key, V value);
+
+    void putAll(ConcurrentIntegerMap<? extends V> m);
+
+    V remove(int key);
+
+    boolean remove(int key, Object oldValue);
+
+    V replace(int key, V value);
+
+    boolean replace(int key, V oldValue, V newValue);
+
+    void clear();
+
+    int size();
+
+    boolean isEmpty();
+
+    Set<Entry<V>> entrySet();
+
+    Collection<V> values();
+
+    boolean equals(Object other);
+
+    int hashCode();
+
+    interface Entry<V> {
+        int getKey();
+
+        V getValue();
+
+        V setValue(V value);
+
+        int hashCode();
+
+        boolean equals(Object other);
+    }
+}

Added: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/EmulatedConcurrentIntegerHashMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/EmulatedConcurrentIntegerHashMap.java	                        (rev 0)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/EmulatedConcurrentIntegerHashMap.java	2008-09-09 14:05:02 UTC (rev 4565)
@@ -0,0 +1,197 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.util;
+
+import java.util.Set;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ *
+ */
+public final class EmulatedConcurrentIntegerHashMap<V> implements ConcurrentIntegerMap<V> {
+
+    private final ConcurrentMap<Integer, V> delegate;
+
+    public EmulatedConcurrentIntegerHashMap(final ConcurrentMap<Integer, V> delegate) {
+        this.delegate = delegate;
+    }
+
+    public boolean containsKey(final int key) {
+        return delegate.containsKey(Integer.valueOf(key));
+    }
+
+    public boolean containsValue(final Object value) {
+        return delegate.containsValue(value);
+    }
+
+    public V get(final int key) {
+        return delegate.get(Integer.valueOf(key));
+    }
+
+    public V put(final int key, final V value) {
+        return delegate.put(Integer.valueOf(key), value);
+    }
+
+    public V putIfAbsent(final int key, final V value) {
+        return delegate.putIfAbsent(Integer.valueOf(key), value);
+    }
+
+    public void putAll(final ConcurrentIntegerMap<? extends V> m) {
+        throw new UnsupportedOperationException("maybe later");
+    }
+
+    public V remove(final int key) {
+        return delegate.remove(Integer.valueOf(key));
+    }
+
+    public boolean remove(final int key, final Object oldValue) {
+        return delegate.remove(Integer.valueOf(key), oldValue);
+    }
+
+    public V replace(final int key, final V value) {
+        return delegate.replace(Integer.valueOf(key), value);
+    }
+
+    public boolean replace(final int key, final V oldValue, final V newValue) {
+        return delegate.replace(Integer.valueOf(key), oldValue, newValue);
+    }
+
+    public void clear() {
+        delegate.clear();
+    }
+
+    public int size() {
+        return delegate.size();
+    }
+
+    public boolean isEmpty() {
+        return delegate.isEmpty();
+    }
+
+    public Set<Entry<V>> entrySet() {
+        return new EmulatedEntrySet<V>(delegate.entrySet());
+    }
+
+    public Collection<V> values() {
+        return delegate.values();
+    }
+
+    public boolean equals(final Object obj) {
+        return super.equals(obj);
+    }
+
+    public int hashCode() {
+        return super.hashCode();
+    }
+
+    private static class EmulatedEntrySet<V> implements Set<Entry<V>> {
+
+        private final Set<Map.Entry<Integer, V>> delegate;
+
+        public EmulatedEntrySet(final Set<Map.Entry<Integer,V>> delegate) {
+            this.delegate = delegate;
+        }
+
+        public int size() {
+            return delegate.size();
+        }
+
+        public boolean isEmpty() {
+            return delegate.isEmpty();
+        }
+
+        public boolean contains(final Object o) {
+            // todo
+            return false;
+        }
+
+        public Iterator<Entry<V>> iterator() {
+            final Iterator<Map.Entry<Integer, V>> i = delegate.iterator();
+            return new Iterator<Entry<V>>() {
+                public boolean hasNext() {
+                    return i.hasNext();
+                }
+
+                public Entry<V> next() {
+                    final Map.Entry<Integer, V> n = i.next();
+                    return new Entry<V>() {
+                        public int getKey() {
+                            return n.getKey().intValue();
+                        }
+
+                        public V getValue() {
+                            return n.getValue();
+                        }
+
+                        public V setValue(final V value) {
+                            return n.setValue(value);
+                        }
+                    };
+                }
+
+                public void remove() {
+                    i.remove();
+                }
+            };
+        }
+
+        public Object[] toArray() {
+            throw new UnsupportedOperationException("maybe later");
+        }
+
+        public <T> T[] toArray(final T[] a) {
+            throw new UnsupportedOperationException("maybe later");
+        }
+
+        public boolean add(final Entry<V> o) {
+            throw new UnsupportedOperationException("add() not supported");
+        }
+
+        public boolean remove(final Object o) {
+            throw new UnsupportedOperationException("maybe later");
+        }
+
+        public boolean containsAll(final Collection<?> c) {
+            throw new UnsupportedOperationException("maybe later");
+        }
+
+        public boolean addAll(final Collection<? extends Entry<V>> c) {
+            throw new UnsupportedOperationException("addAll() not supported");
+        }
+
+        public boolean retainAll(final Collection<?> c) {
+            throw new UnsupportedOperationException("maybe later");
+        }
+
+        public boolean removeAll(final Collection<?> c) {
+            throw new UnsupportedOperationException("maybe later");
+        }
+
+        public void clear() {
+            delegate.clear();
+        }
+    }
+}




More information about the jboss-remoting-commits mailing list