[jboss-remoting-commits] JBoss Remoting SVN: r4867 - remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Mar 4 20:28:48 EST 2009


Author: david.lloyd at jboss.com
Date: 2009-03-04 20:28:47 -0500 (Wed, 04 Mar 2009)
New Revision: 4867

Added:
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/InputStreamHandlerFactory.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/OutputStreamHandlerFactory.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamContext.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandler.java
   remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandlerFactory.java
Log:
Stream SPI...?

Added: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/InputStreamHandlerFactory.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/InputStreamHandlerFactory.java	                        (rev 0)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/InputStreamHandlerFactory.java	2009-03-05 01:28:47 UTC (rev 4867)
@@ -0,0 +1,158 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * A handler factory for automatic forwarding of input streams.
+ */
+public final class InputStreamHandlerFactory implements StreamHandlerFactory<InputStream, StreamChannel> {
+
+    private static final Logger log = Logger.getLogger("org.jboss.remoting.stream.inputstream");
+
+    /** {@inheritDoc} */
+    public StreamHandler<InputStream, StreamChannel> createStreamHandler(final InputStream localInstance, final StreamContext streamContext) throws IOException {
+        return new Handler(localInstance, streamContext);
+    }
+
+    private static class Handler implements StreamHandler<InputStream, StreamChannel> {
+
+        private static final long serialVersionUID = 731898100063706343L;
+
+        private final StreamContext streamContext;
+        private transient final InputStream localInstance;
+
+        private Handler(final InputStream instance, final StreamContext context) {
+            localInstance = instance;
+            streamContext = context;
+        }
+
+        public IoHandler<StreamChannel> getLocalHandler() {
+            return new IoHandler<StreamChannel>() {
+                public void handleOpened(final StreamChannel channel) {
+                    streamContext.execute(new LocalRunnable(channel, localInstance));
+                }
+
+                public void handleClosed(final StreamChannel channel) {
+                    IoUtils.safeClose(localInstance);
+                }
+
+                public void handleReadable(final StreamChannel channel) {
+                }
+
+                public void handleWritable(final StreamChannel channel) {
+                }
+            };
+        }
+
+        public IoHandler<StreamChannel> getRemoteHandler() {
+            return IoUtils.nullHandler();
+        }
+
+        public InputStream getRemoteProxy(final IoFuture<? extends StreamChannel> futureChannel) {
+            return new ProxyInputStream(futureChannel);
+        }
+    }
+
+    private static class ProxyInputStream extends InputStream {
+
+        private final IoFuture<? extends StreamChannel> futureChannel;
+
+        public ProxyInputStream(final IoFuture<? extends StreamChannel> futureChannel) {
+            this.futureChannel = futureChannel;
+        }
+
+        public int read() throws IOException {
+            return 0;
+        }
+
+        public int read(final byte[] b, final int off, final int len) throws IOException {
+            final StreamChannel channel = futureChannel.get();
+            int res;
+            do {
+                res = channel.read(ByteBuffer.wrap(b, off, len));
+                if (res == 0) {
+                    channel.awaitReadable();
+                }
+            } while (res == 0);
+            return res;
+        }
+
+        public void close() throws IOException {
+            futureChannel.get().close();
+        }
+    }
+
+    private static class LocalRunnable implements Runnable {
+
+        private static final int MIN_BUFFER_SIZE = 512;
+        private static final int MAX_BUFFER_SIZE = 1024;
+
+        private final StreamChannel channel;
+        private final InputStream localInstance;
+
+        public LocalRunnable(final StreamChannel channel, final InputStream instance) {
+            this.channel = channel;
+            localInstance = instance;
+        }
+
+        public void run() {
+            try {
+                final byte[] bytes = new byte[MAX_BUFFER_SIZE];
+                final ByteBuffer buf = ByteBuffer.wrap(bytes);
+                for (;;) {
+                    int cnt = 0, pos;
+                    while ((pos = buf.position()) < MIN_BUFFER_SIZE) {
+                        cnt = localInstance.read(bytes, pos, buf.remaining());
+                        if (cnt == -1) {
+                            break;
+                        }
+                    }
+                    buf.flip();
+                    int wcnt;
+                    do {
+                        wcnt = channel.write(buf);
+                        if (wcnt == 0) {
+                            channel.awaitWritable();
+                        }
+                    } while (buf.hasRemaining());
+                    if (cnt == -1) {
+                        channel.close();
+                    }
+                    buf.clear();
+                }
+            } catch (IOException e) {
+                log.error(e, "Failed to read input stream data");
+                IoUtils.safeClose(channel);
+            }
+        }
+    }
+}

