Author: david.lloyd(a)jboss.com
Date: 2009-03-04 20:28:47 -0500 (Wed, 04 Mar 2009)
New Revision: 4867
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/InputStreamHandlerFactory.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/OutputStreamHandlerFactory.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamContext.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandler.java
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandlerFactory.java
Log:
Stream SPI...?
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/InputStreamHandlerFactory.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/InputStreamHandlerFactory.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/InputStreamHandlerFactory.java 2009-03-05
01:28:47 UTC (rev 4867)
@@ -0,0 +1,158 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * A handler factory for automatic forwarding of input streams.
+ */
+public final class InputStreamHandlerFactory implements
StreamHandlerFactory<InputStream, StreamChannel> {
+
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.stream.inputstream");
+
+ /** {@inheritDoc} */
+ public StreamHandler<InputStream, StreamChannel> createStreamHandler(final
InputStream localInstance, final StreamContext streamContext) throws IOException {
+ return new Handler(localInstance, streamContext);
+ }
+
+ private static class Handler implements StreamHandler<InputStream,
StreamChannel> {
+
+ private static final long serialVersionUID = 731898100063706343L;
+
+ private final StreamContext streamContext;
+ private transient final InputStream localInstance;
+
+ private Handler(final InputStream instance, final StreamContext context) {
+ localInstance = instance;
+ streamContext = context;
+ }
+
+ public IoHandler<StreamChannel> getLocalHandler() {
+ return new IoHandler<StreamChannel>() {
+ public void handleOpened(final StreamChannel channel) {
+ streamContext.execute(new LocalRunnable(channel, localInstance));
+ }
+
+ public void handleClosed(final StreamChannel channel) {
+ IoUtils.safeClose(localInstance);
+ }
+
+ public void handleReadable(final StreamChannel channel) {
+ }
+
+ public void handleWritable(final StreamChannel channel) {
+ }
+ };
+ }
+
+ public IoHandler<StreamChannel> getRemoteHandler() {
+ return IoUtils.nullHandler();
+ }
+
+ public InputStream getRemoteProxy(final IoFuture<? extends StreamChannel>
futureChannel) {
+ return new ProxyInputStream(futureChannel);
+ }
+ }
+
+ private static class ProxyInputStream extends InputStream {
+
+ private final IoFuture<? extends StreamChannel> futureChannel;
+
+ public ProxyInputStream(final IoFuture<? extends StreamChannel>
futureChannel) {
+ this.futureChannel = futureChannel;
+ }
+
+ public int read() throws IOException {
+ return 0;
+ }
+
+ public int read(final byte[] b, final int off, final int len) throws IOException
{
+ final StreamChannel channel = futureChannel.get();
+ int res;
+ do {
+ res = channel.read(ByteBuffer.wrap(b, off, len));
+ if (res == 0) {
+ channel.awaitReadable();
+ }
+ } while (res == 0);
+ return res;
+ }
+
+ public void close() throws IOException {
+ futureChannel.get().close();
+ }
+ }
+
+ private static class LocalRunnable implements Runnable {
+
+ private static final int MIN_BUFFER_SIZE = 512;
+ private static final int MAX_BUFFER_SIZE = 1024;
+
+ private final StreamChannel channel;
+ private final InputStream localInstance;
+
+ public LocalRunnable(final StreamChannel channel, final InputStream instance) {
+ this.channel = channel;
+ localInstance = instance;
+ }
+
+ public void run() {
+ try {
+ final byte[] bytes = new byte[MAX_BUFFER_SIZE];
+ final ByteBuffer buf = ByteBuffer.wrap(bytes);
+ for (;;) {
+ int cnt = 0, pos;
+ while ((pos = buf.position()) < MIN_BUFFER_SIZE) {
+ cnt = localInstance.read(bytes, pos, buf.remaining());
+ if (cnt == -1) {
+ break;
+ }
+ }
+ buf.flip();
+ int wcnt;
+ do {
+ wcnt = channel.write(buf);
+ if (wcnt == 0) {
+ channel.awaitWritable();
+ }
+ } while (buf.hasRemaining());
+ if (cnt == -1) {
+ channel.close();
+ }
+ buf.clear();
+ }
+ } catch (IOException e) {
+ log.error(e, "Failed to read input stream data");
+ IoUtils.safeClose(channel);
+ }
+ }
+ }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/OutputStreamHandlerFactory.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/OutputStreamHandlerFactory.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/OutputStreamHandlerFactory.java 2009-03-05
01:28:47 UTC (rev 4867)
@@ -0,0 +1,235 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import static java.lang.Math.min;
+import static java.lang.Thread.currentThread;
+import org.jboss.xnio.channels.StreamChannel;
+import org.jboss.xnio.channels.ChannelInputStream;
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.IoFuture;
+import static org.jboss.xnio.IoUtils.safeClose;
+import static org.jboss.xnio.IoUtils.nullHandler;
+
+/**
+ * A handler factory for automatic forwarding of output streams.
+ */
+public final class OutputStreamHandlerFactory implements
StreamHandlerFactory<OutputStream, StreamChannel> {
+
+ /** {@inheritDoc} */
+ public StreamHandler<OutputStream, StreamChannel> createStreamHandler(final
OutputStream localInstance, final StreamContext streamContext) throws IOException {
+ return new Handler(localInstance);
+ }
+
+ private static final class Handler implements StreamHandler<OutputStream,
StreamChannel> {
+
+ private static final long serialVersionUID = 3147719591239403750L;
+
+ private transient final OutputStream localInstance;
+
+ private Handler(final OutputStream instance) {
+ localInstance = instance;
+ }
+
+ public IoHandler<StreamChannel> getLocalHandler() {
+ return new LocalHandler(localInstance);
+ }
+
+ public IoHandler<StreamChannel> getRemoteHandler() {
+ return nullHandler();
+ }
+
+ public OutputStream getRemoteProxy(final IoFuture<? extends StreamChannel>
futureChannel) {
+ return new ProxyOutputStream(futureChannel);
+ }
+ }
+
+ private static final class LocalHandler implements IoHandler<StreamChannel> {
+ private final OutputStream localInstance;
+ private final byte[] bytes = new byte[1024];
+
+ private LocalHandler(final OutputStream instance) {
+ localInstance = instance;
+ }
+
+ public void handleOpened(final StreamChannel channel) {
+ }
+
+ public void handleClosed(final StreamChannel channel) {
+ safeClose(localInstance);
+ }
+
+ public void handleReadable(final StreamChannel channel) {
+ final byte[] bytes = this.bytes;
+ final ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ try {
+ for (;;) {
+ final int res = channel.read(buffer);
+ if (res == 0) {
+ channel.resumeReads();
+ return;
+ }
+ localInstance.write(bytes, 0, buffer.position());
+ buffer.clear();
+ }
+ } catch (IOException e) {
+ safeClose(channel);
+ }
+ }
+
+ public void handleWritable(final StreamChannel channel) {
+ }
+ }
+
+ private static final class ProxyOutputStream extends OutputStream {
+ private final ByteBuffer buffer = ByteBuffer.allocate(1024);
+ private final IoFuture<? extends StreamChannel> futureChannel;
+ private final Lock lock = new ReentrantLock();
+ private boolean open = true;
+
+ private ProxyOutputStream(final IoFuture<? extends StreamChannel> channel)
{
+ futureChannel = channel;
+ }
+
+ public void write(final int b) throws IOException {
+ final Lock lock = this.lock;
+ try {
+ lock.lockInterruptibly();
+ try {
+ checkOpen();
+ final ByteBuffer buffer = this.buffer;
+ buffer.put((byte) b);
+ if (! buffer.hasRemaining()) {
+ flush();
+ }
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ doInterrupted();
+ }
+ }
+
+ public void write(final byte[] b, int off, int len) throws IOException {
+ final Lock lock = this.lock;
+ try {
+ lock.lockInterruptibly();
+ try {
+ checkOpen();
+ final ByteBuffer buffer = this.buffer;
+ while (len > 0) {
+ final int cnt = min(len, buffer.remaining());
+ buffer.put(b, off, cnt);
+ off += cnt;
+ len -= cnt;
+ if (! buffer.hasRemaining()) {
+ flush();
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ doInterrupted();
+ }
+ }
+
+ public void flush() throws IOException {
+ final Lock lock = this.lock;
+ try {
+ lock.lockInterruptibly();
+ try {
+ checkOpen();
+ final StreamChannel channel = futureChannel.get();
+ final ByteBuffer buffer = this.buffer;
+ buffer.flip();
+ while (buffer.hasRemaining()) {
+ if (channel.write(buffer) == 0) {
+ channel.awaitWritable();
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ doInterrupted();
+ }
+ }
+
+ public void close() throws IOException {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ if (! open) {
+ return;
+ }
+ final StreamChannel channel = futureChannel.get();
+ try {
+ flush();
+ channel.shutdownWrites();
+ final ChannelInputStream is = new ChannelInputStream(channel);
+ int b = is.read();
+ switch (b) {
+ case -1: throw new IOException("Stream outcome
unknown");
+ case 0: {
+ final InputStreamReader reader = new InputStreamReader(is,
"UTF-8");
+ final StringBuilder builder = new StringBuilder("Remote
failure: ");
+ do {
+ b = reader.read();
+ if (b != -1) {
+ builder.append(b);
+ }
+ } while (b != -1);
+ throw new IOException(builder.toString());
+ }
+ case 1: return;
+ default: throw new IOException("Unknown response from remote
host");
+ }
+ } finally {
+ safeClose(channel);
+ }
+ } finally {
+ open = false;
+ lock.unlock();
+ }
+ }
+
+ private void doInterrupted() throws InterruptedIOException {
+ currentThread().interrupt();
+ throw new InterruptedIOException("I/O operation interrupted");
+ }
+
+ private void checkOpen() throws IOException {
+ if (! open) {
+ throw new IOException("Write to closed stream");
+ }
+ }
+ }
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamContext.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamContext.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamContext.java 2009-03-05
01:28:47 UTC (rev 4867)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import java.io.Serializable;
+import java.util.concurrent.Executor;
+
+/**
+ * The context passed to a {@link StreamHandlerFactory} when it creates a stream handler.  The context is an
+ * {@link Executor}, so handlers can use it to run tasks, and it is {@link Serializable}.
+ */
+public interface StreamContext extends Executor, Serializable {
+ /**
+ * Create a marshaller.
+ * NOTE(review): presumably intended for handlers which stream objects rather than raw bytes; nothing in
+ * this change set calls it yet - confirm the intended use.
+ *
+ * @return the marshaller
+ */
+ Marshaller createMarshaller();
+
+ /**
+ * Create an unmarshaller.
+ * NOTE(review): presumably the counterpart of {@link #createMarshaller()} for the reading side - confirm.
+ *
+ * @return the unmarshaller
+ */
+ Unmarshaller createUnmarshaller();
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandler.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandler.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandler.java 2009-03-05
01:28:47 UTC (rev 4867)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import org.jboss.xnio.IoHandler;
+import org.jboss.xnio.IoFuture;
+import java.nio.channels.Channel;
+import java.io.Serializable;
+
+/**
+ * Handles the streaming of one particular object instance.  The same handler object serves both ends of the
+ * stream: it is used on the local side and, being {@link Serializable}, on the remote side as well.
+ * <p>
+ * Stream handlers are non-reentrant.  A handler must not have a stream type among its serializable fields,
+ * and a handler's {@code writeObject} method (if it has one) must not write a stream type; doing either is an
+ * error.
+ *
+ * @param <T> the type of object being streamed
+ * @param <C> the type of channel which carries the stream
+ */
+public interface StreamHandler<T, C extends Channel> extends Serializable {
+
+    /**
+     * Get the XNIO handler for the local end of this stream.  An implementation which caches the handler on
+     * the object should keep it in a {@code transient} field, so that the local handler is not sent to the
+     * remote side.
+     *
+     * @return the local XNIO handler
+     */
+    IoHandler<C> getLocalHandler();
+
+    /**
+     * Get the XNIO handler for the remote end of this stream.  To avoid copying the handler across the wire,
+     * it should not be instantiated before this {@code StreamHandler} instance has reached the remote side.
+     *
+     * @return the remote XNIO handler
+     */
+    IoHandler<C> getRemoteHandler();
+
+    /**
+     * Get the proxy instance which represents this stream on the remote side.  To avoid copying the proxy
+     * across the wire, it should not be instantiated before this {@code StreamHandler} instance has reached
+     * the remote side.  This method will be called after {@link #getRemoteHandler()}.
+     *
+     * @param futureChannel the future channel
+     * @return the remote proxy instance
+     */
+    T getRemoteProxy(IoFuture<? extends C> futureChannel);
+}
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandlerFactory.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandlerFactory.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/StreamHandlerFactory.java 2009-03-05
01:28:47 UTC (rev 4867)
@@ -0,0 +1,43 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.nio.channels.Channel;
+import java.io.IOException;
+
+/**
+ * A stream handler factory.  Produces stream handler instances for the given object, which in turn use a
+ * specified channel type to stream the data.
+ *
+ * @param <T> the type of the local object to be streamed
+ * @param <C> the type of channel used to stream the data
+ */
+public interface StreamHandlerFactory<T, C extends Channel> {
+
+ /**
+ * Create a stream handler instance for a local object.
+ *
+ * @param localInstance the local instance
+ * @param streamContext the stream context
+ * @return the stream handler for this type
+ * @throws IOException if an error occurs
+ */
+ StreamHandler<T, C> createStreamHandler(T localInstance, StreamContext
streamContext) throws IOException;
+}