[jboss-remoting-commits] JBoss Remoting SVN: r5816 - in remoting3/trunk/jboss-remoting/src: main/java/org/jboss/remoting3/remote and 3 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Mar 9 23:36:20 EST 2010


Author: david.lloyd at jboss.com
Date: 2010-03-09 23:36:18 -0500 (Tue, 09 Mar 2010)
New Revision: 5816

Added:
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryClassTable.java
Modified:
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRemoteRequestHandler.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryExternalizerFactory.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
   remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
   remoting3/trunk/jboss-remoting/src/test/resources/logging.properties
Log:
Streams/object table/externalizer fixes, plus a couple more tests

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/ConnectionImpl.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -88,7 +88,7 @@
 
     public <I, O> ClientConnector<I, O> createClientConnector(final RequestListener<I, O> listener, final Class<I> requestClass, final Class<O> replyClass, final OptionMap optionMap) throws IOException {
         final ClientContextImpl context = new ClientContextImpl(getExecutor(), this);
-        final LocalRequestHandler localRequestHandler = endpoint.createLocalRequestHandler(listener, context, requestClass, replyClass, optionMap);
+        final LocalRequestHandler localRequestHandler = endpoint.createLocalRequestHandler(listener, context, requestClass, replyClass);
         final RequestHandlerConnector connector = connectionHandler.createConnector(localRequestHandler);
         context.addCloseHandler(SpiUtils.closingCloseHandler(localRequestHandler));
         return new ClientConnectorImpl<I, O>(connector, endpoint, requestClass, replyClass, context);

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/EndpointImpl.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -194,7 +194,7 @@
         }
     }
 
