[jboss-remoting-commits] JBoss Remoting SVN: r4566 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/stream and 5 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Thu Sep 11 15:54:37 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-09-11 15:54:37 -0400 (Thu, 11 Sep 2008)
New Revision: 4566

Added:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java
   remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutor.java
   remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutorFactory.java
Removed:
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializerFactory.java
   remoting3/trunk/build.properties
   remoting3/trunk/build.xml
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
Log:
Initial switch towards the new marshalling API to replace the old one

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamContext.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.stream;
+
+import java.util.concurrent.Executor;
+import org.jboss.marshalling.MarshallerFactory;
+
+/**
+ * A context for stream serialization.
+ */
+public interface StreamContext {
+
+    /**
+     * Get an executor which may be used for various asynchronous tasks.
+     *
+     * @return an executor
+     */
+    Executor getExecutor();
+
+    /**
+     * Get a marshaller factory which is configured compatibly with the channel.
+     *
+     * @return the marshaller factory
+     */
+    MarshallerFactory getMarshallerFactory();
+}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializerFactory.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializerFactory.java	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/stream/StreamSerializerFactory.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -2,15 +2,36 @@
 
 import org.jboss.xnio.IoHandler;
 import org.jboss.xnio.ChannelSource;
-import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
 import java.io.IOException;
 import java.io.Serializable;
 
 /**
- *
+ * A factory for stream serializers.  Stream serializers are responsible for forwarding streams across the network
+ * in a manner specific to the stream type.
  */
 public interface StreamSerializerFactory extends Serializable {
-    IoHandler<? super StreamChannel> getLocalSide(Object localSide) throws IOException;
 
-    Object getRemoteSide(ChannelSource<StreamChannel> remoteClient) throws IOException;
+    /**
+     * Get the XNIO handler for the local side of the serializer.  This side will access the local instance.  The returned
+     * handler is attached to the channel that is associated with the stream instance.
+     *
+     * @param localSide the instance that is being serialized
+     * @param streamContext the stream context
+     * @return the local handler
+     * @throws IOException if an error occurs while preparing the handler
+     */
+    IoHandler<? super AllocatedMessageChannel> getLocalSide(Object localSide, StreamContext streamContext) throws IOException;
+
+    /**
+     * Get the remote proxy instance for the remote side of the serializer.  This side will emulate the streaming object
+     * on the remote side.  This method is responsible for initiating the communications channel, which the returned
+     * instance will use to transmit data.
+     *
+     * @param channelSource the channel source which is used to create the stream's channel
+     * @param streamContext the stream context
+     * @return the remote proxy instance
+     * @throws IOException if an error occurs while preparing the remote proxy instance
+     */
+    Object getRemoteSide(ChannelSource<AllocatedMessageChannel> channelSource, StreamContext streamContext) throws IOException;
 }

Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/build.properties	2008-09-11 19:54:37 UTC (rev 4566)
@@ -9,7 +9,7 @@
 # Dependencies - keep in alpha order by property name
 # ===================================================
 
-lib.apiviz.version=1.0.5
+lib.apiviz.version=1.2.3.GA
 lib.apiviz.name=apiviz-${lib.apiviz.version}.jar
 lib.apiviz.license=lgpl
 lib.apiviz.dir=apiviz/${lib.apiviz.version}/lib

Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/build.xml	2008-09-11 19:54:37 UTC (rev 4566)
@@ -1300,21 +1300,25 @@
     <!-- JAVADOCS                                       -->
     <!-- ============================================== -->
 
-    <target name="api-javadoc" depends="api,core,standalone,util,lib.apiviz">
+    <target name="api-javadoc" depends="api,core,standalone,util,lib.apiviz,lib.jboss-marshalling-api,lib.xnio-api">
         <delete dir="api/target/main/docs"/>
         <mkdir dir="api/target/main/docs"/>
         <javadoc destdir="api/target/main/docs" author="false" version="false" use="false" windowtitle="JBoss Remoting API">
             <doclet name="${lib.apiviz.doclet}" path="${lib.apiviz.local}"/>
             <packageset dir="api/src/main/java"/>
+            <packageset dir="standalone/src/main/java"/>
             <packageset dir="util/src/main/java"/>
