[jboss-remoting-commits] JBoss Remoting SVN: r4341 - in remoting3/trunk/api/src/main/java/org/jboss/cx/remoting: spi and 2 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Jul 2 18:54:29 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-07-02 18:54:29 -0400 (Wed, 02 Jul 2008)
New Revision: 4341

Added:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferInputStream.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferOutputStream.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferReader.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferWriter.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
Log:
New, improved, slimmed-down, simplified, more flexible protocol SPI

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java	2008-07-01 14:45:17 UTC (rev 4340)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -6,7 +6,6 @@
  * @param <T> the type that is passed to the close handler
  */
 public interface Closeable<T> extends java.io.Closeable {
-    // TODO - do we need an async close method?
 
     /**
      * Close, waiting for any outstanding processing to finish.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-07-01 14:45:17 UTC (rev 4340)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -3,6 +3,8 @@
 import java.net.URI;
 import java.util.concurrent.ConcurrentMap;
 import org.jboss.cx.remoting.util.AttributeMap;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
 
 /**
  * A potential participant in a JBoss Remoting communications relationship.
@@ -44,7 +46,7 @@
     String getName();
 
     /**
-     * Create a client that can be used to invoke a request listener on this endpoint.  The client may be passed to a
+     * Create a client endpoint that can be used to receive incoming requests on this endpoint.  The client may be passed to a
      * remote endpoint as part of a request or a reply, or it may be used locally.
      *
      * You must have the TODO permission to invoke this method.
@@ -54,7 +56,7 @@
      * @param requestListener the request listener
      * @return the client
      */
-    <I, O> Client<I, O> createClient(RequestListener<I, O> requestListener);
+    <I, O> RemoteClientEndpoint<I, O> createClient(RequestListener<I, O> requestListener);
 
     /**
      * Create a client source that can be used to acquire clients associated with a request listener on this endpoint.
@@ -68,7 +70,7 @@
      * @param requestListener the request listener
      * @return the context source
      */
-    <I, O> ClientSource<I, O> createService(RequestListener<I, O> requestListener);
+    <I, O> RemoteServiceEndpoint<I, O> createService(RequestListener<I, O> requestListener);
 
     /**
      * Add a listener that is notified when a session is created.

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+
+/**
+ * A handle to a local resource.
+ */
+public interface Handle<T> extends Closeable<Handle<T>> {
+
+    /**
+     * Get the resource.
+     *
+     * @return the resource
+     */
+    T getResource();
+
+    /**
+     * Close this reference.
+     *
+     * @throws RemotingException if the close failed
+     */
+    void close() throws RemotingException;
+
+    /**
+     * Add a handler that is invoked when this handle is closed.
+     *
+     * @param handler the handler
+     */
+    void addCloseHandler(final CloseHandler<Handle<T>> handler);
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.Client;
+
+/**
+ * A remote client endpoint, which can be passed to remote endpoints.  Remote systems can then use the client endpoint
+ * to make invocations, or they may pass the client endpoint on to other remote systems.
+ */
+public interface RemoteClientEndpoint<I, O> extends Closeable<RemoteClientEndpoint<I, O>> {
+
+    /**
+     * Receive a request from a remote system.  This method is intended to be called by protocol handlers.  If the
+     * request cannot be accepted for some reason, the
+     * {@link org.jboss.cx.remoting.spi.remote.ReplyHandler#handleException(Throwable)}
+     * method is called immediately.
+     *
+     * @param request the request
+     * @param replyHandler a handler for the reply
+     * @return a context which may be used to cancel the request
+     */
+    RemoteRequestContext receiveRequest(I request, ReplyHandler<O> replyHandler);
+
+    /**
+     * Get a handle to this client endpoint.  The client endpoint will not auto-close as long as there is at least
+     * one open handle or local client instance.  If a handle is "leaked", it will be closed
+     * automatically if/when the garbage collector invokes its {@link Object#finalize()} method, with a log message
+     * warning of the leak.
+     *
+     * @return the handle
+     * @throws RemotingException if a handle could not be acquired
+     */
+    Handle<RemoteClientEndpoint<I, O>> getHandle() throws RemotingException;
+
+    /**
+     * Get a local client which can be used to make invocations.
+     *
+     * @return the client
+     * @throws RemotingException if a client could not be acquired
+     */
+    Client<I, O> getClient() throws RemotingException;
+
+    /**
+     * Automatically close this client endpoint when all handles and local client instances are closed.
+     */
+    void autoClose();
+
+    /**
+     * Close this client endpoint.  The outcome of any outstanding requests is not defined, though implementations
+     * should make an effort to cancel any outstanding requests.
+     *
+     * @throws RemotingException if the client endpoint could not be closed
+     */
+    void close() throws RemotingException;
+
+    /**
+     * Add a handler that is called when the client endpoint is closed.
+     *
+     * @param handler the handler to be called
+     */
+    void addCloseHandler(final CloseHandler<RemoteClientEndpoint<I, O>> handler);
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+/**
+ * The context of an outstanding remote request.  This instance should be discarded when a reply (of any sort)
+ * is received for the request.
+ */
+public interface RemoteRequestContext {
+
+    /**
+     * Signal that the request should be cancelled, if possible.
+     */
+    void cancel();
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.ClientSource;
+
+/**
+ * A remote service endpoint, which can be passed to remote endpoints.  Remote systems can then use the service endpoint
+ * to acquire client endpoints, or they may pass it on to other systems.
+ */
+public interface RemoteServiceEndpoint<I, O> extends Closeable<RemoteServiceEndpoint<I, O>> {
+
+    /**
+     * Create a client endpoint for the service corresponding to this service endpoint.
+     *
+     * @return a client endpoint
+     * @throws RemotingException if a client could not be opened
+     */
+    RemoteClientEndpoint<I, O> openClient() throws RemotingException;
+
+    /**
+     * Get a handle to this service endpoint.  The service endpoint will not auto-close as long as there is at least
+     * one open handle,remote client endpoint, or client source.  If a handle is "leaked", it will be closed
+     * automatically if/when the garbage collector invokes its {@link Object#finalize()} method, with a log message
+     * warning of the leak.
+     *
+     * @return the handle
+     * @throws RemotingException if a handle could not be acquired
+     */
+    Handle<RemoteServiceEndpoint<I, O>> getHandle() throws RemotingException;
+
+    /**
+     * Get a local client source which can be used to access this service.
+     *
+     * @return the client source
+     */
+    ClientSource<I, O> getClientSource() throws RemotingException;
+
+    /**
+     * Automatically close this service endpoint when all handles and local client source instances
+     * are closed.
+     */
+    void autoClose();
+
+    /**
+     * Close this service endpoint immediately.
+     */
+    void close() throws RemotingException;
+
+    /**
+     * Add a handler that is called when the service endpoint is closed.
+     *
+     * @param handler the handler to be called
+     */
+    void addCloseHandler(final CloseHandler<RemoteServiceEndpoint<I, O>> handler);
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+/**
+ * A handler for replies from a request.  The handler should respect the first invocation made on it, and ignore
+ * any subsequent invocations.
+ */
+public interface ReplyHandler<O> {
+
+    /**
+     * Handle a successful reply.
+     *
+     * @param reply the reply
+     */
+    void handleReply(O reply);
+
+    /**
+     * Handle a remote exception.
+     *
+     * @param cause the cause
+     */
+    void handleException(Throwable cause);
+
+    /**
+     * Handle a cancellation request.
+     */
+    void handleCancellation();
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferInputStream.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferInputStream.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferInputStream.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,121 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.stream;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+
+/**
+ * An input stream that reads from byte buffers.  Instances of this class are not safe to use concurrently from
+ * multiple threads.
+ */
+public final class ByteBufferInputStream extends InputStream {
+    private final ObjectSource<ByteBuffer> bufferSource;
+    private final BufferAllocator<ByteBuffer> allocator;
+
+    private boolean closed;
+    private ByteBuffer current;
+
+    public ByteBufferInputStream(final ObjectSource<ByteBuffer> bufferSource, final BufferAllocator<ByteBuffer> allocator) {
+        this.bufferSource = bufferSource;
+        this.allocator = allocator;
+    }
+
+    public int read() throws IOException {
+        if (closed) {
+            return -1;
+        }
+        ByteBuffer buffer = getBuffer();
+        if (buffer == null) {
+            return -1;
+        }
+        try {
+            return buffer.get() & 0xff;
+        } finally {
+            if (! buffer.hasRemaining()) {
+                current = null;
+                allocator.free(buffer);
+            }
+        }
+    }
+
+    public int read(final byte[] b, int off, int len) throws IOException {
+        if (closed) {
+            return -1;
+        }
+        int t = 0;
+        while (len > 0) {
+            ByteBuffer buffer = getBuffer();
+            if (buffer == null) {
+                return t == 0 ? -1 : t;
+            }
+            final int rem = Math.min(len, buffer.remaining());
+            if (rem > 0) {
+                buffer.get(b, off, rem);
+                off += rem;
+                len -= rem;
+                t += rem;
+            }
+            if (! buffer.hasRemaining()) {
+                current = null;
+                allocator.free(buffer);
+            }
+        }
+        return t;
+    }
+
+    public int available() throws IOException {
+        final ByteBuffer buffer = current;
+        return (buffer == null ? 0 : buffer.remaining());
+    }
+
+    public void close() throws IOException {
+        try {
+            final ByteBuffer buffer = current;
+            current = null;
+            allocator.free(buffer);
+            bufferSource.close();
+        } finally {
+            closed = true;
+            IoUtils.safeClose(bufferSource);
+        }
+    }
+
+    private ByteBuffer getBuffer() throws IOException {
+        final ByteBuffer buffer = current;
+        if (buffer == null) {
+            if (bufferSource.hasNext()) {
+                final ByteBuffer newBuffer = bufferSource.next();
+                current = newBuffer;
+                return newBuffer;
+            } else {
+                return null;
+            }
+        } else {
+            return buffer;
+        }
+    }
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferOutputStream.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferOutputStream.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferOutputStream.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,132 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.stream;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Buffers;
+
+/**
+ * An output stream that writes to buffers.  Instances of this class are not normally safe to use from multiple threads
+ * concurrently.
+ */
+public final class ByteBufferOutputStream extends OutputStream {
+    private final ObjectSink<ByteBuffer> bufferSink;
+    private final BufferAllocator<ByteBuffer> allocator;
+
+    private ByteBuffer current;
+    private boolean closed;
+
+    /**
+     * Construct a new stream instance.
+     *
+     * @param bufferSink the buffer sink to which full buffers will be written
+     * @param allocator the allocator from which empty buffers will be allocated
+     */
+    public ByteBufferOutputStream(final ObjectSink<ByteBuffer> bufferSink, final BufferAllocator<ByteBuffer> allocator) {
+        this.bufferSink = bufferSink;
+        this.allocator = allocator;
+    }
+
+    private ByteBuffer getBuffer() throws IOException {
+        final ByteBuffer buffer = current;
+        if (buffer == null) {
+            ByteBuffer newbuf = allocator.allocate();
+            if (newbuf == null) {
+                throw new IOException("No buffers available");
+            }
+            current = newbuf;
+            return newbuf;
+        } else {
+            return buffer;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void write(final int b) throws IOException {
+        if (closed) {
+            throw new IOException("Write to closed outputstream");
+        }
+        final ByteBuffer buffer = getBuffer();
+        buffer.put((byte)b);
+        if (! buffer.hasRemaining()) {
+            localFlush();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void write(final byte[] b, int off, int len) throws IOException {
+        if (closed) {
+            throw new IOException("Write to closed outputstream");
+        }
+        do {
+            final ByteBuffer buffer = getBuffer();
+            final int rem = Math.min(len, buffer.remaining());
+            buffer.put(b, off, rem);
+            if (! buffer.hasRemaining()) {
+                localFlush();
+            }
+            len -= rem; off += rem;
+        } while (len > 0);
+    }
+
+    private void localFlush() throws IOException {
+        if (closed) {
+            throw new IOException("Flush on closed outputstream");
+        }
+        final ByteBuffer buffer = current;
+        if (buffer != null) try {
+            bufferSink.accept(Buffers.flip(buffer));
+        } finally {
+            current = null;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void flush() throws IOException {
+        localFlush();
+        bufferSink.flush();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void close() throws IOException {
+        if (! closed) try {
+            flush();
+            bufferSink.close();
+        } finally {
+            closed = true;
+            IoUtils.safeClose(bufferSink);
+        }
+    }
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferReader.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferReader.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferReader.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,161 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.stream;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.CharBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+
+/**
+ * A reader that reads from char buffers.  Instances of this class are not safe to use concurrently from
+ * multiple threads.
+ */
+public final class CharBufferReader extends Reader {
+    private final ObjectSource<CharBuffer> bufferSource;
+    private final BufferAllocator<CharBuffer> allocator;
+
+    private boolean closed;
+    private CharBuffer current;
+
+    public CharBufferReader(final ObjectSource<CharBuffer> bufferSource, final BufferAllocator<CharBuffer> allocator) {
+        this.bufferSource = bufferSource;
+        this.allocator = allocator;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public int read() throws IOException {
+        if (closed) {
+            return -1;
+        }
+        CharBuffer buffer = getBuffer();
+        if (buffer == null) {
+            return -1;
+        }
+        try {
+            return buffer.get() & 0xff;
+        } finally {
+            if (! buffer.hasRemaining()) {
+                current = null;
+                allocator.free(buffer);
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public int read(final char[] cbuf, int off, int len) throws IOException {
+        if (closed) {
+            return -1;
+        }
+        int t = 0;
+        while (len > 0) {
+            CharBuffer buffer = getBuffer();
+            if (buffer == null) {
+                return t == 0 ? -1 : t;
+            }
+            final int rem = Math.min(len, buffer.remaining());
+            if (rem > 0) {
+                buffer.get(cbuf, off, rem);
+                off += rem;
+                len -= rem;
+                t += rem;
+            }
+            if (! buffer.hasRemaining()) {
+                current = null;
+                allocator.free(buffer);
+            }
+        }
+        return t;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void close() throws IOException {
+        try {
+            final CharBuffer buffer = current;
+            current = null;
+            allocator.free(buffer);
+            bufferSource.close();
+        } finally {
+            closed = true;
+            IoUtils.safeClose(bufferSource);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public int read(final CharBuffer target) throws IOException {
+        if (closed) {
+            return -1;
+        }
+        int t = 0;
+        int len = target.remaining();
+        while (len > 0) {
+            CharBuffer buffer = getBuffer();
+            if (buffer == null) {
+                return t == 0 ? -1 : t;
+            }
+            final int rem = Math.min(len, buffer.remaining());
+            if (rem > 0) {
+                buffer.read(target);
+                len -= rem;
+                t += rem;
+            }
+            if (! buffer.hasRemaining()) {
+                current = null;
+                allocator.free(buffer);
+            }
+        }
+        return t;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public boolean ready() throws IOException {
+        final CharBuffer buffer = current;
+        return buffer != null && buffer.hasRemaining();
+    }
+
+    private CharBuffer getBuffer() throws IOException {
+        final CharBuffer buffer = current;
+        if (buffer == null) {
+            if (bufferSource.hasNext()) {
+                final CharBuffer newBuffer = bufferSource.next();
+                current = newBuffer;
+                return newBuffer;
+            } else {
+                return null;
+            }
+        } else {
+            return buffer;
+        }
+    }
+}
\ No newline at end of file

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferWriter.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferWriter.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferWriter.java	2008-07-02 22:54:29 UTC (rev 4341)
@@ -0,0 +1,150 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.stream;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.CharBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Buffers;
+
+/**
+ * A writer that writes to buffers.  Instances of this class are not normally safe to use from multiple threads
+ * concurrently.
+ */
+public final class CharBufferWriter extends Writer {
+    private final ObjectSink<CharBuffer> bufferSink;
+    private final BufferAllocator<CharBuffer> allocator;
+
+    private CharBuffer current;
+    private boolean closed;
+
+    /**
+     * Construct a new stream instance.
+     *
+     * @param bufferSink the buffer sink to which full buffers will be written
+     * @param allocator the allocator from which empty buffers will be allocated
+     */
+    public CharBufferWriter(final ObjectSink<CharBuffer> bufferSink, final BufferAllocator<CharBuffer> allocator) {
+        this.bufferSink = bufferSink;
+        this.allocator = allocator;
+    }
+
+    private CharBuffer getBuffer() throws IOException {
+        final CharBuffer buffer = current;
+        if (buffer == null) {
+            CharBuffer newbuf = allocator.allocate();
+            if (newbuf == null) {
+                throw new IOException("No buffers available");
+            }
+            current = newbuf;
+            return newbuf;
+        } else {
+            return buffer;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void write(final int b) throws IOException {
+        if (closed) {
+            throw new IOException("Write to closed writer");
+        }
+        final CharBuffer buffer = getBuffer();
+        buffer.put((char)b);
+        if (! buffer.hasRemaining()) {
+            localFlush();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void write(final char[] b, int off, int len) throws IOException {
+        if (closed) {
+            throw new IOException("Write to closed writer");
+        }
+        do {
+            final CharBuffer buffer = getBuffer();
+            final int rem = Math.min(len, buffer.remaining());
+            buffer.put(b, off, rem);
+            if (! buffer.hasRemaining()) {
+                localFlush();
+            }
+            len -= rem; off += rem;
+        } while (len > 0);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void write(final String str, int off, int len) throws IOException {
+        if (closed) {
+            throw new IOException("Write to closed writer");
+        }
+        do {
+            final CharBuffer buffer = getBuffer();
+            final int rem = Math.min(len, buffer.remaining());
+            buffer.put(str, off, rem);
+            if (! buffer.hasRemaining()) {
+                localFlush();
+            }
+            len -= rem; off += rem;
+        } while (len > 0);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void flush() throws IOException {
+        localFlush();
+        bufferSink.flush();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void close() throws IOException {
+        if (! closed) try {
+            flush();
+            bufferSink.close();
+        } finally {
+            closed = true;
+            IoUtils.safeClose(bufferSink);
+        }
+    }
+
+    private void localFlush() throws IOException {
+        if (closed) {
+            throw new IOException("Flush on closed writer");
+        }
+        final CharBuffer buffer = current;
+        if (buffer != null) try {
+            bufferSink.accept(Buffers.flip(buffer));
+        } finally {
+            current = null;
+        }
+    }
+}
\ No newline at end of file




More information about the jboss-remoting-commits mailing list