-    <I, O> LocalRequestHandler createLocalRequestHandler(final RequestListener<? super I, ? extends O> requestListener, final ClientContextImpl clientContext, final Class<I> requestClass, final Class<O> replyClass, final OptionMap optionMap) throws IOException {
+    <I, O> LocalRequestHandler createLocalRequestHandler(final RequestListener<? super I, ? extends O> requestListener, final ClientContextImpl clientContext, final Class<I> requestClass, final Class<O> replyClass) throws IOException {
         if (requestListener == null) {
             throw new IllegalArgumentException("requestListener is null");
         }
@@ -234,7 +234,7 @@
     }
 
     private final class ServiceBuilderImpl<I, O> implements ServiceBuilder<I, O> {
-        private String groupName;
+        private String groupName = "default";
         private String serviceType;
         private Class<I> requestType;
         private Class<O> replyType;
@@ -255,7 +255,7 @@
         @SuppressWarnings({ "unchecked" })
         public <N> ServiceBuilder<N, O> setRequestType(final Class<N> newRequestType) {
             if (newRequestType == null) {
-                throw new NullPointerException("newRequestType is null");
+                throw new IllegalArgumentException("newRequestType is null");
             }
             clientListener = null;
             ServiceBuilderImpl<N, O> castBuilder = (ServiceBuilderImpl<N, O>) this;
@@ -266,7 +266,7 @@
         @SuppressWarnings({ "unchecked" })
         public <N> ServiceBuilder<I, N> setReplyType(final Class<N> newReplyType) {
             if (newReplyType == null) {
-                throw new NullPointerException("newReplyType is null");
+                throw new IllegalArgumentException ("newReplyType is null");
             }
             clientListener = null;
             ServiceBuilderImpl<I, N> castBuilder = (ServiceBuilderImpl<I, N>) this;
@@ -289,7 +289,7 @@
 
         public ServiceBuilder<I, O> setOptionMap(final OptionMap optionMap) {
             if (optionMap == null) {
-                throw new NullPointerException("optionMap is null");
+                throw new IllegalArgumentException ("optionMap is null");
             }
             this.optionMap = optionMap;
             return this;
@@ -301,19 +301,19 @@
                 sm.checkPermission(REGISTER_SERVICE_PERM);
             }
             if (groupName == null) {
-                throw new NullPointerException("groupName is null");
+                throw new IllegalArgumentException("groupName is null");
             }
             if (serviceType == null) {
-                throw new NullPointerException("serviceType is null");
+                throw new IllegalArgumentException("serviceType is null");
             }
             if (requestType == null) {
-                throw new NullPointerException("requestType is null");
+                throw new IllegalArgumentException("requestType is null");
             }
             if (replyType == null) {
-                throw new NullPointerException("replyType is null");
+                throw new IllegalArgumentException("replyType is null");
             }
             if (clientListener == null) {
-                throw new NullPointerException("clientListener is null");
+                throw new IllegalArgumentException("clientListener is null");
             }
             final Integer metric = optionMap.get(RemotingOptions.METRIC);
             if (metric != null && metric.intValue() < 0) {
@@ -430,13 +430,13 @@
             sm.checkPermission(CREATE_CLIENT_PERM);
         }
         if (requestHandler == null) {
-            throw new NullPointerException("requestHandler is null");
+            throw new IllegalArgumentException("requestHandler is null");
         }
         if (requestType == null) {
-            throw new NullPointerException("requestType is null");
+            throw new IllegalArgumentException("requestType is null");
         }
         if (replyType == null) {
-            throw new NullPointerException("replyType is null");
+            throw new IllegalArgumentException("replyType is null");
         }
         checkOpen();
         final ClientImpl<I, O> client = ClientImpl.create(requestHandler, executor, requestType, replyType, clientClassLoader);
@@ -534,7 +534,7 @@
     public <I, O> Client<I, O> createLocalClient(final ClientListener<I, O> clientListener, final Class<I> requestClass, final Class<O> replyClass, final ClassLoader clientClassLoader, final OptionMap optionMap) throws IOException {
         final ClientContextImpl context = new ClientContextImpl(executor, null);
         final RequestListener<I, O> requestListener = clientListener.handleClientOpen(context, optionMap);
-        final LocalRequestHandler localRequestHandler = createLocalRequestHandler(requestListener, context, requestClass, replyClass, optionMap);
+        final LocalRequestHandler localRequestHandler = createLocalRequestHandler(requestListener, context, requestClass, replyClass);
         final LocalRemoteRequestHandler remoteRequestHandler = new LocalRemoteRequestHandler(localRequestHandler, clientClassLoader, optionMap, this.optionMap, executor);
         return ClientImpl.create(remoteRequestHandler, executor, requestClass, replyClass, clientClassLoader);
     }

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRemoteRequestHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRemoteRequestHandler.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/LocalRemoteRequestHandler.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -23,11 +23,17 @@
 package org.jboss.remoting3;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InvalidClassException;
 import java.io.InvalidObjectException;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.io.Writer;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.ByteOutput;
 import org.jboss.marshalling.Pair;
 import org.jboss.marshalling.cloner.ClassCloner;
 import org.jboss.marshalling.cloner.ClassLoaderClassCloner;
@@ -108,6 +114,18 @@
                 return new CloningObjectSource((ObjectSource) original, outboundCloner);
             } else if (original instanceof ObjectSink) {
                 return new CloningObjectSink(inboundCloner, (ObjectSink) original);
+            } else if (original instanceof InputStream) {
+                return original;
+            } else if (original instanceof OutputStream) {
+                return original;
+            } else if (original instanceof Reader) {
+                return original;
+            } else if (original instanceof Writer) {
+                return original;
+            } else if (original instanceof ByteInput) {
+                return original;
+            } else if (original instanceof ByteOutput) {
+                return original;
             } else if (original instanceof LocalRequestHandlerConnector) {
                 return original;
             } else if (original instanceof EndpointImpl) {

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/InboundStream.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -34,7 +34,7 @@
     private final RemoteConnection remoteConnection;
     private final Receiver receiver;
 
-    private State state;
+    private State state = State.WAITING_FIRST;
     private static final Logger log = Loggers.main;
 
     InboundStream(final int id, final RemoteConnection remoteConnection, final Receiver receiver) {
@@ -49,7 +49,7 @@
         final NioByteInput byteInput = new NioByteInput(
                 new NioByteInputHandler()
         );
-        receiver = new NioByteInputReceiver(byteInput);
+        receiver = new NioByteInputReceiver(byteInput, remoteConnection);
         byteInputResult.accept(byteInput, this);
     }
 
@@ -152,7 +152,9 @@
     }
 
     void sendAsyncStart() {
-        doSend(RemoteProtocol.STREAM_ASYNC_START);
+        synchronized (this) {
+            doSend(RemoteProtocol.STREAM_ASYNC_START);
+        }
     }
 
     void sendAck() {
@@ -185,11 +187,13 @@
         }
     }
 
-    private final class NioByteInputReceiver implements Receiver, NioByteInput.BufferReturn {
+    private static final class NioByteInputReceiver implements Receiver, NioByteInput.BufferReturn {
         private final NioByteInput nioByteInput;
+        private final RemoteConnection remoteConnection;
 
-        NioByteInputReceiver(final NioByteInput nioByteInput) {
+        NioByteInputReceiver(final NioByteInput nioByteInput, final RemoteConnection remoteConnection) {
             this.nioByteInput = nioByteInput;
+            this.remoteConnection = remoteConnection;
         }
 
         public void push(final ByteBuffer buffer) {

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/MarshallerObjectSink.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -34,14 +34,17 @@
     }
 
     public void accept(final T instance) throws IOException {
+        marshaller.writeByte(RemoteProtocol.OSINK_OBJECT);
         marshaller.writeObject(instance);
     }
 
     public void flush() throws IOException {
+        marshaller.writeByte(RemoteProtocol.OSINK_FLUSH);
         marshaller.flush();
     }
 
     public void close() throws IOException {
+        marshaller.writeByte(RemoteProtocol.OSINK_CLOSE);
         marshaller.close();
     }
 }

Added: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryClassTable.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryClassTable.java	                        (rev 0)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryClassTable.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.remote;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import org.jboss.marshalling.ClassTable;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.stream.ObjectSink;
+import org.jboss.remoting3.stream.ObjectSource;
+import org.jboss.xnio.OptionMap;
+
+/**
+ * Class table which encodes a small, fixed set of well-known classes as a single byte each.
+ * <p>
+ * Within this class the simple name {@code Writer} is used for the class-writer type of
+ * {@link ClassTable}; {@code java.io.Writer} is therefore always spelled out in full.
+ */
+final class PrimaryClassTable implements ClassTable {
+    static final PrimaryClassTable INSTANCE = new PrimaryClassTable();
+
+    /** Index-to-class lookup used when reading; unassigned slots hold {@code null}. */
+    private static final List<Class<?>> READ_TABLE;
+    /** Class-to-writer lookup used when writing; keyed by class identity. */
+    private static final Map<Class<?>, Writer> WRITE_TABLE;
+
+    /** Capacity of the read table; indices at or above the highest assigned constant are unassigned. */
+    private static final int CLASS_MAX = 8;
+
+    private static final int CLASS_INPUT_STREAM = 0;
+    private static final int CLASS_OUTPUT_STREAM = 1;
+    private static final int CLASS_READER = 2;
+    private static final int CLASS_WRITER = 3;
+    private static final int CLASS_OBJECT_SOURCE = 4;
+    private static final int CLASS_OBJECT_SINK = 5;
+    private static final int CLASS_OPTION_MAP = 6;
+
+    static {
+        final Map<Class<?>, Writer> map = new IdentityHashMap<Class<?>, Writer>();
+        // fixed-size list view over a null-filled array; slots are filled in by add()
+        final List<Class<?>> list = Arrays.asList(new Class<?>[CLASS_MAX]);
+        add(map, list, InputStream.class, CLASS_INPUT_STREAM);
+        add(map, list, OutputStream.class, CLASS_OUTPUT_STREAM);
+        add(map, list, Reader.class, CLASS_READER);
+        add(map, list, java.io.Writer.class, CLASS_WRITER);
+        add(map, list, ObjectSource.class, CLASS_OBJECT_SOURCE);
+        add(map, list, ObjectSink.class, CLASS_OBJECT_SINK);
+        add(map, list, OptionMap.class, CLASS_OPTION_MAP);
+        READ_TABLE = list;
+        WRITE_TABLE = map;
+    }
+
+    private PrimaryClassTable() {
+    }
+
+    /**
+     * Register one class under the given index in both lookup structures.
+     *
+     * @param map the write table being built
+     * @param list the read table being built
+     * @param clazz the class to register
+     * @param idx the byte value which represents the class
+     */
+    private static void add(Map<Class<?>, Writer> map, List<Class<?>> list, Class<?> clazz, int idx) {
+        map.put(clazz, new ByteWriter(idx));
+        list.set(idx, clazz);
+    }
+
+    /**
+     * Look up the writer for a class.
+     *
+     * @param clazz the class about to be written
+     * @return the writer for the class, or {@code null} if the class is not in this table
+     */
+    public Writer getClassWriter(final Class<?> clazz) throws IOException {
+        return WRITE_TABLE.get(clazz);
+    }
+
+    /**
+     * Read one unsigned byte and map it back to the class registered under that index.
+     *
+     * @param unmarshaller the unmarshaller to read the index from
+     * @return the class registered under the index read
+     * @throws IOException if the index cannot be read or does not correspond to a registered class
+     */
+    public Class<?> readClass(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
+        final int idx = unmarshaller.readUnsignedByte();
+        // The index comes off the wire: reject out-of-range values and unassigned slots explicitly
+        // rather than failing with IndexOutOfBoundsException or handing back a null class.
+        final Class<?> clazz = idx < READ_TABLE.size() ? READ_TABLE.get(idx) : null;
+        if (clazz == null) {
+            throw new java.io.StreamCorruptedException("Unknown class table index " + idx);
+        }
+        return clazz;
+    }
+
+    /** Class writer which emits a single fixed byte identifying the class. */
+    private static final class ByteWriter implements Writer {
+        private final byte b;
+
+        public ByteWriter(final int b) {
+            this.b = (byte) b;
+        }
+
+        public void writeClass(final Marshaller marshaller, final Class<?> clazz) throws IOException {
+            marshaller.writeByte(b);
+        }
+    }
+}

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryExternalizerFactory.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryExternalizerFactory.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryExternalizerFactory.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -23,24 +23,76 @@
 package org.jboss.remoting3.remote;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.InvalidObjectException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jboss.marshalling.AbstractExternalizer;
 import org.jboss.marshalling.ClassExternalizerFactory;
 import org.jboss.marshalling.Creator;
 import org.jboss.marshalling.Externalizer;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.NioByteInput;
+import org.jboss.marshalling.NioByteOutput;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.marshalling.util.IntKeyMap;
+import org.jboss.remoting3.stream.ObjectSink;
+import org.jboss.remoting3.stream.ObjectSource;
+import org.jboss.remoting3.stream.ReaderInputStream;
+import org.jboss.remoting3.stream.WriterOutputStream;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
 
 final class PrimaryExternalizerFactory implements ClassExternalizerFactory {
 
-    static final ClassExternalizerFactory INSTANCE = new PrimaryExternalizerFactory();
+    private static final Logger log = Loggers.main;
 
+    private final RemoteConnectionHandler connectionHandler;
+    private final Executor executor;
+
+    final Externalizer inputStream = new InputStreamExternalizer();
+    final Externalizer outputStream = new OutputStreamExternalizer();
+    final Externalizer reader = new ReaderExternalizer();
+    final Externalizer writer = new WriterExternalizer();
+    final Externalizer objectSource = new ObjectSourceExternalizer();
+    final Externalizer objectSink = new ObjectSinkExternalizer();
+
+    /**
+     * Construct a new instance bound to a single connection.
+     *
+     * @param connectionHandler the connection handler; its connection provider context supplies the executor
+     */
+    PrimaryExternalizerFactory(final RemoteConnectionHandler connectionHandler) {
+        this.executor = connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor();
+        this.connectionHandler = connectionHandler;
+    }
+
     public Externalizer getExternalizer(final Class<?> type) {
         if (type == UnsentRequestHandlerConnector.class) {
             return RequestHandlerConnectorExternalizer.INSTANCE;
+        } else if (InputStream.class.isAssignableFrom(type)) {
+            return inputStream;
+        } else if (OutputStream.class.isAssignableFrom(type)) {
+            return outputStream;
+        } else if (Reader.class.isAssignableFrom(type)) {
+            return reader;
+        } else if (Writer.class.isAssignableFrom(type)) {
+            return writer;
+        } else if (ObjectSource.class.isAssignableFrom(type)) {
+            return objectSource;
+        } else if (ObjectSink.class.isAssignableFrom(type)) {
+            return objectSink;
+        } else {
+            return null;
         }
-        return null;
     }
 
-    static class RequestHandlerConnectorExternalizer implements Externalizer {
+    static class RequestHandlerConnectorExternalizer extends AbstractExternalizer {
         static final RequestHandlerConnectorExternalizer INSTANCE = new RequestHandlerConnectorExternalizer();
 
         private static final long serialVersionUID = 8137262079765758375L;
@@ -53,9 +105,219 @@
         public Object createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
             return new ReceivedRequestHandlerConnector(RemoteConnectionHandler.getCurrent(), input.readInt());
         }
+    }
 
-        public void readExternal(final Object subject, final ObjectInput input) throws IOException, ClassNotFoundException {
-            // n/a
+    /**
+     * Externalizer for {@link InputStream} instances: the stream is replaced on the wire by the
+     * integer ID of a forwarded stream.
+     */
+    class InputStreamExternalizer extends AbstractExternalizer {
+
+        /** Register an outbound stream fed from {@code subject} and write its ID to {@code output}. */
+        public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+            final InputStream source = (InputStream) subject;
+            writeOutboundStream(output, source);
         }
+
+        /** Read a stream ID from {@code input} and return the inbound stream registered for it. */
+        public InputStream createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+            final int streamId = input.readInt();
+            return readInboundStream(streamId);
+        }
     }
+
+    /**
+     * Externalizer for {@link OutputStream} instances: the stream is replaced on the wire by the
+     * integer ID of a forwarded stream.
+     */
+    class OutputStreamExternalizer extends AbstractExternalizer {
+
+        /** Register an inbound stream which feeds {@code subject} and write its ID to {@code output}. */
+        public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+            final OutputStream target = (OutputStream) subject;
+            writeInboundStream(output, target);
+        }
+
+        /** Read a stream ID from {@code input} and return the outbound stream registered for it. */
+        public OutputStream createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+            final int streamId = input.readInt();
+            return readOutboundStream(streamId);
+        }
+    }
+
+    /**
+     * Externalizer for {@link Reader} instances.  Characters are carried over a forwarded byte
+     * stream using the {@code RemoteProtocol.UTF_8} character set on both sides.
+     */
+    class ReaderExternalizer extends AbstractExternalizer {
+
+        /** Wrap {@code subject} as a byte stream, register it as an outbound stream and write its ID. */
+        public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+            final ReaderInputStream encoded = new ReaderInputStream((Reader)subject, RemoteProtocol.UTF_8);
+            writeOutboundStream(output, encoded);
+        }
+
+        /** Read a stream ID from {@code input} and return a reader decoding the inbound stream registered for it. */
+        public Reader createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+            final NioByteInput bytes = readInboundStream(input.readInt());
+            return new InputStreamReader(bytes, RemoteProtocol.UTF_8);
+        }
+    }
+
+    /**
+     * Externalizer for {@link java.io.Writer} instances.  Characters are carried over a forwarded
+     * byte stream using the {@code RemoteProtocol.UTF_8} character set on both sides.
+     */
+    class WriterExternalizer extends AbstractExternalizer {
+
+        /** Wrap {@code subject} as a byte stream, register an inbound stream which feeds it and write its ID. */
+        public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+            final WriterOutputStream decoded = new WriterOutputStream((java.io.Writer)subject, RemoteProtocol.UTF_8);
+            writeInboundStream(output, decoded);
+        }
+
+        /** Read a stream ID from {@code input} and return a writer encoding onto the outbound stream registered for it. */
+        public java.io.Writer createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+            final NioByteOutput bytes = readOutboundStream(input.readInt());
+            return new OutputStreamWriter(bytes, RemoteProtocol.UTF_8);
+        }
+    }
+
+    /**
+     * Externalizer for {@link ObjectSource} instances: the source is replaced on the wire by the
+     * integer ID of a forwarded stream.
+     */
+    class ObjectSourceExternalizer extends AbstractExternalizer {
+
+        /** Register an outbound stream for {@code subject} and write its ID to {@code output}. */
+        public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+            writeOutboundStream(output, (ObjectSource) subject);
+        }
+
+        /**
+         * Read a stream ID and return an object source which unmarshals from the inbound stream
+         * registered under that ID.
+         *
+         * @param subjectType the declared type (unused)
+         * @param input the input to read the stream ID from
+         * @param defaultCreator the default creator (unused)
+         * @return an object source backed by a started unmarshaller
+         * @throws IOException if the ID cannot be read, is already in use, or the unmarshaller cannot be started
+         */
+        public ObjectSource createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+            boolean ok = false;
+            final Unmarshaller unmarshaller = connectionHandler.getMarshallerFactory().createUnmarshaller(connectionHandler.getMarshallingConfiguration());
+            try {
+                unmarshaller.start(readInboundStream(input.readInt()));
+                final ObjectSource source = new UnmarshallerObjectSource(unmarshaller);
+                // Mark success so the finally block only closes the unmarshaller on failure.  The flag
+                // was previously never set, which closed the unmarshaller under the returned source.
+                ok = true;
+                return source;
+            } finally {
+                if (! ok) {
+                    IoUtils.safeClose(unmarshaller);
+                }
+            }
+        }
+    }
+
+    /**
+     * Externalizer for {@link ObjectSink} instances: the sink is replaced on the wire by the
+     * integer ID of a forwarded stream.
+     */
+    class ObjectSinkExternalizer extends AbstractExternalizer {
+
+        /** Register an inbound stream which feeds {@code subject} and write its ID to {@code output}. */
+        public void writeExternal(final Object subject, final ObjectOutput output) throws IOException {
+            writeInboundStream(output, (ObjectSink) subject);
+        }
+
+        /**
+         * Read a stream ID and return an object sink which marshals onto the outbound stream
+         * registered under that ID.
+         *
+         * @param subjectType the declared type (unused)
+         * @param input the input to read the stream ID from
+         * @param defaultCreator the default creator (unused)
+         * @return an object sink backed by a started marshaller
+         * @throws IOException if the ID cannot be read, is already in use, or the marshaller cannot be started
+         */
+        public ObjectSink createExternal(final Class<?> subjectType, final ObjectInput input, final Creator defaultCreator) throws IOException, ClassNotFoundException {
+            boolean ok = false;
+            final Marshaller marshaller = connectionHandler.getMarshallerFactory().createMarshaller(connectionHandler.getMarshallingConfiguration());
+            try {
+                marshaller.start(readOutboundStream(input.readInt()));
+                final ObjectSink sink = new MarshallerObjectSink(marshaller);
+                // Mark success so the finally block only closes the marshaller on failure.  The flag
+                // was previously never set, which closed the marshaller under the returned sink.
+                ok = true;
+                return sink;
+            } finally {
+                if (! ok) {
+                    IoUtils.safeClose(marshaller);
+                }
+            }
+        }
+    }
+
+    /**
+     * Register a new inbound stream whose received data is handed to {@code objectSink}, then write
+     * the stream's ID to {@code marshaller}.
+     *
+     * @param marshaller the output to which the chosen stream ID is written
+     * @param objectSink the local sink passed to the receive task
+     * @throws IOException if writing the stream ID fails
+     */
+    private void writeInboundStream(final ObjectOutput marshaller, final ObjectSink objectSink) throws IOException {
+        final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
+        final Random random = connectionHandler.getRandom();
+        int id;
+        synchronized (inboundStreams) {
+            // draw random IDs with the lowest bit cleared until one is not yet registered
+            while (inboundStreams.containsKey(id = random.nextInt() & ~1));
+            inboundStreams.put(id, new InboundStream(id, connectionHandler.getRemoteConnection(), new InboundStream.ByteInputResult() {
+                public void accept(final NioByteInput nioByteInput, final InboundStream inboundStream) {
+                    try {
+                        executor.execute(new InboundObjectSinkReceiveTask(nioByteInput, inboundStream, connectionHandler, objectSink));
+                    } catch (RejectedExecutionException e) {
+                        // the receive task could not be scheduled: log it and report the failure on the stream
+                        log.warn("Unable to start task for forwarded stream: %s", e);
+                        inboundStream.sendAsyncException();
+                    }
+                }
+            }));
+        }
+        marshaller.writeInt(id);
+    }
+
+    /**
+     * Register an inbound stream under an ID received from the peer and return its byte input.
+     *
+     * @param id the stream ID read from the wire
+     * @return the byte input captured from the stream's {@code ByteInputResult} callback
+     * @throws InvalidObjectException if a stream is already registered under {@code id}
+     */
+    private NioByteInput readInboundStream(final int id) throws InvalidObjectException {
+        final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
+        final AtomicReference<NioByteInput> ref = new AtomicReference<NioByteInput>();
+        final InboundStream inboundStream;
+        synchronized (inboundStreams) {
+            if (inboundStreams.containsKey(id)) {
+                throw duplicateId(id);
+            }
+            // NOTE(review): the value returned below is only non-null if the InboundStream constructor
+            // invokes this callback before returning - confirm against InboundStream.
+            inboundStream = new InboundStream(id, connectionHandler.getRemoteConnection(), new InboundStream.ByteInputResult() {
+                public void accept(final NioByteInput nioByteInput, final InboundStream inboundStream) {
+                    ref.set(nioByteInput);
+                }
+            });
+            inboundStreams.put(id, inboundStream);
+        }
+        // the start message is sent while holding the stream's own monitor
+        synchronized (inboundStream) {
+            inboundStream.sendAsyncStart();
+        }
+        return ref.get();
+    }
+
+    /**
+     * Register a new outbound stream, write its ID to {@code marshaller}, and schedule a task which
+     * transmits from {@code objectSource} over that stream.
+     *
+     * @param marshaller the output to which the chosen stream ID is written
+     * @param objectSource the local source passed to the transmit task
+     * @throws IOException if writing the stream ID fails
+     */
+    private void writeOutboundStream(final ObjectOutput marshaller, final ObjectSource objectSource) throws IOException {
+        final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
+        final Random random = connectionHandler.getRandom();
+        int id;
+        final OutboundStream outboundStream;
+        synchronized (outboundStreams) {
+            // draw random IDs with the lowest bit set until one is not yet registered
+            while (outboundStreams.containsKey(id = random.nextInt() | 1));
+            outboundStreams.put(id, outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection()));
+        }
+        marshaller.writeInt(id);
+        try {
+            executor.execute(new OutboundObjectSourceTransmitTask(objectSource, outboundStream, connectionHandler));
+        } catch (RejectedExecutionException e) {
+            // the transmit task could not be scheduled: log it and report the failure on the stream
+            log.warn("Unable to start task for forwarded stream: %s", e);
+            outboundStream.sendException();
+        }
+    }
+
+    /**
+     * Register an outbound stream under an ID received from the peer and return a byte output
+     * which writes to it.
+     *
+     * @param id the stream ID read from the wire
+     * @return a byte output whose buffers are obtained from and sent on the new outbound stream
+     * @throws InvalidObjectException if a stream is already registered under {@code id}
+     */
+    private NioByteOutput readOutboundStream(final int id) throws InvalidObjectException {
+        final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
+        final OutboundStream outboundStream;
+        synchronized (outboundStreams) {
+            if (outboundStreams.containsKey(id)) {
+                throw duplicateId(id);
+            }
+            outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection());
+            outboundStreams.put(id, outboundStream);
+        }
+        // asyncStart() is invoked while holding the stream's own monitor
+        synchronized (outboundStream) {
+            outboundStream.asyncStart();
+        }
+        return new NioByteOutput(new NioByteOutput.BufferWriter() {
+            public ByteBuffer getBuffer() {
+                return outboundStream.getBuffer();
+            }
+
+            public void accept(final ByteBuffer buffer, final boolean eof) throws IOException {
+                // send the buffer first, then signal end-of-file if this was the last one
+                outboundStream.send(buffer);
+                if (eof) {
+                    outboundStream.sendEof();
+                }
+            }
+
+            public void flush() throws IOException {
+                // intentionally empty: nothing is done on flush
+            }
+        });
+    }
+
+    /**
+     * Register a new inbound stream which feeds {@code outputStream}, then write the stream's ID
+     * to {@code marshaller}.
+     * <p>
+     * This looks backwards but it really isn't.  When we write an OutputStream, we want the remote
+     * side to send us inbound data to feed it.
+     *
+     * @param marshaller the marshaller to which the chosen stream ID is written
+     * @param outputStream the output stream which the inbound stream is constructed with
+     * @throws IOException if an I/O error occurs
+     */
+    private void writeInboundStream(final ObjectOutput marshaller, final OutputStream outputStream) throws IOException {
+        final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
+        final Random random = connectionHandler.getRandom();
+        int id;
+        synchronized (inboundStreams) {
+            // draw random IDs with the lowest bit cleared until one is not yet registered
+            while (inboundStreams.containsKey(id = random.nextInt() & ~1));
+            inboundStreams.put(id, new InboundStream(id, connectionHandler.getRemoteConnection(), outputStream));
+        }
+        marshaller.writeInt(id);
+    }
+
+    /**
+     * Register a new outbound stream, write its ID to {@code marshaller}, and schedule a task which
+     * transmits the contents of {@code inputStream} over that stream.
+     *
+     * @param marshaller the output to which the chosen stream ID is written
+     * @param inputStream the local stream passed to the transmit task
+     * @throws IOException if writing the stream ID fails
+     */
+    private void writeOutboundStream(final ObjectOutput marshaller, final InputStream inputStream) throws IOException {
+        final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
+        final Random random = connectionHandler.getRandom();
+        int id;
+        final OutboundStream outboundStream;
+        synchronized (outboundStreams) {
+            // draw random IDs with the lowest bit set until one is not yet registered
+            while (outboundStreams.containsKey(id = random.nextInt() | 1));
+            outboundStreams.put(id, outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection()));
+        }
+        marshaller.writeInt(id);
+        try {
+            executor.execute(new OutboundInputStreamTransmitTask(inputStream, outboundStream));
+        } catch (RejectedExecutionException e) {
+            // the transmit task could not be scheduled: log it and report the failure on the stream
+            log.warn("Unable to start task for forwarded stream: %s", e);
+            outboundStream.sendException();
+        }
+    }
+
+    private static InvalidObjectException duplicateId(final int id) {
+        return new InvalidObjectException("Duplicated stream ID " + id);
+    }
 }

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/PrimaryObjectTable.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -23,41 +23,66 @@
 package org.jboss.remoting3.remote;
 
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.io.StreamCorruptedException;
-import java.util.Random;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
 import org.jboss.marshalling.Marshaller;