Added: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/OutputStreamHandlerFactory.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/OutputStreamHandlerFactory.java	                        (rev 0)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/OutputStreamHandlerFactory.java	2009-03-05 01:28:47 UTC (rev 4867)
@@ -0,0 +1,235 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import static java.lang.Math.min;
+import static java.lang.Thread.currentThread;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.ChannelInputStream;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.IoFuture;
+import static org.jboss.xnio.IoUtils.safeClose;
+import static org.jboss.xnio.IoUtils.nullHandler;
+
+/**
+ * A handler factory for automatic forwarding of output streams.
+ */
+public final class OutputStreamHandlerFactory implements StreamHandlerFactory<OutputStream, StreamChannel> {
+
+    /** {@inheritDoc} */
+    public StreamHandler<OutputStream, StreamChannel> createStreamHandler(final OutputStream localInstance, final StreamContext streamContext) throws IOException {
+        return new Handler(localInstance);
+    }
+
+    private static final class Handler implements StreamHandler<OutputStream, StreamChannel> {
+
+        private static final long serialVersionUID = 3147719591239403750L;
+
+        private transient final OutputStream localInstance;
+
+        private Handler(final OutputStream instance) {
+            localInstance = instance;
+        }
+
+        public IoHandler<StreamChannel> getLocalHandler() {
+            return new LocalHandler(localInstance);
+        }
+
+        public IoHandler<StreamChannel> getRemoteHandler() {
+            return nullHandler();
+        }
+
+        public OutputStream getRemoteProxy(final IoFuture<? extends StreamChannel> futureChannel) {
+            return new ProxyOutputStream(futureChannel);
+        }
+    }
+
+    private static final class LocalHandler implements IoHandler<StreamChannel> {
+        private final OutputStream localInstance;
+        private final byte[] bytes = new byte[1024];
+
+        private LocalHandler(final OutputStream instance) {
+            localInstance = instance;
+        }
+
+        public void handleOpened(final StreamChannel channel) {
+        }
+
+        public void handleClosed(final StreamChannel channel) {
+            safeClose(localInstance);
+        }
+
+        public void handleReadable(final StreamChannel channel) {
+            final byte[] bytes = this.bytes;
+            final ByteBuffer buffer = ByteBuffer.wrap(bytes);
+            try {
+                for (;;) {
+                    final int res = channel.read(buffer);
+                    if (res == 0) {
+                        channel.resumeReads();
+                        return;
+                    }
+                    localInstance.write(bytes, 0, buffer.position());
+                    buffer.clear();
+                }
+            } catch (IOException e) {
+                safeClose(channel);
+            }
+        }
+
+        public void handleWritable(final StreamChannel channel) {
+        }
+    }
+
+    private static final class ProxyOutputStream extends OutputStream {
+        private final ByteBuffer buffer = ByteBuffer.allocate(1024);
+        private final IoFuture<? extends StreamChannel> futureChannel;
+        private final Lock lock = new ReentrantLock();
+        private boolean open = true;
+
+        private ProxyOutputStream(final IoFuture<? extends StreamChannel> channel) {
+            futureChannel = channel;
+        }
+
+        public void write(final int b) throws IOException {
+            final Lock lock = this.lock;
+            try {
+                lock.lockInterruptibly();
+                try {
+                    checkOpen();
+                    final ByteBuffer buffer = this.buffer;
+                    buffer.put((byte) b);
+                    if (! buffer.hasRemaining()) {
+                        flush();
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            } catch (InterruptedException e) {
+                doInterrupted();
+            }
+        }
+
+        public void write(final byte[] b, int off, int len) throws IOException {
+            final Lock lock = this.lock;
+            try {
+                lock.lockInterruptibly();
+                try {
+                    checkOpen();
+                    final ByteBuffer buffer = this.buffer;
+                    while (len > 0) {
+                        final int cnt = min(len, buffer.remaining());
+                        buffer.put(b, off, cnt);
+                        off += cnt;
+                        len -= cnt;
+                        if (! buffer.hasRemaining()) {
+                            flush();
+                        }
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            } catch (InterruptedException e) {
+                doInterrupted();
+            }
+        }
+
+        public void flush() throws IOException {
+            final Lock lock = this.lock;
+            try {
+                lock.lockInterruptibly();
+                try {
+                    checkOpen();
+                    final StreamChannel channel = futureChannel.get();
+                    final ByteBuffer buffer = this.buffer;
+                    buffer.flip();
+                    while (buffer.hasRemaining()) {
+                        if (channel.write(buffer) == 0) {
+                            channel.awaitWritable();
+                        }
+                    }
+                } finally {
+                    lock.unlock();
+                }
+            } catch (InterruptedException e) {
+                doInterrupted();
+            }
+        }
+
+        public void close() throws IOException {
+            final Lock lock = this.lock;
+            lock.lock();
+            try {
+                if (! open) {
+                    return;
+                }
+                final StreamChannel channel = futureChannel.get();
+                try {
+                    flush();
+                    channel.shutdownWrites();
+                    final ChannelInputStream is = new ChannelInputStream(channel);
+                    int b = is.read();
+                    switch (b) {
+                        case -1: throw new IOException("Stream outcome unknown");
+                        case 0: {
+                            final InputStreamReader reader = new InputStreamReader(is, "UTF-8");
+                            final StringBuilder builder = new StringBuilder("Remote failure: ");
+                            do {
+                                b = reader.read();
+                                if (b != -1) {
+                                    builder.append(b);
+                                }
+                            } while (b != -1);
+                            throw new IOException(builder.toString());
+                        }
+                        case 1: return;
+                        default: throw new IOException("Unknown response from remote host");
+                    }
+                } finally {
+                    safeClose(channel);
+                }
+            } finally {
+                open = false;
+                lock.unlock();
+            }
+        }
+
+        private void doInterrupted() throws InterruptedIOException {
+            currentThread().interrupt();
+            throw new InterruptedIOException("I/O operation interrupted");
+        }
+
+        private void checkOpen() throws IOException {
+            if (! open) {
+                throw new IOException("Write to closed stream");
+            }
+        }
+    }
+}

Added: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamContext.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamContext.java	                        (rev 0)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamContext.java	2009-03-05 01:28:47 UTC (rev 4867)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import java.io.Serializable;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public interface StreamContext extends Executor, Serializable {
+    Marshaller createMarshaller();
+
+    Unmarshaller createUnmarshaller();
+}

Added: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandler.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandler.java	                        (rev 0)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandler.java	2009-03-05 01:28:47 UTC (rev 4867)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.IoFuture;
+import java.nio.channels.Channel;
+import java.io.Serializable;
+
+/**
+ * A stream handler for an individual object instance.  Instances of this class are used on both
+ * the local and remote side.  Stream handlers are non-reentrant; in other words, it is an error
+ * for a stream handler to have a stream type as one of its serializable fields or for a stream handler's
+ * {@code writeObject} method (if any) to write a stream type.
+ */
+public interface StreamHandler<T, C extends Channel> extends Serializable {
+
+    /**
+     * Get the local XNIO handler for this stream.  If this handler is cached on the object, it should be
+     * done in a {@code transient} fashion to prevent the local handler from being sent to the remote side.
+     *
+     * @return the local XNIO handler
+     */
+    IoHandler<C> getLocalHandler();
+
+    /**
+     * Get the remote XNIO handler for this stream.  The remote handler should not be instantiated until the
+     * {@code StreamHandler} instance is on the remote side to avoid copying the handler across the wire.
+     *
+     * @return the remote XNIO handler
+     */
+    IoHandler<C> getRemoteHandler();
+
+    /**
+     * Get the remote proxy instance for this stream.  The remote proxy should not be instantiated until the
+     * {@code StreamHandler} instance is on the remote side to avoid copying the proxy instance across the wire.
+     * This method will be called after {@link #getRemoteHandler()}.
+     *
+     * @param futureChannel the future channel
+     * @return the remote proxy instance
+     */
+    T getRemoteProxy(final IoFuture<? extends C> futureChannel);
+}

Added: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandlerFactory.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandlerFactory.java	                        (rev 0)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandlerFactory.java	2009-03-05 01:28:47 UTC (rev 4867)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.nio.channels.Channel;
+import java.io.IOException;
+
+/**
+ * A stream handler factory.  Produces stream handler instances for the given object, which in turn uses a specified
+ * channel type to stream the data.
+ */
+public interface StreamHandlerFactory<T, C extends Channel> {
+
+    /**
+     * Create a stream handler instance for a local object.
+     *
+     * @param localInstance the local instance
+     * @param streamContext the stream context
+     * @return the stream handler for this type
+     * @throws IOException if an error occurs
+     */
+    StreamHandler<T, C> createStreamHandler(T localInstance, StreamContext streamContext) throws IOException;
+}




More information about the jboss-remoting-commits mailing list