-            <packageset dir="standalone/src/main/java"/>
             <doctitle><![CDATA[<h1>JBoss Remoting 3</h1>]]></doctitle>
             <bottom><![CDATA[<i>Copyright &#169; 2008 JBoss, a division of Red Hat, Inc.</i>]]></bottom>
             <link href="http://java.sun.com/j2se/1.5.0/docs/api/"/>
             <link href="http://docs.jboss.org/xnio/1.0/api/"/>
             <classpath>
                 <path refid="core.classpath"/>
+                <path refid="api.classpath"/>
+                <path refid="standalone.classpath"/>
+                <path refid="util.classpath"/>
                 <pathelement location="${lib.xnio-api.local}"/>
+                <pathelement location="${lib.jboss-marshalling-api.local}"/>
             </classpath>
         </javadoc>
     </target>

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -18,7 +18,7 @@
 import org.jboss.cx.remoting.ClientSource;
 import org.jboss.cx.remoting.SimpleCloseable;
 import org.jboss.cx.remoting.ServiceListener;
-import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
+import org.jboss.cx.remoting.util.OrderedExecutorFactory;
 import org.jboss.cx.remoting.spi.remote.RequestHandler;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/InputStreamStreamSerializerFactory.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -3,14 +3,15 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
 import org.jboss.xnio.log.Logger;
 import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.cx.remoting.core.util.DecodingBuilder;
-import org.jboss.xnio.channels.StreamChannel;
-import org.jboss.xnio.channels.StreamSinkChannel;
-import org.jboss.xnio.channels.CommonOptions;
-import org.jboss.xnio.channels.StreamSourceChannel;
+import org.jboss.cx.remoting.spi.stream.StreamContext;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.WritableMessageChannel;
+import org.jboss.xnio.channels.ReadableAllocatedMessageChannel;
 import org.jboss.xnio.IoHandler;
 import org.jboss.xnio.ChannelSource;
 import org.jboss.xnio.IoUtils;
@@ -21,9 +22,9 @@
 
 /**
  * An input stream serializer.  The input stream transfer protocol consists of two types of "chunks": data and error.
- * A data chunk starts with an ASCII {@code 'd'}, followed by a two-byte (unsigned) length field (a value of
- * {@code 0x0000} indicates a 65536-byte chunk), followed by the actual data.  An error chunk consists of a series of
- * UTF-8 bytes representing a description of the error, followed by the end of the stream.
+ * A data chunk starts with an ASCII {@code 'd'}, followed by the actual data.  An error chunk starts with an ASCII
+ * {@code 'e'} followed by a series of UTF-8 bytes representing a description of the error, followed by the end of
+ * the stream.
  *
  * Normally data chunks are transferred over the stream until the original {@link InputStream} is exhausted, at which time
  * the proxy stream will return a {@code -1} for the EOF condition.
@@ -39,13 +40,13 @@
         // no-arg constructor required
     }
 
-    public IoHandler<StreamSinkChannel> getLocalSide(final Object localSide) throws IOException {
-        return new LocalHandler((InputStream) localSide, allocator);
+    public IoHandler<? super AllocatedMessageChannel> getLocalSide(final Object localSide, final StreamContext streamContext) throws IOException {
+        return new LocalHandler((InputStream) localSide, allocator, streamContext.getExecutor());
     }
 
-    public Object getRemoteSide(final ChannelSource<StreamChannel> remoteClient) throws IOException {
-//        return new RemoteInputStream(taskList, futureChannel);
-        return null;
+    public Object getRemoteSide(final ChannelSource<AllocatedMessageChannel> remoteClient, final StreamContext streamContext) throws IOException {
+        final RemoteHandler handler = new RemoteHandler();
+        return new RemoteInputStream(remoteClient.open(handler), allocator, handler);
     }
 
     public BufferAllocator<ByteBuffer> getAllocator() {
@@ -56,345 +57,273 @@
         this.allocator = allocator;
     }
 
-    private static final byte DATA_CHUNK = (byte) 'd';
-    private static final byte ERROR = (byte) 'e';
+    private static final int DATA_CHUNK = 'd';
+    private static final int ERROR = 'e';
 
-    public static final class LocalHandler implements IoHandler<StreamSinkChannel> {
+    public static final class LocalHandler implements IoHandler<WritableMessageChannel> {
 
-        // @protectedby {@code this}
-        private final InputStream inputStream;
+        private final Object lock = new Object();
+        private final Executor executor;
         private final BufferAllocator<ByteBuffer> allocator;
-        private volatile ByteBuffer current;
-        private volatile boolean eof;
+        private final Runnable fillTask = new FillTask();
 
-        private LocalHandler(final InputStream inputStream, final BufferAllocator<ByteBuffer> allocator) {
+        // @protectedby {@code lock}
+        private WritableMessageChannel channel;
+        // @protectedby {@code lock}
+        private final InputStream inputStream;
+        // @protectedby {@code lock}
+        private ByteBuffer writing;
+        // @protectedby {@code lock}
+        private boolean eof;
+
+        private LocalHandler(final InputStream inputStream, final BufferAllocator<ByteBuffer> allocator, final Executor executor) {
             this.inputStream = inputStream;
             this.allocator = allocator;
+            this.executor = executor;
         }
 
-        private boolean fillBuffer() throws IOException {
-            final ByteBuffer buffer = allocator.allocate();
-            buffer.put(DATA_CHUNK);
-            buffer.putShort((short) 0);
-            final int cnt;
-            if (buffer.hasArray()) {
-                final byte[] a = buffer.array();
-                final int off = buffer.arrayOffset();
-                final int rem = Math.min(buffer.remaining(), 65536);
-                cnt = inputStream.read(a, off, rem);
-                if (cnt == -1) {
-                    return false;
-                }
-                skip(current, cnt);
-            } else {
-                final int rem = Math.min(buffer.remaining(), 65536);
-                final byte[] a = new byte[rem];
-                cnt = inputStream.read(a);
-                if (cnt == -1) {
-                    return false;
-                }
-                current.put(a);
-            }
-            buffer.putShort(1, (short) cnt);
-            current = flip(buffer);
-            return true;
+        public void handleOpened(final WritableMessageChannel channel) {
+            this.channel = channel;
+            executor.execute(fillTask);
         }
 
-        private void prepareChunk(final StreamSinkChannel channel) {
-            try {
-                eof = fillBuffer();
-            } catch (Throwable e) {
-                try {
-                    current = ByteBuffer.wrap(("e" + e.getMessage()).getBytes("utf-8"));
-                } catch (UnsupportedEncodingException e1) {
-                    current = ByteBuffer.wrap(new byte[] { ERROR });
-                }
-                eof = true;
-            }
-            channel.resumeWrites();
+        public void handleReadable(final WritableMessageChannel channel) {
+            // not called on a sink channel
         }
 
-        public void handleOpened(final StreamSinkChannel channel) {
-            if (channel.getOptions().contains(CommonOptions.TCP_NODELAY)) {
-                try {
-                    channel.setOption(CommonOptions.TCP_NODELAY, Boolean.TRUE);
-                } catch (IOException e) {
-                    // not too big a deal; just skip it
-                    log.trace(e, "Failed to enable TCP_NODELAY");
+        public void handleWritable(final WritableMessageChannel channel) {
+            synchronized (lock) {
+                final ByteBuffer buffer = writing;
+                if (buffer == null) {
+                    if (eof) {
+                        IoUtils.safeClose(channel);
+                    } else {
+                        executor.execute(fillTask);
+                    }
+                } else {
+                    final boolean sent;
+                    try {
+                        sent = channel.send(buffer);
+                    } catch (IOException e) {
+                        log.debug("Channel write failed: %s", e);
+                        IoUtils.safeClose(channel);
+                        return;
+                    }
+                    if (sent) {
+                        writing = null;
+                        allocator.free(buffer);
+                        executor.execute(fillTask);
+                    } else {
+                        channel.resumeWrites();
+                    }
                 }
             }
-            prepareChunk(channel);
         }
 
-        public void handleReadable(final StreamSinkChannel channel) {
-            // not called on a sink channel
+        public void handleClosed(final WritableMessageChannel channel) {
+            synchronized (this) {
+                IoUtils.safeClose(inputStream);
+            }
         }
 
-        public void handleWritable(final StreamSinkChannel channel) {
-            while (current.hasRemaining()) {
+        private final class FillTask implements Runnable {
+            public void run() {
                 try {
-                    final int c = channel.write(current);
-                    if (c == 0) {
-                        channel.resumeWrites();
-                        return;
+                    final ByteBuffer buffer = allocator.allocate();
+                    buffer.put((byte) DATA_CHUNK);
+                    buffer.putShort((short) 0);
+                    final int rem = buffer.remaining();
+                    final int cnt;
+                    if (buffer.hasArray()) {
+                        final byte[] a = buffer.array();
+                        final int off = buffer.arrayOffset();
+                        cnt = inputStream.read(a, off, rem);
+                        if (cnt == -1) {
+                            synchronized (lock) {
+                                eof = true;
+                                return;
+                            }
+                        }
+                        skip(buffer, cnt);
+                    } else {
+                        final byte[] a = new byte[rem];
+                        cnt = inputStream.read(a);
+                        if (cnt == -1) {
+                            synchronized (lock) {
+                                eof = true;
+                                return;
+                            }
+                        }
+                        buffer.put(a);
                     }
+                    buffer.putShort(1, (short) cnt);
+                    synchronized (lock) {
+                        writing = flip(buffer);
+                    }
+                    channel.resumeWrites();
+                    return;
                 } catch (IOException e) {
-                    log.debug("Channel write failed: %s", e);
-                    IoUtils.safeClose(channel);
+                    synchronized (lock) {
+                        eof = true;
+                        try {
+                            // this could probably be improved upon
+                            writing = ByteBuffer.wrap((Character.toString((char) ERROR) + e.getMessage()).getBytes("utf-8"));
+                        } catch (UnsupportedEncodingException e1) {
+                            writing = ByteBuffer.wrap(new byte[] { ERROR });
+                        }
+                    }
                 }
             }
-            if (eof) {
-                IoUtils.safeClose(channel);
-            } else {
-                prepareChunk(channel);
-            }
         }
-
-        public void handleClosed(final StreamSinkChannel channel) {
-            synchronized (this) {
-                IoUtils.safeClose(inputStream);
-            }
-        }
     }
 
-    public static final class RemoteHandler implements IoHandler<StreamSourceChannel> {
+    public static final class RemoteHandler implements IoHandler<ReadableAllocatedMessageChannel> {
 
-        private enum DecoderState {
-            NEW_CHUNK,
-            IN_ERROR,
-            IN_DATA,
-        }
+        private final Object lock = new Object();
 
-        private final RemoteInputStream remoteInputStream;
-        private final ByteBuffer initialBuffer = ByteBuffer.allocate(5);
+        private ByteBuffer current;
+        private boolean done;
+        private IOException exception;
 
-        private volatile ByteBuffer dataBuffer = null;
+        private RemoteHandler() {
+        }
 
-        private volatile DecodingBuilder exceptionBuilder;
-        private volatile DecoderState decoderState = DecoderState.NEW_CHUNK;
-
-        private RemoteHandler(final RemoteInputStream remoteInputStream, final BufferAllocator<ByteBuffer> allocator) {
-            this.remoteInputStream = remoteInputStream;
+        public ByteBuffer getBuffer() throws IOException {
+            synchronized (lock) {
+                if (exception != null) {
+                    final IOException ex = new IOException("I/O exception from channel receive");
+                    ex.initCause(exception);
+                    throw ex;
+                }
+                try {
+                    while (current == null && ! done) {
+                        lock.wait();
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new InterruptedIOException("Interrupted while reading from input stream");
+                }
+                try {
+                    return current;
+                } finally {
+                    current = null;
+                }
+            }
         }
 
-        public void handleOpened(final StreamSourceChannel channel) {
+        public void handleOpened(final ReadableAllocatedMessageChannel channel) {
             channel.resumeReads();
         }
 
-        public void handleReadable(final StreamSourceChannel channel) {
-            try {
-                for (;;) switch (decoderState) {
-                    case NEW_CHUNK: {
-                        int n = channel.read(initialBuffer);
-                        if (n == -1) {
-                            IoUtils.safeClose(channel);
-                            return;
+        public void handleReadable(final ReadableAllocatedMessageChannel channel) {
+            synchronized (lock) {
+                if (current != null) {
+                    return;
+                }
+                try {
+                    final ByteBuffer buffer = channel.receive();
+                    if (buffer == null) {
+                        IoUtils.safeClose(channel);
+                        return;
+                    }
+                    if (! buffer.hasRemaining()) {
+                        channel.resumeReads();
+                        return;
+                    }
+                    final byte type = buffer.get();
+                    switch (type) {
+                        case DATA_CHUNK: {
+                            current = buffer;
+                            // only one waiter would be able to use this anyway
+                            lock.notify();
+                            break;
                         }
-                        if (n == 0) {
-                            remoteInputStream.scheduleResumeReads(channel);
-                            return;
-                        }
-                        if (initialBuffer.get(0) == DATA_CHUNK) {
-                            if (initialBuffer.hasRemaining()) {
-                                handleReadable(channel);
-                                return;
+                        case ERROR: {
+                            if (buffer.hasArray()) {
+                                IoUtils.safeClose(channel);
+                                final byte[] a = buffer.array();
+                                final int offs = buffer.arrayOffset();
+                                final int rem = buffer.remaining();
+                                exception = new IOException(new String(a, offs + 1, rem, "utf-8"));
                             }
-                            initialBuffer.flip();
-                            initialBuffer.get();
-                            final int length = (initialBuffer.getShort() - 1) & 0xffff + 1;
-                            dataBuffer = ByteBuffer.allocate(length);
-                            decoderState = DecoderState.IN_DATA;
                             break;
-                        } else if (initialBuffer.get(0) == ERROR) {
-                            decoderState = DecoderState.IN_ERROR;
-                            initialBuffer.flip();
-                            initialBuffer.get();
-                            exceptionBuilder.append(initialBuffer);
-                            initialBuffer.clear();
-                            break;
-                        } else {
-                            remoteInputStream.acceptException("Received garbage from remote side");
-                            IoUtils.safeClose(channel);
-                            return;
                         }
-                    }
-                    case IN_ERROR: {
-                        ByteBuffer buffer = ByteBuffer.allocate(256);
-                        int n = channel.read(buffer);
-                        if (n == -1) {
-                            remoteInputStream.acceptException(exceptionBuilder.finish().toString());
-                            exceptionBuilder = null;
+                        default: {
                             IoUtils.safeClose(channel);
-                            return;
+                            exception = new IOException("Remote data stream was corrupted");
+                            break;
                         }
-                        if (n == 0) {
-                            remoteInputStream.scheduleResumeReads(channel);
-                            return;
-                        }
-                        exceptionBuilder.append(buffer);
-                        break;
                     }
-                    case IN_DATA: {
-                        if (! dataBuffer.hasRemaining()) {
-                            dataBuffer.flip();
-                            remoteInputStream.acceptBuffer(dataBuffer);
-                            dataBuffer = null;
-                            decoderState = DecoderState.NEW_CHUNK;
-                        }
-                        int n = channel.read(dataBuffer);
-                        if (n == -1) {
-                            IoUtils.safeClose(channel);
-                            return;
-                        }
-                        if (n == 0) {
-                            remoteInputStream.scheduleResumeReads(channel);
-                            return;
-                        }
-                        break;
-                    }
+                } catch (IOException e) {
+                    IoUtils.safeClose(channel);
+                    // should only be one waiter; NOTE(review): no notify call follows here, so waiters appear to be woken only via handleClosed() - confirm
+                    exception = e;
                 }
-            } catch (IOException e) {
-                remoteInputStream.acceptException("Read from remote input stream failed: " + e.getMessage());
-                IoUtils.safeClose(channel);
             }
         }
 
-        public void handleWritable(final StreamSourceChannel channel) {
+        public void handleWritable(final ReadableAllocatedMessageChannel channel) {
+            // empty
         }
 
-        public void handleClosed(final StreamSourceChannel channel) {
-            remoteInputStream.acceptEof();
+        public void handleClosed(final ReadableAllocatedMessageChannel channel) {
+            synchronized (lock) {
+                done = true;
+                lock.notifyAll();
+            }
         }
     }
 
     public static final class RemoteInputStream extends InputStream {
 
-        private enum StreamState {
-            RUNNING,
-            EOF,
-            CLOSED,
-        }
-
-        private final IoFuture<StreamSourceChannel> futureChannel;
         private final BufferAllocator<ByteBuffer> allocator;
 
         private final Object lock = new Object();
 
         // @protectedby lock
-        private StreamState state;
         private ByteBuffer current;
-        private ByteBuffer next;
-        private String pendingException;
-        private boolean pendingResumeReads = false;
 
-        private RemoteInputStream(final IoFuture<StreamSourceChannel> futureChannel, final BufferAllocator<ByteBuffer> allocator) {
+        private final IoFuture<? extends ReadableAllocatedMessageChannel> futureChannel;
+        private final RemoteHandler handler;
+
+        private RemoteInputStream(final IoFuture<? extends ReadableAllocatedMessageChannel> futureChannel, final BufferAllocator<ByteBuffer> allocator, final RemoteHandler handler) {
             this.futureChannel = futureChannel;
             this.allocator = allocator;
+            this.handler = handler;
         }
 
-        protected void acceptBuffer(ByteBuffer buffer) {
-            synchronized (lock) {
-                if (! buffer.hasRemaining()) {
-                    throw new IllegalArgumentException("empty buffer");
-                }
-                if (state == StreamState.CLOSED) {
-                    allocator.free(buffer);
-                }
-                if (current == null) {
+        // call under {@code lock}
+        private ByteBuffer getCurrent() throws IOException {
+            if (current != null) {
+                return current;
+            } else {
+                final ByteBuffer buffer = handler.getBuffer();
+                if (buffer != null) {
                     current = buffer;
-                    lock.notifyAll();
-                } else if (next == null) {
-                    next = buffer;
+                    return buffer;
                 } else {
-                    throw new IllegalStateException();
+                    return null;
                 }
             }
         }
 
-        protected void acceptException(String exception) {
-            synchronized (lock) {
-                pendingException = exception;
-                if (current == null) {
-                    lock.notifyAll();
-                }
-            }
-        }
-
-        protected void acceptEof() {
-            synchronized (lock) {
-                if (state == StreamState.RUNNING) {
-                    state = StreamState.EOF;
-                    if (current == null) {
-                        lock.notifyAll();
-                    }
-                }
-            }
-        }
-
-        protected void scheduleResumeReads(StreamSourceChannel channel) {
-            synchronized (lock) {
-                if (state == StreamState.CLOSED || state == StreamState.EOF) {
-                    return;
-                }
-                if (next == null || current == null) {
-                    channel.resumeReads();
-                } else {
-                    pendingResumeReads = true;
-                }
-            }
-        }
-
-        private ByteBuffer getCurrent() throws IOException {
-            boolean intr = false;
-            try {
-                while (current == null) {
-                    if (pendingException != null) {
-                        throw new IOException(pendingException);
-                    } else if (state == StreamState.EOF) {
-                        return null;
-                    }
-                    try {
-                        lock.wait();
-                    } catch (InterruptedException e) {
-                        intr = true;
-                    }
-                }
-                return current;
-            } finally {
-                if (intr) {
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-
         public int read() throws IOException {
             synchronized (lock) {
-                if (state == StreamState.CLOSED) {
-                    return -1;
-                }
                 final ByteBuffer buffer = getCurrent();
                 if (buffer == null) {
                     return -1;
                 }
                 final byte v = buffer.get();
                 if (! buffer.hasRemaining()) {
-                    current = next;
-                    next = null;
+                    current = null;
                     allocator.free(buffer);
-                    if (pendingResumeReads) {
-                        futureChannel.get().resumeReads();
-                        pendingResumeReads = false;
-                    }
                 }
                 return v & 0xff;
             }
         }
 
-        public int read(final byte b[], final int off, final int len) throws IOException {
+        public int read(final byte[] b, final int off, final int len) throws IOException {
             synchronized (lock) {
-                if (state == StreamState.CLOSED) {
-                    return -1;
-                }
                 final ByteBuffer buffer = getCurrent();
                 if (buffer == null) {
                     return -1;
@@ -402,13 +331,8 @@
                 final int cnt = Math.min(buffer.remaining(), len);
                 buffer.get(b, off, cnt);
                 if (! buffer.hasRemaining()) {
-                    current = next;
-                    next = null;
+                    current = null;
                     allocator.free(buffer);
-                    if (pendingResumeReads) {
-                        futureChannel.get().resumeReads();
-                        pendingResumeReads = false;
-                    }
                 }
                 return cnt;
             }
@@ -416,24 +340,17 @@
 
         public void close() throws IOException {
             synchronized (lock) {
-                if (state != StreamState.CLOSED) {
-                    if (current != null) {
-                        allocator.free(current);
-                        current = null;
-                    }
-                    if (next != null) {
-                        allocator.free(next);
-                        next = null;
-                    }
-                    state = StreamState.CLOSED;
-                    futureChannel.get().close();
+                if (current != null) {
+                    allocator.free(current);
+                    current = null;
                 }
+                futureChannel.get().close();
             }
         }
 
         public int available() throws IOException {
             synchronized (lock) {
-                return current == null ? 0 : current.remaining() + (next == null ? 0 : next.remaining());
+                return current == null ? 0 : current.remaining();
             }
         }
     }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/ObjectSourceStreamSerializerFactory.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -1,17 +1,23 @@
 package org.jboss.cx.remoting.core.stream;
 
 import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.List;
+import java.util.ArrayList;
+import java.nio.ByteBuffer;
 import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.cx.remoting.spi.stream.StreamContext;
 import org.jboss.cx.remoting.stream.ObjectSource;
-import org.jboss.xnio.channels.StreamChannel;
-import org.jboss.xnio.channels.StreamSourceChannel;
-import org.jboss.xnio.channels.StreamSinkChannel;
-import org.jboss.xnio.channels.CommonOptions;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
 import org.jboss.xnio.IoHandler;
 import org.jboss.xnio.ChannelSource;
 import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.BufferAllocator;
+import static org.jboss.xnio.Buffers.flip;
 import org.jboss.xnio.log.Logger;
 import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.ByteOutput;
 
 /**
  *
@@ -22,7 +28,6 @@
 
     private static final Logger log = Logger.getLogger(ObjectSourceStreamSerializerFactory.class);
 
-
     private MarshallerFactory marshallerFactory;
 
     public MarshallerFactory getMarshallerFactory() {
@@ -33,55 +38,159 @@
         this.marshallerFactory = marshallerFactory;
     }
 
-    public IoHandler<? super StreamChannel> getLocalSide(final Object localSide) throws IOException {
-        
+    public IoHandler<? super AllocatedMessageChannel> getLocalSide(final Object localSide, final StreamContext streamContext) throws IOException {
         return null;
     }
 
-    public Object getRemoteSide(final ChannelSource<StreamChannel> remoteClient) throws IOException {
+    public Object getRemoteSide(final ChannelSource<AllocatedMessageChannel> channelSource, final StreamContext streamContext) throws IOException {
         return null;
     }
 
-    public static class LocalHandler implements IoHandler<StreamSinkChannel> {
+    public static class LocalHandler implements IoHandler<AllocatedMessageChannel> {
         private final ObjectSource objectSource;
+        private final Object lock = new Object();
+        private final Executor executor;
+        private final Marshaller marshaller;
+        private ByteBuffer[] current;
+        private final Runnable fillTask = new FillTask();
+        private final BufferAllocator<ByteBuffer> allocator;
 
-        public LocalHandler(final ObjectSource source) {
+        public LocalHandler(final ObjectSource source, final Executor executor, final Marshaller marshaller, final BufferAllocator<ByteBuffer> allocator) {
             objectSource = source;
+            this.executor = executor;
+            this.marshaller = marshaller;
+            this.allocator = allocator;
         }
 
-        public void handleOpened(final StreamSinkChannel channel) {
-            if (channel.getOptions().contains(CommonOptions.TCP_NODELAY)) try {
-                channel.setOption(CommonOptions.TCP_NODELAY, Boolean.TRUE);
-            } catch (Exception e) {
-                log.trace("Error setting TCP_NODELAY option: %s", e.getMessage());
-            }
-            channel.resumeWrites();
+        public void handleOpened(final AllocatedMessageChannel channel) {
+            executor.execute(fillTask);
         }
 
-        public void handleReadable(final StreamSinkChannel channel) {
+        public void handleReadable(final AllocatedMessageChannel channel) {
             // not invoked
         }
 
-        public void handleWritable(final StreamSinkChannel channel) {
+        public void handleWritable(final AllocatedMessageChannel channel) {
         }
 
-        public void handleClosed(final StreamSinkChannel channel) {
+        public void handleClosed(final AllocatedMessageChannel channel) {
             IoUtils.safeClose(objectSource);
         }
+
+        public class FillTask implements Runnable {
+            public void run() {
+                try {
+                    if (objectSource.hasNext()) {
+                        final BufferProducingByteOutput output = new BufferProducingByteOutput(allocator);
+                        try {
+                            marshaller.start(output);
+                            marshaller.writeObject(objectSource.next());
+                            marshaller.finish();
+                            output.flush();
+                            final ByteBuffer[] buffers = output.takeBuffers();
+                            
+                        } finally {
+                            IoUtils.safeClose(output);
+                        }
+
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 
-    public static class RemoteHandler implements IoHandler<StreamSourceChannel> {
+    public static class BufferProducingByteOutput implements ByteOutput {
 
-        public void handleOpened(final StreamSourceChannel channel) {
+        private final BufferAllocator<ByteBuffer> allocator;
+        private final List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+        private ByteBuffer current;
+
+        public BufferProducingByteOutput(final BufferAllocator<ByteBuffer> allocator) {
+            this.allocator = allocator;
         }
 
-        public void handleReadable(final StreamSourceChannel channel) {
+        public void write(final int i) throws IOException {
+            ByteBuffer buffer = current;
+            if (buffer == null) {
+                buffer = (current = allocator.allocate());
+            }
+            buffer.put((byte) i);
+            if (! buffer.hasRemaining()) {
+                buffers.add(flip(buffer));
+                current = null;
+            }
         }
 
-        public void handleWritable(final StreamSourceChannel channel) {
+        public void write(final byte[] bytes) throws IOException {
+            write(bytes, 0, bytes.length);
         }
 
-        public void handleClosed(final StreamSourceChannel channel) {
+        public void write(final byte[] bytes, int offs, int len) throws IOException {
+            while (len > 0) {
+                ByteBuffer buffer = current;
+                if (buffer == null) {
+                    buffer = (current = allocator.allocate());
+                }
+                final int rem = Math.min(buffer.remaining(), len);
+                buffer.put(bytes, offs, rem);
+                offs += rem;
+                len -= rem;
+                if (! buffer.hasRemaining()) {
+                    buffers.add(flip(buffer));
+                    current = null;
+                }
+            }
         }
+
+        public void close() throws IOException {
+            flush();
+        }
+
+        public void flush() throws IOException {
+            final ByteBuffer buffer = current;
+            if (buffer != null) {
+                buffers.add(buffer);
+                current = null;
+            }
+        }
+
+        public ByteBuffer[] takeBuffers() {
+            try {
+                return buffers.toArray(new ByteBuffer[buffers.size()]);
+            } finally {
+                buffers.clear();
+            }
+        }
     }
+
+    public static class RemoteHandler implements IoHandler<AllocatedMessageChannel> {
+
+        public void handleOpened(final AllocatedMessageChannel channel) {
+        }
+
+        public void handleReadable(final AllocatedMessageChannel channel) {
+        }
+
+        public void handleWritable(final AllocatedMessageChannel channel) {
+        }
+
+        public void handleClosed(final AllocatedMessageChannel channel) {
+        }
+    }
+
+    public static class RemoteObjectSource implements ObjectSource {
+
+        public boolean hasNext() throws IOException {
+            return false;
+        }
+
+        public Object next() throws IOException {
+            return null;
+        }
+
+        public void close() throws IOException {
+        }
+    }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/stream/OutputStreamStreamSerailizerFactory.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -2,13 +2,16 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
-import java.util.concurrent.Semaphore;
+import java.util.concurrent.Executor;
 import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
-import org.jboss.xnio.channels.StreamChannel;
-import org.jboss.xnio.channels.StreamSourceChannel;
-import org.jboss.xnio.channels.StreamSinkChannel;
+import org.jboss.cx.remoting.spi.stream.StreamContext;
+import org.jboss.cx.remoting.util.OrderedExecutor;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.WritableMessageChannel;
 import org.jboss.xnio.IoHandler;
 import org.jboss.xnio.ChannelSource;
 import org.jboss.xnio.BufferAllocator;
@@ -25,123 +28,196 @@
 
     private static final long serialVersionUID = -5934238025840749071L;
 
-    public IoHandler<? super StreamChannel> getLocalSide(final Object localSide) throws IOException {
-        return new LocalHandler((OutputStream) localSide, new BufferAllocator<ByteBuffer>() {
-            public ByteBuffer allocate() {
-                return ByteBuffer.allocate(512);
-            }
+    private BufferAllocator<ByteBuffer> allocator;
 
-            public void free(final ByteBuffer byteBuffer) {
-            }
-        });
+    public BufferAllocator<ByteBuffer> getAllocator() {
+        return allocator;
     }
 
-    public Object getRemoteSide(final ChannelSource<StreamChannel> remoteClient) throws IOException {
-        final RemoteHandler handler = new RemoteHandler(new BufferAllocator<ByteBuffer>() {
-            public ByteBuffer allocate() {
-                return ByteBuffer.allocate(512);
-            }
+    public void setAllocator(final BufferAllocator<ByteBuffer> allocator) {
+        this.allocator = allocator;
+    }
 
-            public void free(final ByteBuffer byteBuffer) {
-            }
-        });
-        final IoFuture<StreamChannel> futureChannel = remoteClient.open(handler);
-        return new RemoteOutputStream(handler, futureChannel);
+    public IoHandler<? super AllocatedMessageChannel> getLocalSide(final Object localSide, final StreamContext streamContext) throws IOException {
+        return new LocalHandler((OutputStream) localSide, allocator, new OrderedExecutor(streamContext.getExecutor()));
     }
 
-    public static final class LocalHandler implements IoHandler<StreamSourceChannel> {
+    public Object getRemoteSide(final ChannelSource<AllocatedMessageChannel> remoteClient, final StreamContext streamContext) throws IOException {
+        final RemoteHandler handler = new RemoteHandler(allocator);
+        final IoFuture<AllocatedMessageChannel> futureChannel = remoteClient.open(handler);
+        return new RemoteOutputStream(handler, futureChannel, allocator);
+    }
 
+    public static final class LocalHandler implements IoHandler<AllocatedMessageChannel> {
+
         private final OutputStream outputStream;
         private final BufferAllocator<ByteBuffer> allocator;
+        private final Executor executor;
+        private volatile String exceptionMessage;
 
-        public LocalHandler(final OutputStream outputStream, final BufferAllocator<ByteBuffer> allocator) {
+        public LocalHandler(final OutputStream outputStream, final BufferAllocator<ByteBuffer> allocator, final Executor executor) {
             this.outputStream = outputStream;
             this.allocator = allocator;
+            this.executor = executor;
         }
 
-        public void handleOpened(final StreamSourceChannel channel) {
+        public void handleOpened(final AllocatedMessageChannel channel) {
             channel.resumeReads();
         }
 
-        public void handleReadable(final StreamSourceChannel channel) {
-            ByteBuffer buffer = allocator.allocate();
+        public void handleReadable(final AllocatedMessageChannel channel) {
             try {
-                for (;; buffer.clear()) {
-                    final int c = channel.read(buffer);
-                    if (c == 0) {
+                for (;;) {
+                    final ByteBuffer buffer = channel.receive();
+                    if (buffer == null) {
+                        IoUtils.safeClose(channel);
+                        log.trace("Remote output stream closed normally");
+                    } else if (! buffer.hasRemaining()) {
                         channel.resumeReads();
                         return;
-                    } else if (c == -1) {
-                        IoUtils.safeClose(channel);
-                        log.trace("Remote output stream closed normally");
                     } else {
                         buffer.flip();
-                        if (buffer.hasArray()) {
-                            outputStream.write(buffer.array(), buffer.arrayOffset(), buffer.remaining());
-                        } else {
-                            final byte[] bytes = new byte[buffer.remaining()];
-                            buffer.get(bytes);
-                            outputStream.write(bytes);
-                        }
+                        executor.execute(new Runnable() {
+                            public void run() {
+                                try {
+                                    if (buffer.hasArray()) {
+                                        outputStream.write(buffer.array(), buffer.arrayOffset(), buffer.remaining());
+                                    } else {
+                                        final byte[] bytes = new byte[buffer.remaining()];
+                                        buffer.get(bytes);
+                                        outputStream.write(bytes);
+                                    }
+                                    channel.resumeReads();
+                                } catch (Throwable t) {
+                                    exceptionMessage = t.getMessage();
+                                    channel.resumeWrites();
+                                    try {
+                                        channel.shutdownReads();
+                                    } catch (Throwable tt) {
+                                        log.warn(tt, "Unable to shutdown reads on a channel");
+                                    }
+                                } finally {
+                                    allocator.free(buffer);
+                                }
+                            }
+                        });
                     }
                 }
             } catch (IOException e) {
                 IoUtils.safeClose(channel);
                 log.trace("Remote output stream closed due to exception: %s", e.getMessage());
             } finally {
-                allocator.free(buffer);
             }
         }
 
-        public void handleWritable(final StreamSourceChannel channel) {
+        public void handleWritable(final AllocatedMessageChannel channel) {
+            final String msg = exceptionMessage;
+            if (msg == null) {
+                // spurious...
+                return;
+            }
+            try {
+                final ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes("utf-8"));
+                if (! channel.send(buffer)) {
+                    channel.resumeWrites();
+                    return;
+                }
+            } catch (UnsupportedEncodingException e) {
+                // should not happen; if it does, just close the channel
+            } catch (IOException e) {
+                // nothing we can do about it
+            }
+            IoUtils.safeClose(channel);
+            exceptionMessage = null;
         }
 
-        public void handleClosed(final StreamSourceChannel channel) {
+        public void handleClosed(final AllocatedMessageChannel channel) {
             IoUtils.safeClose(outputStream);
         }
     }
 
-    public static final class RemoteHandler implements IoHandler<StreamSinkChannel> {
+    public static final class RemoteHandler implements IoHandler<AllocatedMessageChannel> {
 
-        private final Semaphore semaphore = new Semaphore(0);
+        private final Object lock = new Object();
         private final BufferAllocator<ByteBuffer> allocator;
 
-        private volatile boolean closed;
-        private volatile ByteBuffer buffer;
+        private boolean closed;
+        private ByteBuffer buffer;
+        private IOException exception;
 
         private RemoteHandler(final BufferAllocator<ByteBuffer> allocator) {
             this.allocator = allocator;
         }
 
-        public void handleOpened(final StreamSinkChannel channel) {
-            // block sends until the channel is up
-            semaphore.release();
+        public void pushBuffer(final WritableMessageChannel channel, final ByteBuffer buffer) throws IOException {
+            synchronized (lock) {
+                final IOException exception = this.exception;
+                if (exception != null) {
+                    this.exception = null;
+                    IOException ioe = new IOException("Write failed");
+                    ioe.initCause(exception);
+                    throw ioe;
+                }
+                if (closed) {
+                    throw new IOException("Channel closed");
+                }
+                while (this.buffer != null) {
+                    try {
+                        lock.wait();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new InterruptedIOException("Operation interrupted");
+                    }
+                }
+                if (! channel.send(buffer)) {
+                    channel.resumeWrites();
+                    this.buffer = buffer;
+                }
+            }
         }
 
-        public void handleReadable(final StreamSinkChannel channel) {
+        public void handleOpened(final AllocatedMessageChannel channel) {
+            channel.resumeReads();
+            synchronized (lock) {
+                if (buffer != null) {
+                    channel.resumeWrites();
+                }
+            }
         }
 
-        public void handleWritable(final StreamSinkChannel channel) {
-            final ByteBuffer buffer = this.buffer;
-            if (buffer != null) {
+        public void handleReadable(final AllocatedMessageChannel channel) {
+            try {
+                final ByteBuffer buffer = channel.receive();
+                if (buffer == null) {
+                    // normal close
+                    IoUtils.safeClose(channel);
+                }
+            } catch (IOException e) {
+                exception = new IOException("Received unexpected I/O exception");
+                exception.initCause(e);
+                IoUtils.safeClose(channel);
+            }
+        }
+
+        public void handleWritable(final AllocatedMessageChannel channel) {
+            synchronized (lock) {
+                final ByteBuffer buffer = this.buffer;
+                if (buffer == null) {
+                    return;
+                }
                 try {
-                    while (buffer.hasRemaining()) {
-                        if (channel.write(buffer) == 0) {
-                            channel.resumeWrites();
-                            return;
-                        }
+                    if (channel.send(buffer)) {
+                        allocator.free(buffer);
+                        this.buffer = null;
                     }
                 } catch (IOException e) {
-                    log.trace("Send exception: %s", e.getMessage());
+                    exception = e;
                     IoUtils.safeClose(channel);
-                    semaphore.release();
                 }
-                this.buffer = null;
-                allocator.free(buffer);
             }
         }
 
-        public void handleClosed(final StreamSinkChannel channel) {
+        public void handleClosed(final AllocatedMessageChannel channel) {
             closed = true;
             final ByteBuffer buffer = this.buffer;
             if (buffer != null) {
@@ -149,80 +225,94 @@
                 allocator.free(buffer);
             }
         }
-
-        private void send(final ByteBuffer buffer) throws IOException {
-            if (closed) {
-                throw new IOException("Channel closed");
-            }
-            semaphore.acquireUninterruptibly();
-            if (closed) {
-                semaphore.release();
-                allocator.free(buffer);
-                throw new IOException("Channel closed");
-            }
-            this.buffer = buffer;
-        }
     }
 
     public static final class RemoteOutputStream extends OutputStream {
 
         private final RemoteHandler handler;
-        private final IoFuture<? extends Channel> futureChannel;
+        private final IoFuture<? extends WritableMessageChannel> futureChannel;
+        private final Object lock = new Object();
+        private final BufferAllocator<ByteBuffer> allocator;
         private ByteBuffer buffer;
 
-        public RemoteOutputStream(final RemoteHandler handler, final IoFuture<? extends Channel> futureChannel) {
+        public RemoteOutputStream(final RemoteHandler handler, final IoFuture<? extends WritableMessageChannel> futureChannel, final BufferAllocator<ByteBuffer> allocator) {
             this.handler = handler;
             this.futureChannel = futureChannel;
+            this.allocator = allocator;
+            synchronized (lock) {
+                buffer = allocator.allocate();
+            }
         }
 
         public void write(final int b) throws IOException {
-            if (handler.closed) {
-                throw new IOException("Channel closed");
+            synchronized (lock) {
+                final ByteBuffer buffer = this.buffer;
+                if (buffer == null) {
+                    throw new IOException("Channel closed");
+                }
+                buffer.put((byte)b);
+                if (! buffer.hasRemaining()) {
+                    flush();
+                }
             }
-            if (buffer == null) {
-                buffer = handler.allocator.allocate();
-            }
-            buffer.put((byte)b);
-            if (! buffer.hasRemaining()) {
-                flush();
-            }
         }
 
         public void write(final byte[] bytes, int offset, int length) throws IOException {
-            if (handler.closed) {
-                throw new IOException("Channel closed");
-            }
-            if (buffer == null) {
-                buffer = handler.allocator.allocate();
-            }
-            while (length > 0) {
-                int size = Math.min(buffer.remaining(), length);
-                buffer.put(bytes, offset, size);
-                length -= size; offset += size;
-                if (! buffer.hasRemaining()) {
-                    flush();
+            synchronized (lock) {
+                while (length > 0) {
+                    final ByteBuffer buffer = this.buffer;
+                    if (buffer == null) {
+                        throw new IOException("Channel closed");
+                    }
+                    int size = Math.min(buffer.remaining(), length);
+                    buffer.put(bytes, offset, size);
+                    length -= size; offset += size;
+                    if (! buffer.hasRemaining()) {
+                        flush();
+                    }
                 }
             }
         }
 
         public void flush() throws IOException {
-            try {
-                handler.send(buffer);
-            } finally {
-                buffer = null;
+            synchronized (lock) {
+                if (doFlush()) {
+                    buffer = allocator.allocate();
+                }
             }
         }
 
+        private boolean doFlush() throws IOException {
+            final ByteBuffer buffer = this.buffer;
+            if (buffer != null) {
+                handler.pushBuffer(futureChannel.get(), buffer);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
         public void close() throws IOException {
             final Channel channel;
             try {
                 channel = futureChannel.get();
+                if (channel == null) {
+                    return;
+                }
             } catch (IOException ex) {
                 // throwing this exception would cause close() to appear to not be idempotent
                 log.trace("No channel to close: %s", ex.getMessage());
                 return;
             }
-            channel.close();
+            try {
+                synchronized (lock) {
+                    doFlush();
+                    buffer = null;
+                }
+                channel.close();
+            } finally {
+                IoUtils.safeClose(channel);
+            }
         }
     }
 }

Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/DelegatingObjectInput.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -1,103 +0,0 @@
-package org.jboss.cx.remoting.core.util;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-
-/**
- *
- */
-public class DelegatingObjectInput implements ObjectInput {
-    private final ObjectInput delegate;
-
-    public DelegatingObjectInput(final ObjectInput delegate) {
-        this.delegate = delegate;
-    }
-
-    public int read() throws IOException {
-        return delegate.read();
-    }
-
-    public int read(final byte[] data) throws IOException {
-        return delegate.read(data);
-    }
-
-    public int read(final byte[] data, final int offs, final int len) throws IOException {
-        return delegate.read(data, offs, len);
-    }
-
-    public void close() throws IOException {
-        delegate.close();
-    }
-
-    public Object readObject() throws ClassNotFoundException, IOException {
-        return delegate.readObject();
-    }
-
-    public long skip(final long n) throws IOException {
-        return delegate.skip(n);
-    }
-
-    public int available() throws IOException {
-        return delegate.available();
-    }
-
-    public void readFully(final byte[] b) throws IOException {
-        delegate.readFully(b);
-    }
-
-    public void readFully(final byte[] b, final int off, final int len) throws IOException {
-        delegate.readFully(b, off, len);
-    }
-
-    public int skipBytes(final int n) throws IOException {
-        return delegate.skipBytes(n);
-    }
-
-    public boolean readBoolean() throws IOException {
-        return delegate.readBoolean();
-    }
-
-    public byte readByte() throws IOException {
-        return delegate.readByte();
-    }
-
-    public int readUnsignedByte() throws IOException {
-        return delegate.readUnsignedByte();
-    }
-
-    public short readShort() throws IOException {
-        return delegate.readShort();
-    }
-
-    public int readUnsignedShort() throws IOException {
-        return delegate.readUnsignedShort();
-    }
-
-    public char readChar() throws IOException {
-        return delegate.readChar();
-    }
-
-    public int readInt() throws IOException {
-        return delegate.readInt();
-    }
-
-    public long readLong() throws IOException {
-        return delegate.readLong();
-    }
-
-    public float readFloat() throws IOException {
-        return delegate.readFloat();
-    }
-
-    public double readDouble() throws IOException {
-        return delegate.readDouble();
-    }
-
-    public String readLine() throws IOException {
-        return delegate.readLine();
-    }
-
-    public String readUTF() throws IOException {
-        return delegate.readUTF();
-    }
-}

Deleted: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -1,50 +0,0 @@
-package org.jboss.cx.remoting.core.util;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-/**
- *
- */
-public final class OrderedExecutorFactory {
-    private final Executor parent;
-    private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
-
-    public OrderedExecutorFactory(final Executor parent) {
-        this.parent = parent;
-    }
-
-    public Executor getOrderedExecutor() {
-        return new ChildExecutor();
-    }
-
-    private final class ChildExecutor implements Executor, Runnable {
-        private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
-
-        public void execute(Runnable command) {
-            synchronized(tasks) {
-                tasks.add(command);
-                if (tasks.size() == 1 && runningChildren.add(this)) {
-                    parent.execute(this);
-                }
-            }
-        }
-
-        public void run() {
-            for (;;) {
-                final Runnable task;
-                synchronized(tasks) {
-                    task = tasks.poll();
-                    if (task == null) {
-                        runningChildren.remove(this);
-                        return;
-                    }
-                }
-                task.run();
-            }
-        }
-    }
-}

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-09-09 14:05:02 UTC (rev 4565)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -83,9 +83,9 @@
     private final ConcurrentIntegerMap<RequestHandler> remoteClients = concurrentIntegerMap();
     // forwarded to remote side (handled on this side)
     private final ConcurrentIntegerMap<Handle<RequestHandler>> forwardedClients = concurrentIntegerMap();
-    // sequence for forwarded clients
+    // sequence for forwarded clients (unsigned; shift left one bit, add one)
     private final AtomicInteger forwardedClientSequence = new AtomicInteger();
-    // sequence for clients created from services forwarded to us
+    // sequence for clients created from services forwarded to us (unsigned; shift left one bit)
     private final AtomicInteger remoteClientSequence = new AtomicInteger();
 
     // services forwarded to us

Added: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutor.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutor.java	                        (rev 0)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutor.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -0,0 +1,121 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.util;
+
+import java.util.concurrent.Executor;
+import java.util.LinkedList;
+
+/**
+ * An executor that always runs all tasks in order, using a delegate executor to run the tasks.
+ * <p/>
+ * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
+ * same method, will result in B's task running after A's.
+ * <p/>
+ * A task that throws does not prevent later tasks from running: the failure is handed to the uncaught exception
+ * handler of the thread that ran the task, and the next queued task is then started.
+ */
+public final class OrderedExecutor implements Executor {
+    // @protectedby tasks
+    private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+    // @protectedby tasks
+    private boolean running;
+    private final Executor parent;
+    private final Runnable runner;
+
+    /**
+     * Construct a new instance.
+     *
+     * @param parent the parent executor
+     */
+    public OrderedExecutor(final Executor parent) {
+        this.parent = parent;
+        runner = new Runnable() {
+            public void run() {
+                for (;;) {
+                    final Runnable task;
+                    synchronized(tasks) {
+                        task = tasks.poll();
+                        if (task == null) {
+                            running = false;
+                            return;
+                        }
+                    }
+                    try {
+                        task.run();
+                    } catch (Throwable t) {
+                        // Must not escape: the runner would die with "running" still set and every
+                        // later task would be queued but never executed.
+                        reportFailure(t);
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Run a task.  The task is queued and runs after every task submitted before it.
+     *
+     * @param command the task to run.
+     * @throws NullPointerException if {@code command} is {@code null}
+     * @throws RuntimeException whatever the parent executor throws when it refuses to run the queue runner
+     *      (typically {@code java.util.concurrent.RejectedExecutionException}); the task is then not queued
+     */
+    public void execute(Runnable command) {
+        if (command == null) {
+            // a null entry would be indistinguishable from "queue empty" in the runner's poll()
+            throw new NullPointerException("command is null");
+        }
+        synchronized(tasks) {
+            tasks.add(command);
+            if (! running) {
+                running = true;
+                boolean scheduled = false;
+                try {
+                    parent.execute(runner);
+                    scheduled = true;
+                } finally {
+                    if (! scheduled) {
+                        // The runner was refused; undo the submission so this executor is not left waiting
+                        // forever for a runner that will never start.
+                        running = false;
+                        tasks.removeLastOccurrence(command);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Hand a task failure to the current thread's uncaught exception handler.
+     *
+     * @param cause what the task threw
+     */
+    private static void reportFailure(final Throwable cause) {
+        final Thread thread = Thread.currentThread();
+        try {
+            thread.getUncaughtExceptionHandler().uncaughtException(thread, cause);
+        } catch (Throwable ignored) {
+            // a misbehaving handler must not stop the queue either
+        }
+    }
+}

Copied: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutorFactory.java (from rev 4514, remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/util/OrderedExecutorFactory.java)
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutorFactory.java	                        (rev 0)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/OrderedExecutorFactory.java	2008-09-11 19:54:37 UTC (rev 4566)
@@ -0,0 +1,29 @@
+package org.jboss.cx.remoting.util;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Creates executors that each run their own tasks in order while sharing one common executor instance
+ * to do the actual work.
+ */
+public final class OrderedExecutorFactory {
+    private final Executor delegate;
+
+    /**
+     * Create a factory whose executors all hand their work to the given parent executor.
+     *
+     * @param parent the executor shared by every ordered executor this factory produces
+     */
+    public OrderedExecutorFactory(final Executor parent) {
+        delegate = parent;
+    }
+
+    /**
+     * Create a new ordered executor backed by the shared parent executor.
+     *
+     * @return a newly created executor that runs its tasks in order
+     */
+    public Executor getOrderedExecutor() {
+        return new OrderedExecutor(delegate);
+    }
+}




More information about the jboss-remoting-commits mailing list