-import org.jboss.marshalling.NioByteInput;
 import org.jboss.marshalling.ObjectTable;
 import org.jboss.marshalling.Unmarshaller;
-import org.jboss.marshalling.util.IntKeyMap;
 import org.jboss.remoting3.Endpoint;
-import org.jboss.remoting3.stream.ObjectSink;
-import org.jboss.remoting3.stream.ObjectSource;
-import org.jboss.remoting3.stream.ReaderInputStream;
-import org.jboss.remoting3.stream.WriterOutputStream;
 import org.jboss.xnio.log.Logger;
 
 final class PrimaryObjectTable implements ObjectTable {
 
-    private final Endpoint endpoint;
-    private final RemoteConnectionHandler connectionHandler;
-    private final Executor executor;
     private static final Logger log = Loggers.main;
 
-    PrimaryObjectTable(final Endpoint endpoint, final RemoteConnectionHandler connectionHandler) {
-        this.endpoint = endpoint;
-        this.connectionHandler = connectionHandler;
-        executor = this.connectionHandler.getConnectionContext().getConnectionProviderContext().getExecutor();
+    private final Map<Object, Writer> writerMap;
+    private final List<Object> readerList;
+
+    // Object table types
+
+    static final byte OBJ_ENDPOINT = 0;
+    static final byte OBJ_CLIENT_CONNECTOR = 1;
+    static final byte OBJ_INPUT_STREAM = 2;
+    static final byte OBJ_OUTPUT_STREAM = 3;
+    static final byte OBJ_READER = 4;
+    static final byte OBJ_WRITER = 5;
+    static final byte OBJ_OBJECT_SOURCE = 6;
+    static final byte OBJ_OBJECT_SINK = 7;
+
+    /**
+     * Construct a new instance.  Each well-known object is registered under the {@code OBJ_*} ID declared in
+     * this class; that ID is its index in the read-side list and in the cached writer array.
+     *
+     * @param endpoint the local endpoint, registered as {@code OBJ_ENDPOINT}
+     * @param externalizerFactory the factory whose fields are registered for the stream-related IDs
+     */
+    PrimaryObjectTable(final Endpoint endpoint, final PrimaryExternalizerFactory externalizerFactory) {
+        // identity map: entries are matched by reference, not by equals()
+        final Map<Object, Writer> map = new IdentityHashMap<Object, Writer>();
+        // fixed-size list backed by the array; slots are filled with set(), never add()
+        final List<Object> list = Arrays.asList(new Object[8]);
+        add(map, list, OBJ_ENDPOINT, endpoint);
+        add(map, list, OBJ_CLIENT_CONNECTOR, PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE);
+        add(map, list, OBJ_INPUT_STREAM, externalizerFactory.inputStream);
+        add(map, list, OBJ_OUTPUT_STREAM, externalizerFactory.outputStream);
+        add(map, list, OBJ_READER, externalizerFactory.reader);
+        add(map, list, OBJ_WRITER, externalizerFactory.writer);
+        add(map, list, OBJ_OBJECT_SOURCE, externalizerFactory.objectSource);
+        add(map, list, OBJ_OBJECT_SINK, externalizerFactory.objectSink);
+        readerList = list;
+        writerMap = map;
     }
 
-    private static final Writer ZERO_WRITER = new ByteWriter(RemoteProtocol.OBJ_ENDPOINT);
-    private static final Writer ONE_WRITER = new ByteWriter(RemoteProtocol.OBJ_CLIENT_CONNECTOR);
+    /**
+     * Register one object table entry: map the instance to the cached writer for {@code idx} and store the
+     * instance at position {@code idx} of the list.
+     *
+     * @param map the instance-to-writer map to update
+     * @param list the index-to-instance list to update
+     * @param idx the entry's index; must be a valid index of both {@code CACHED_WRITERS} and {@code list}
+     * @param instance the object to register
+     */
+    private static void add(final Map<Object, Writer> map, final List<Object> list, final int idx, final Object instance) {
+        final ByteWriter writer = CACHED_WRITERS[idx];
+        map.put(instance, writer);
+        list.set(idx, instance);
+    }
 
+    // One shared writer per object table ID; the writer at index i is constructed with the value i.
+    private static final ByteWriter[] CACHED_WRITERS = {
+            new ByteWriter(0),
+            new ByteWriter(1),
+            new ByteWriter(2),
+            new ByteWriter(3),
+            new ByteWriter(4),
+            new ByteWriter(5),
+            new ByteWriter(6),
+            new ByteWriter(7),
+    };
+
     private static final class ByteWriter implements Writer {
         private final byte b;
 
@@ -68,140 +93,18 @@
         public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
             marshaller.writeByte(b);
         }
-    }
 
-    public Writer getObjectWriter(final Object object) throws IOException {
-        if (object == endpoint) {
-            return ZERO_WRITER;
-        } else if (object == PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE) {
-            return ONE_WRITER;
-        } else if (object instanceof InputStream) {
-            return new Writer() {
-                public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
-                    writeOutboundStream(marshaller, RemoteProtocol.OBJ_INPUT_STREAM, (InputStream) object);
-                }
-            };
-        } else if (object instanceof OutputStream) {
-            return new Writer() {
-                public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
-                    writeInboundStream(marshaller, RemoteProtocol.OBJ_OUTPUT_STREAM, (OutputStream) object);
-                }
-            };
-        } else if (object instanceof Reader) {
-            return new Writer() {
-                public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
-                    writeOutboundStream(marshaller, RemoteProtocol.OBJ_READER, new ReaderInputStream((Reader)object, RemoteProtocol.UTF_8));
-                }
-            };
-        } else if (object instanceof java.io.Writer) {
-            return new Writer() {
-                public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
-                    writeInboundStream(marshaller, RemoteProtocol.OBJ_WRITER, new WriterOutputStream((java.io.Writer)object, RemoteProtocol.UTF_8));
-                }
-            };
-        } else if (object instanceof ObjectSource) {
-            return new Writer() {
-                public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
-                    writeOutboundStream(marshaller, RemoteProtocol.OBJ_OBJECT_SOURCE, (ObjectSource) object);
-                }
-            };
-        } else if (object instanceof ObjectSink) {
-            return new Writer() {
-                public void writeObject(final Marshaller marshaller, final Object object) throws IOException {
-                    writeInboundStream(marshaller, RemoteProtocol.OBJ_OBJECT_SINK, (ObjectSink) object);
-                }
-            };
-        } else {
-            return null;
+        /**
+         * Get the byte this writer emits, as an unsigned value.
+         *
+         * @return the byte value in the range 0 to 255
+         */
+        public int getByte() {
+            return b & 0xff;
         }
     }
 
-    private void writeInboundStream(final Marshaller marshaller, final byte code, final ObjectSink objectSink) throws IOException {
-        marshaller.writeByte(code);
-        final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
-        final Random random = connectionHandler.getRandom();
-        int id;
-        synchronized (inboundStreams) {
-            while (inboundStreams.containsKey(id = random.nextInt() & ~1));
-            inboundStreams.put(id, new InboundStream(id, connectionHandler.getRemoteConnection(), new InboundStream.ByteInputResult() {
-                public void accept(final NioByteInput nioByteInput, final InboundStream inboundStream) {
-                    try {
-                        executor.execute(new InboundObjectSinkReceiveTask(nioByteInput, inboundStream, connectionHandler, objectSink));
-                    } catch (RejectedExecutionException e) {
-                        log.warn("Unable to start task for forwarded stream: %s", e);
-                        inboundStream.sendAsyncException();
-                    }
-                }
-            }));
-        }
-        marshaller.writeInt(id);
+    /**
+     * Look up the writer registered for the given object.
+     *
+     * @param object the object about to be marshalled
+     * @return the registered writer, or {@code null} if the object has no entry in the writer map
+     * @throws IOException declared by the interface signature; the lookup itself performs no I/O
+     */
+    public Writer getObjectWriter(final Object object) throws IOException {
+        return writerMap.get(object);
     }
 
-    private void writeOutboundStream(final Marshaller marshaller, final byte code, final ObjectSource objectSource) throws IOException {
-        marshaller.writeByte(code);
-        final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
-        final Random random = connectionHandler.getRandom();
-        int id;
-        final OutboundStream outboundStream;
-        synchronized (outboundStreams) {
-            while (outboundStreams.containsKey(id = random.nextInt() | 1));
-            outboundStreams.put(id, outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection()));
-        }
-        marshaller.writeInt(id);
-        try {
-            executor.execute(new OutboundObjectSourceTransmitTask(objectSource, outboundStream, connectionHandler));
-        } catch (RejectedExecutionException e) {
-            log.warn("Unable to start task for forwarded stream: %s", e);
-            outboundStream.sendException();
-        }
-    }
-
-    /**
-     * This looks backwards but it really isn't.  When we write an OutputStream, we want the remote side to send us inbound
-     * to feed it.
-     *
-     * @param marshaller the marshaller
-     * @param code the code
-     * @param outputStream the output stream
-     * @throws IOException if an I/O error occurs
-     */
-    private void writeInboundStream(final Marshaller marshaller, final byte code, final OutputStream outputStream) throws IOException {
-        marshaller.writeByte(code);
-        final IntKeyMap<InboundStream> inboundStreams = connectionHandler.getInboundStreams();
-        final Random random = connectionHandler.getRandom();
-        int id;
-        synchronized (inboundStreams) {
-            while (inboundStreams.containsKey(id = random.nextInt() & ~1));
-            inboundStreams.put(id, new InboundStream(id, connectionHandler.getRemoteConnection(), outputStream));
-        }
-        marshaller.writeInt(id);
-    }
-
-    private void writeOutboundStream(final Marshaller marshaller, final byte code, final InputStream inputStream) throws IOException {
-        marshaller.writeByte(code);
-        final IntKeyMap<OutboundStream> outboundStreams = connectionHandler.getOutboundStreams();
-        final Random random = connectionHandler.getRandom();
-        int id;
-        final OutboundStream outboundStream;
-        synchronized (outboundStreams) {
-            while (outboundStreams.containsKey(id = random.nextInt() | 1));
-            outboundStreams.put(id, outboundStream = new OutboundStream(id, connectionHandler.getRemoteConnection()));
-        }
-        marshaller.writeInt(id);
-        try {
-            executor.execute(new OutboundInputStreamTransmitTask(inputStream, outboundStream));
-        } catch (RejectedExecutionException e) {
-            log.warn("Unable to start task for forwarded stream: %s", e);
-            outboundStream.sendException();
-        }
-    }
-
     public Object readObject(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
+        // A table entry is identified on the wire by one unsigned byte: its index in readerList.
         final int id = unmarshaller.readUnsignedByte();
-        switch (id) {
-            case RemoteProtocol.OBJ_ENDPOINT: return endpoint;
-            case RemoteProtocol.OBJ_CLIENT_CONNECTOR: return PrimaryExternalizerFactory.RequestHandlerConnectorExternalizer.INSTANCE;
-            default: throw new StreamCorruptedException("Unknown object table ID byte " + id);
-        }
+        // readUnsignedByte() yields 0..255 but only readerList.size() entries exist.  Report an unknown ID as
+        // stream corruption, as the replaced switch did, instead of letting List.get() throw an unchecked
+        // IndexOutOfBoundsException.  The name is fully qualified because this change removes the import.
+        if (id >= readerList.size()) {
+            throw new java.io.StreamCorruptedException("Unknown object table ID byte " + id);
+        }
+        return readerList.get(id);
     }
 }

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteConnectionHandler.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -74,8 +74,11 @@
         this.remoteConnection = remoteConnection;
         this.marshallerFactory = marshallerFactory;
         final MarshallingConfiguration config = new MarshallingConfiguration();
-        config.setClassExternalizerFactory(PrimaryExternalizerFactory.INSTANCE);
-        config.setObjectTable(new PrimaryObjectTable(connectionContext.getConnectionProviderContext().getEndpoint(), this));
+        final PrimaryExternalizerFactory externalizerFactory = new PrimaryExternalizerFactory(this);
+        final PrimaryObjectTable objectTable = new PrimaryObjectTable(connectionContext.getConnectionProviderContext().getEndpoint(), externalizerFactory);
+        config.setClassTable(PrimaryClassTable.INSTANCE);
+        config.setClassExternalizerFactory(externalizerFactory);
+        config.setObjectTable(objectTable);
         config.setStreamHeader(Marshalling.nullStreamHeader());
         // fixed for now (v0)
         config.setVersion(2);

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/RemoteProtocol.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -87,17 +87,6 @@
     static final byte GREETING_ENDPOINT_NAME = 2; // sent by client & server
     static final byte GREETING_MARSHALLER_VERSION = 3; // sent by client & server
 
-    // Object table types
-
-    static final byte OBJ_ENDPOINT = 0;
-    static final byte OBJ_CLIENT_CONNECTOR = 1;
-    static final byte OBJ_INPUT_STREAM = 2;
-    static final byte OBJ_OUTPUT_STREAM = 3;
-    static final byte OBJ_READER = 4;
-    static final byte OBJ_WRITER = 5;
-    static final byte OBJ_OBJECT_SOURCE = 6;
-    static final byte OBJ_OBJECT_SINK = 7;
-
     // Object sink stream commands
 
     static final int OSINK_OBJECT = 0;
@@ -111,6 +100,7 @@
 
     static final Charset UTF_8 = Charset.forName("UTF8");
 
+
     /**
      * Create an instance of the connection provider for the "remote" protocol.
      *

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/remote/UnmarshallerObjectSource.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -23,25 +23,63 @@
 package org.jboss.remoting3.remote;
 
 import java.io.IOException;
+import java.io.InvalidObjectException;
 import java.util.NoSuchElementException;
 import org.jboss.marshalling.Unmarshaller;
 import org.jboss.remoting3.stream.ObjectSource;
 
 final class UnmarshallerObjectSource<T> implements ObjectSource<T> {
     private final Unmarshaller unmarshaller;
+    // Accessed only while synchronized on this.  It must start out as NEW: a null state matches neither NEW
+    // nor READY, so hasNext() would never read a command byte and would always report false.
+    private State state = State.NEW;
 
+    /** Read position of this source relative to the command byte / object pairs on the stream. */
+    enum State {
+        /** The next command byte has not been read yet. */
+        NEW,
+        /** An object command was read; the object itself has not been read yet. */
+        READY,
+        /** A non-object command was read or the source was closed; no more objects are returned. */
+        DONE,
+    }
+
     UnmarshallerObjectSource(final Unmarshaller unmarshaller) {
         this.unmarshaller = unmarshaller;
     }
 
+    /**
+     * Determine whether another object can be read.  In the {@code NEW} state this reads one command byte;
+     * any command other than {@code RemoteProtocol.OSOURCE_OBJECT} closes the unmarshaller and ends the source.
+     * In the other states no input is read.
+     *
+     * @return {@code true} if an object is waiting to be read by {@link #next()}
+     * @throws IOException if reading the command byte or closing the unmarshaller fails
+     */
     public boolean hasNext() throws IOException {
-        return false;
+        synchronized (this) {
+            if (state == State.NEW) {
+                final int cmd = unmarshaller.readUnsignedByte();
+                if (cmd == RemoteProtocol.OSOURCE_OBJECT) {
+                    state = State.READY;
+                } else {
+                    state = State.DONE;
+                    unmarshaller.close();
+                    return false;
+                }
+            }
+            return state == State.READY;
+        }
     }
 
+    /**
+     * Read the next object from the stream.
+     *
+     * @return the object read by the unmarshaller, cast unchecked to {@code T}
+     * @throws NoSuchElementException if {@link #hasNext()} reports that no object is waiting
+     * @throws IOException if reading fails; a missing class is reported as an {@link InvalidObjectException}
+     */
+    @SuppressWarnings({ "unchecked" })
     public T next() throws NoSuchElementException, IOException {
-        return null;
+        synchronized (this) {
+            if (hasNext()) {
+                try {
+                    final T obj = (T) unmarshaller.readObject();
+                    state = State.NEW;
+                    return obj;
+                } catch (ClassNotFoundException e) {
+                    state = State.NEW;
+                    // InvalidObjectException(String) carries no cause, so attach it explicitly
+                    final InvalidObjectException ioe = new InvalidObjectException("Class not found: " + e);
+                    ioe.initCause(e);
+                    throw ioe;
+                }
+            } else {
+                throw new NoSuchElementException();
+            }
+        }
     }
 
+    /**
+     * Mark this source as finished and close the underlying unmarshaller.
+     *
+     * @throws IOException if closing the unmarshaller fails
+     */
     public void close() throws IOException {
+        synchronized (this) {
+            state = State.DONE;
+            unmarshaller.close();
+        }
     }
 }

Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -24,6 +24,8 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.UnsupportedCharsetException;
@@ -33,6 +35,8 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Executor;
+import org.jboss.marshalling.ByteInput;
+import org.jboss.marshalling.ByteOutput;
 import org.jboss.marshalling.Pair;
 import org.jboss.xnio.Cancellable;
 import org.jboss.xnio.FutureResult;
@@ -277,6 +281,116 @@
         return new EnumerationObjectSource<T>(enumeration);
     }
 
+    /**
+     * Copy all remaining bytes from one stream to another.
+     * <p>
+     * When {@code close} is {@code true}, both streams are closed once end-of-input is reached, and an
+     * exception from either close is propagated; on every exit path both streams are also passed to
+     * {@code IoUtils.safeClose}.  When {@code close} is {@code false}, neither stream is closed.
+     *
+     * @param input the source stream
+     * @param output the destination stream
+     * @param close {@code true} if the input and output streams should be closed
+     * @param bufferSize the buffer size in bytes; must be greater than zero
+     * @throws IOException if an I/O error occurs
+     * @throws IllegalArgumentException if {@code bufferSize} is less than one; the streams are left untouched
+     */
+    public static void copyStream(InputStream input, OutputStream output, boolean close, int bufferSize) throws IOException {
+        // With a zero-length buffer read() returns 0 instead of -1, so the loop below would never terminate.
+        if (bufferSize < 1) {
+            throw new IllegalArgumentException("bufferSize must be greater than zero: " + bufferSize);
+        }
+        final byte[] buffer = new byte[bufferSize];
+        int res;
+        try {
+            for (;;) {
+                res = input.read(buffer);
+                if (res == -1) {
+                    if (close) {
+                        // explicit close on the success path so that a close failure reaches the caller
+                        input.close();
+                        output.close();
+                    }
+                    return;
+                }
+                output.write(buffer, 0, res);
+            }
+        } finally {
+            if (close) {
+                // also runs after the explicit close above; covers the paths where the copy or a close failed
+                IoUtils.safeClose(input);
+                IoUtils.safeClose(output);
+            }
+        }
+    }
+
+    /**
+     * Copy from one stream to another using a buffer of 8192 bytes.
+     *
+     * @param input the source stream
+     * @param output the destination stream
+     * @param close {@code true} if the input and output streams should be closed
+     * @throws IOException if an I/O error occurs
+     */
+    public static void copyStream(InputStream input, OutputStream output, boolean close) throws IOException {
+        copyStream(input, output, close, 8192);
+    }
+
+    /**
+     * Copy from one stream to another using a buffer of 8192 bytes, with {@code close} set to {@code true}.
+     *
+     * @param input the source stream
+     * @param output the destination stream
+     * @throws IOException if an I/O error occurs
+     */
+    public static void copyStream(InputStream input, OutputStream output) throws IOException {
+        copyStream(input, output, true, 8192);
+    }
+
+    /**
+     * Copy all remaining bytes from a byte input to a byte output.
+     * <p>
+     * When {@code close} is {@code true}, both sides are closed once end-of-input is reached, and an
+     * exception from either close is propagated; on every exit path both sides are also passed to
+     * {@code IoUtils.safeClose}.  When {@code close} is {@code false}, neither side is closed.
+     *
+     * @param input the source stream
+     * @param output the destination stream
+     * @param close {@code true} if the input and output streams should be closed
+     * @param bufferSize the buffer size in bytes; must be greater than zero
+     * @throws IOException if an I/O error occurs
+     * @throws IllegalArgumentException if {@code bufferSize} is less than one; the streams are left untouched
+     */
+    public static void copyStream(ByteInput input, ByteOutput output, boolean close, int bufferSize) throws IOException {
+        // A zero-length buffer can never make progress, and a negative size is not a valid array length.
+        if (bufferSize < 1) {
+            throw new IllegalArgumentException("bufferSize must be greater than zero: " + bufferSize);
+        }
+        final byte[] buffer = new byte[bufferSize];
+        int res;
+        try {
+            for (;;) {
+                res = input.read(buffer);
+                if (res == -1) {
+                    if (close) {
+                        // explicit close on the success path so that a close failure reaches the caller
+                        input.close();
+                        output.close();
+                    }
+                    return;
+                }
+                output.write(buffer, 0, res);
+            }
+        } finally {
+            if (close) {
+                // also runs after the explicit close above; covers the paths where the copy or a close failed
+                IoUtils.safeClose(input);
+                IoUtils.safeClose(output);
+            }
+        }
+    }
+
+    /**
+     * Copy from a byte input to a byte output using a buffer of 8192 bytes.
+     *
+     * @param input the source stream
+     * @param output the destination stream
+     * @param close {@code true} if the input and output streams should be closed
+     * @throws IOException if an I/O error occurs
+     */
+    public static void copyStream(ByteInput input, ByteOutput output, boolean close) throws IOException {
+        copyStream(input, output, close, 8192);
+    }
+
+    /**
+     * Copy from a byte input to a byte output using a buffer of 8192 bytes, with {@code close} set to {@code true}.
+     *
+     * @param input the source stream
+     * @param output the destination stream
+     * @throws IOException if an I/O error occurs
+     */
+    public static void copyStream(ByteInput input, ByteOutput output) throws IOException {
+        copyStream(input, output, true, 8192);
+    }
+
     static Charset getCharset(final String charsetName) throws UnsupportedEncodingException {
         try {
             return Charset.forName(charsetName);

Modified: remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/InvocationTestBase.java	2010-03-10 04:36:18 UTC (rev 5816)
@@ -22,7 +22,12 @@
 
 package org.jboss.remoting3.test;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Reader;
 import java.util.concurrent.atomic.AtomicReference;
 import org.jboss.remoting3.Client;
 import org.jboss.remoting3.ClientConnector;
@@ -37,6 +42,7 @@
 import org.jboss.remoting3.RequestContext;
 import org.jboss.remoting3.RequestListener;
 import org.jboss.remoting3.ServiceNotFoundException;
+import org.jboss.remoting3.stream.Streams;
 import org.jboss.xnio.IoUtils;
 import org.jboss.xnio.OptionMap;
 import org.jboss.xnio.Options;
@@ -57,6 +63,7 @@
 
     @BeforeTest
     public void setUp() throws IOException {
+        log.info("::::: STARTING TEST FOR: %s :::::", getClass().getName());
         enter();
         try {
             Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
@@ -315,6 +322,111 @@
         }
     }
 
+    /**
+     * Round-trip an {@link InputStream} through a service: the listener drains the request stream and replies
+     * with a new stream over the same bytes, and the client checks that it reads back what it sent.
+     *
+     * @throws Throwable if the test fails
+     */
+    public void testInputStream() throws Throwable {
+        enter();
+        try {
+            final Registration registration = endpoint.serviceBuilder(InputStream.class, InputStream.class).setServiceType("streamtest").setClientListener(new ClientListener<InputStream, InputStream>() {
+                public RequestListener<InputStream, InputStream> handleClientOpen(final ClientContext clientContext, final OptionMap optionMap) {
+                    return new RequestListener<InputStream, InputStream>() {
+                        public void handleRequest(final RequestContext<InputStream> context, final InputStream request) throws RemoteExecutionException {
+                            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                            try {
+                                Streams.copyStream(request, baos);
+                            } catch (IOException e) {
+                                try {
+                                    context.sendFailure("I/O error", e);
+                                } catch (IOException e1) {
+                                    // the failure could not be delivered either; nothing more to do
+                                }
+                                // a failure was reported: do not also send a reply built from partial data
+                                return;
+                            }
+                            try {
+                                context.sendReply(new ByteArrayInputStream(baos.toByteArray()));
+                            } catch (IOException e) {
+                                // the reply could not be delivered; the client side of the test will fail
+                            }
+                        }
+                    };
+                }
+            }).register();
+            try {
+                final Connection connection = getConnection();
+                try {
+                    final Client<InputStream, InputStream> client = connection.openClient("streamtest", "*", InputStream.class, InputStream.class, InvocationTestBase.class.getClassLoader(), OptionMap.EMPTY).get();
+                    try {
+                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                        Streams.copyStream(client.invoke(new ByteArrayInputStream("This is a test!!!".getBytes())), baos);
+                        assertEquals(new String(baos.toByteArray()), "This is a test!!!");
+                    } finally {
+                        IoUtils.safeClose(client);
+                        client.awaitClosedUninterruptibly();
+                    }
+                } finally {
+                    IoUtils.safeClose(connection);
+                    connection.awaitClosedUninterruptibly();
+                }
+            } finally {
+                IoUtils.safeClose(registration);
+                registration.awaitClosedUninterruptibly();
+            }
+        } finally {
+            exit();
+        }
+    }
+
+    /**
+     * Pass an {@link OutputStream} through a service in both directions: the listener writes a fixed message
+     * into the stream it receives and replies with its own stream; the client checks what arrived in the
+     * stream it sent and then writes a second message into the stream it got back.
+     *
+     * @throws Throwable if the test fails
+     */
+    public void testOutputStream() throws Throwable {
+        enter();
+        try {
+            final ByteArrayOutputStream os = new ByteArrayOutputStream();
+            final Registration registration = endpoint.serviceBuilder(OutputStream.class, OutputStream.class).setServiceType("streamtest").setClientListener(new ClientListener<OutputStream, OutputStream>() {
+                public RequestListener<OutputStream, OutputStream> handleClientOpen(final ClientContext clientContext, final OptionMap optionMap) {
+                    return new RequestListener<OutputStream, OutputStream>() {
+                        public void handleRequest(final RequestContext<OutputStream> context, final OutputStream request) throws RemoteExecutionException {
+                            try {
+                                Streams.copyStream(new ByteArrayInputStream("This is a test...".getBytes()), request);
+                            } catch (IOException e) {
+                                try {
+                                    context.sendFailure("I/O error", e);
+                                } catch (IOException e1) {
+                                    // the failure could not be delivered either; nothing more to do
+                                }
+                                // a failure was reported: do not also send a reply
+                                return;
+                            }
+                            try {
+                                context.sendReply(os);
+                            } catch (IOException e) {
+                                // the reply could not be delivered; the client side of the test will fail
+                            }
+                        }
+                    };
+                }
+            }).register();
+            try {
+                final Connection connection = getConnection();
+                try {
+                    final Client<OutputStream, OutputStream> client = connection.openClient("streamtest", "*", OutputStream.class, OutputStream.class, InvocationTestBase.class.getClassLoader(), OptionMap.EMPTY).get();
+                    try {
+                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                        final OutputStream result = client.invoke(baos);
+                        assertEquals(new String(baos.toByteArray()), "This is a test...");
+                        Streams.copyStream(new ByteArrayInputStream("This is a test #2...".getBytes()), result);
+                        // this test can't finish in time
+//                        assertEquals(new String(os.toByteArray()), "This is a test #2...");
+                    } finally {
+                        IoUtils.safeClose(client);
+                        client.awaitClosedUninterruptibly();
+                    }
+                } finally {
+                    IoUtils.safeClose(connection);
+                    connection.awaitClosedUninterruptibly();
+                }
+            } finally {
+                IoUtils.safeClose(registration);
+                registration.awaitClosedUninterruptibly();
+            }
+        } finally {
+            exit();
+        }
+    }
+
     @AfterTest
     public void tearDown() throws IOException {
         enter();

Modified: remoting3/trunk/jboss-remoting/src/test/resources/logging.properties
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/resources/logging.properties	2010-03-09 20:15:58 UTC (rev 5815)
+++ remoting3/trunk/jboss-remoting/src/test/resources/logging.properties	2010-03-10 04:36:18 UTC (rev 5816)
@@ -24,7 +24,7 @@
 loggers=javax.security.sasl,org.jboss.xnio.ssl
 
 # Root logger configuration
-logger.level=${test.leve:INFO}
+logger.level=${test.level:INFO}
 logger.handlers=CONSOLE
 
 # Configure javax.security.sasl to be less verbose by default



More information about the jboss-remoting-commits mailing list