Author: david.lloyd(a)jboss.com
Date: 2008-07-02 18:54:29 -0400 (Wed, 02 Jul 2008)
New Revision: 4341
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferInputStream.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferOutputStream.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferReader.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferWriter.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
Log:
New, improved, slimmed-down, simplified, more flexible protocol SPI
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-07-01
14:45:17 UTC (rev 4340)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -6,7 +6,6 @@
* @param <T> the type that is passed to the close handler
*/
public interface Closeable<T> extends java.io.Closeable {
- // TODO - do we need an async close method?
/**
* Close, waiting for any outstanding processing to finish.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-01
14:45:17 UTC (rev 4340)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -3,6 +3,8 @@
import java.net.URI;
import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.util.AttributeMap;
+import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
/**
* A potential participant in a JBoss Remoting communications relationship.
@@ -44,7 +46,7 @@
String getName();
/**
- * Create a client that can be used to invoke a request listener on this endpoint.
The client may be passed to a
+ * Create a client endpoint that can be used to receive incoming requests on this
endpoint. The client may be passed to a
* remote endpoint as part of a request or a reply, or it may be used locally.
*
* You must have the TODO permission to invoke this method.
@@ -54,7 +56,7 @@
* @param requestListener the request listener
* @return the client
*/
- <I, O> Client<I, O> createClient(RequestListener<I, O>
requestListener);
+ <I, O> RemoteClientEndpoint<I, O> createClient(RequestListener<I,
O> requestListener);
/**
* Create a client source that can be used to acquire clients associated with a
request listener on this endpoint.
@@ -68,7 +70,7 @@
* @param requestListener the request listener
* @return the context source
*/
- <I, O> ClientSource<I, O> createService(RequestListener<I, O>
requestListener);
+ <I, O> RemoteServiceEndpoint<I, O> createService(RequestListener<I,
O> requestListener);
/**
* Add a listener that is notified when a session is created.
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,54 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+
+/**
+ * A handle to a local resource.
+ */
+public interface Handle<T> extends Closeable<Handle<T>> {
+
+ /**
+ * Get the resource.
+ *
+ * @return the resource
+ */
+ T getResource();
+
+ /**
+ * Close this reference.
+ *
+ * @throws RemotingException if the close failed
+ */
+ void close() throws RemotingException;
+
+ /**
+ * Add a handler that is invoked when this handle is closed.
+ *
+ * @param handler the handler
+ */
+ void addCloseHandler(final CloseHandler<Handle<T>> handler);
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,86 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.Client;
+
+/**
+ * A remote client endpoint, which can be passed to remote endpoints. Remote systems can
then use the client endpoint
+ * to make invocations, or they may pass the client endpoint on to other remote systems.
+ */
+public interface RemoteClientEndpoint<I, O> extends
Closeable<RemoteClientEndpoint<I, O>> {
+
+ /**
+ * Receive a request from a remote system. This method is intended to be called by
protocol handlers. If the
+ * request cannot be accepted for some reason, the
+ * {@link org.jboss.cx.remoting.spi.remote.ReplyHandler#handleException(Throwable)}
+ * method is called immediately.
+ *
+ * @param request the request
+ * @param replyHandler a handler for the reply
+ * @return a context which may be used to cancel the request
+ */
+ RemoteRequestContext receiveRequest(I request, ReplyHandler<O> replyHandler);
+
+ /**
+ * Get a handle to this client endpoint. The client endpoint will not auto-close as
long as there is at least
+ * one open handle or local client instance. If a handle is "leaked", it
will be closed
+ * automatically if/when the garbage collector invokes its {@link Object#finalize()}
method, with a log message
+ * warning of the leak.
+ *
+ * @return the handle
+ * @throws RemotingException if a handle could not be acquired
+ */
+ Handle<RemoteClientEndpoint<I, O>> getHandle() throws RemotingException;
+
+ /**
+ * Get a local client which can be used to make invocations.
+ *
+ * @return the client
+ * @throws RemotingException if a client could not be acquired
+ */
+ Client<I, O> getClient() throws RemotingException;
+
+ /**
+ * Automatically close this client endpoint when all handles and local client
instances are closed.
+ */
+ void autoClose();
+
+ /**
+ * Close this client endpoint. The outcome of any outstanding requests is not
defined, though implementations
+ * should make an effort to cancel any outstanding requests.
+ *
+ * @throws RemotingException if the client endpoint could not be closed
+ */
+ void close() throws RemotingException;
+
+ /**
+ * Add a handler that is called when the client endpoint is closed.
+ *
+ * @param handler the handler to be called
+ */
+ void addCloseHandler(final CloseHandler<RemoteClientEndpoint<I, O>>
handler);
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteRequestContext.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+/**
+ * The context of an outstanding remote request. This instance should be discarded when
a reply (of any sort)
+ * is received for the request.
+ */
+public interface RemoteRequestContext {
+
+ /**
+ * Signal that the request should be cancelled, if possible.
+ */
+ void cancel();
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.ClientSource;
+
+/**
+ * A remote service endpoint, which can be passed to remote endpoints. Remote systems
can then use the service endpoint
+ * to acquire client endpoints, or they may pass it on to other systems.
+ */
+public interface RemoteServiceEndpoint<I, O> extends
Closeable<RemoteServiceEndpoint<I, O>> {
+
+ /**
+ * Create a client endpoint for the service corresponding to this service endpoint.
+ *
+ * @return a client endpoint
+ * @throws RemotingException if a client could not be opened
+ */
+ RemoteClientEndpoint<I, O> openClient() throws RemotingException;
+
+ /**
+ * Get a handle to this service endpoint. The service endpoint will not auto-close
as long as there is at least
+ * one open handle,remote client endpoint, or client source. If a handle is
"leaked", it will be closed
+ * automatically if/when the garbage collector invokes its {@link Object#finalize()}
method, with a log message
+ * warning of the leak.
+ *
+ * @return the handle
+ * @throws RemotingException if a handle could not be acquired
+ */
+ Handle<RemoteServiceEndpoint<I, O>> getHandle() throws
RemotingException;
+
+ /**
+ * Get a local client source which can be used to access this service.
+ *
+ * @return the client source
+ */
+ ClientSource<I, O> getClientSource() throws RemotingException;
+
+ /**
+ * Automatically close this service endpoint when all handles and local client source
instances
+ * are closed.
+ */
+ void autoClose();
+
+ /**
+ * Close this service endpoint immediately.
+ */
+ void close() throws RemotingException;
+
+ /**
+ * Add a handler that is called when the service endpoint is closed.
+ *
+ * @param handler the handler to be called
+ */
+ void addCloseHandler(final CloseHandler<RemoteServiceEndpoint<I, O>>
handler);
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,49 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi.remote;
+
+/**
+ * A handler for replies from a request. The handler should respect the first invocation
made on it, and ignore
+ * any subsequent invocations.
+ */
+public interface ReplyHandler<O> {
+
+ /**
+ * Handle a successful reply.
+ *
+ * @param reply the reply
+ */
+ void handleReply(O reply);
+
+ /**
+ * Handle a remote exception.
+ *
+ * @param cause the cause
+ */
+ void handleException(Throwable cause);
+
+ /**
+ * Handle a cancellation request.
+ */
+ void handleCancellation();
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferInputStream.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferInputStream.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferInputStream.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,121 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.stream;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+
+/**
+ * An input stream that reads from byte buffers. Instances of this class are not safe to
use concurrently from
+ * multiple threads.
+ */
+public final class ByteBufferInputStream extends InputStream {
+ private final ObjectSource<ByteBuffer> bufferSource;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ private boolean closed;
+ private ByteBuffer current;
+
+ public ByteBufferInputStream(final ObjectSource<ByteBuffer> bufferSource, final
BufferAllocator<ByteBuffer> allocator) {
+ this.bufferSource = bufferSource;
+ this.allocator = allocator;
+ }
+
+ public int read() throws IOException {
+ if (closed) {
+ return -1;
+ }
+ ByteBuffer buffer = getBuffer();
+ if (buffer == null) {
+ return -1;
+ }
+ try {
+ return buffer.get() & 0xff;
+ } finally {
+ if (! buffer.hasRemaining()) {
+ current = null;
+ allocator.free(buffer);
+ }
+ }
+ }
+
+ public int read(final byte[] b, int off, int len) throws IOException {
+ if (closed) {
+ return -1;
+ }
+ int t = 0;
+ while (len > 0) {
+ ByteBuffer buffer = getBuffer();
+ if (buffer == null) {
+ return t == 0 ? -1 : t;
+ }
+ final int rem = Math.min(len, buffer.remaining());
+ if (rem > 0) {
+ buffer.get(b, off, rem);
+ off += rem;
+ len -= rem;
+ t += rem;
+ }
+ if (! buffer.hasRemaining()) {
+ current = null;
+ allocator.free(buffer);
+ }
+ }
+ return t;
+ }
+
+ public int available() throws IOException {
+ final ByteBuffer buffer = current;
+ return (buffer == null ? 0 : buffer.remaining());
+ }
+
+ public void close() throws IOException {
+ try {
+ final ByteBuffer buffer = current;
+ current = null;
+ allocator.free(buffer);
+ bufferSource.close();
+ } finally {
+ closed = true;
+ IoUtils.safeClose(bufferSource);
+ }
+ }
+
+ private ByteBuffer getBuffer() throws IOException {
+ final ByteBuffer buffer = current;
+ if (buffer == null) {
+ if (bufferSource.hasNext()) {
+ final ByteBuffer newBuffer = bufferSource.next();
+ current = newBuffer;
+ return newBuffer;
+ } else {
+ return null;
+ }
+ } else {
+ return buffer;
+ }
+ }
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferOutputStream.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferOutputStream.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ByteBufferOutputStream.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,132 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.stream;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Buffers;
+
+/**
+ * An output stream that writes to buffers. Instances of this class are not normally
safe to use from multiple threads
+ * concurrently.
+ */
+public final class ByteBufferOutputStream extends OutputStream {
+ private final ObjectSink<ByteBuffer> bufferSink;
+ private final BufferAllocator<ByteBuffer> allocator;
+
+ private ByteBuffer current;
+ private boolean closed;
+
+ /**
+ * Construct a new stream instance.
+ *
+ * @param bufferSink the buffer sink to which full buffers will be written
+ * @param allocator the allocator from which empty buffers will be allocated
+ */
+ public ByteBufferOutputStream(final ObjectSink<ByteBuffer> bufferSink, final
BufferAllocator<ByteBuffer> allocator) {
+ this.bufferSink = bufferSink;
+ this.allocator = allocator;
+ }
+
+ private ByteBuffer getBuffer() throws IOException {
+ final ByteBuffer buffer = current;
+ if (buffer == null) {
+ ByteBuffer newbuf = allocator.allocate();
+ if (newbuf == null) {
+ throw new IOException("No buffers available");
+ }
+ current = newbuf;
+ return newbuf;
+ } else {
+ return buffer;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void write(final int b) throws IOException {
+ if (closed) {
+ throw new IOException("Write to closed outputstream");
+ }
+ final ByteBuffer buffer = getBuffer();
+ buffer.put((byte)b);
+ if (! buffer.hasRemaining()) {
+ localFlush();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void write(final byte[] b, int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Write to closed outputstream");
+ }
+ do {
+ final ByteBuffer buffer = getBuffer();
+ final int rem = Math.min(len, buffer.remaining());
+ buffer.put(b, off, rem);
+ if (! buffer.hasRemaining()) {
+ localFlush();
+ }
+ len -= rem; off += rem;
+ } while (len > 0);
+ }
+
+ private void localFlush() throws IOException {
+ if (closed) {
+ throw new IOException("Flush on closed outputstream");
+ }
+ final ByteBuffer buffer = current;
+ if (buffer != null) try {
+ bufferSink.accept(Buffers.flip(buffer));
+ } finally {
+ current = null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void flush() throws IOException {
+ localFlush();
+ bufferSink.flush();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() throws IOException {
+ if (! closed) try {
+ flush();
+ bufferSink.close();
+ } finally {
+ closed = true;
+ IoUtils.safeClose(bufferSink);
+ }
+ }
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferReader.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferReader.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferReader.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,161 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.stream;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.CharBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+
+/**
+ * A reader that reads from char buffers. Instances of this class are not safe to use
concurrently from
+ * multiple threads.
+ */
+public final class CharBufferReader extends Reader {
+ private final ObjectSource<CharBuffer> bufferSource;
+ private final BufferAllocator<CharBuffer> allocator;
+
+ private boolean closed;
+ private CharBuffer current;
+
+ public CharBufferReader(final ObjectSource<CharBuffer> bufferSource, final
BufferAllocator<CharBuffer> allocator) {
+ this.bufferSource = bufferSource;
+ this.allocator = allocator;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int read() throws IOException {
+ if (closed) {
+ return -1;
+ }
+ CharBuffer buffer = getBuffer();
+ if (buffer == null) {
+ return -1;
+ }
+ try {
+ return buffer.get() & 0xff;
+ } finally {
+ if (! buffer.hasRemaining()) {
+ current = null;
+ allocator.free(buffer);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int read(final char[] cbuf, int off, int len) throws IOException {
+ if (closed) {
+ return -1;
+ }
+ int t = 0;
+ while (len > 0) {
+ CharBuffer buffer = getBuffer();
+ if (buffer == null) {
+ return t == 0 ? -1 : t;
+ }
+ final int rem = Math.min(len, buffer.remaining());
+ if (rem > 0) {
+ buffer.get(cbuf, off, rem);
+ off += rem;
+ len -= rem;
+ t += rem;
+ }
+ if (! buffer.hasRemaining()) {
+ current = null;
+ allocator.free(buffer);
+ }
+ }
+ return t;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() throws IOException {
+ try {
+ final CharBuffer buffer = current;
+ current = null;
+ allocator.free(buffer);
+ bufferSource.close();
+ } finally {
+ closed = true;
+ IoUtils.safeClose(bufferSource);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int read(final CharBuffer target) throws IOException {
+ if (closed) {
+ return -1;
+ }
+ int t = 0;
+ int len = target.remaining();
+ while (len > 0) {
+ CharBuffer buffer = getBuffer();
+ if (buffer == null) {
+ return t == 0 ? -1 : t;
+ }
+ final int rem = Math.min(len, buffer.remaining());
+ if (rem > 0) {
+ buffer.read(target);
+ len -= rem;
+ t += rem;
+ }
+ if (! buffer.hasRemaining()) {
+ current = null;
+ allocator.free(buffer);
+ }
+ }
+ return t;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean ready() throws IOException {
+ final CharBuffer buffer = current;
+ return buffer != null && buffer.hasRemaining();
+ }
+
+ private CharBuffer getBuffer() throws IOException {
+ final CharBuffer buffer = current;
+ if (buffer == null) {
+ if (bufferSource.hasNext()) {
+ final CharBuffer newBuffer = bufferSource.next();
+ current = newBuffer;
+ return newBuffer;
+ } else {
+ return null;
+ }
+ } else {
+ return buffer;
+ }
+ }
+}
\ No newline at end of file
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferWriter.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferWriter.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/CharBufferWriter.java 2008-07-02
22:54:29 UTC (rev 4341)
@@ -0,0 +1,150 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.stream;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.CharBuffer;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Buffers;
+
+/**
+ * A writer that writes to buffers. Instances of this class are not normally safe to use
from multiple threads
+ * concurrently.
+ */
+public final class CharBufferWriter extends Writer {
+ private final ObjectSink<CharBuffer> bufferSink;
+ private final BufferAllocator<CharBuffer> allocator;
+
+ private CharBuffer current;
+ private boolean closed;
+
+ /**
+ * Construct a new stream instance.
+ *
+ * @param bufferSink the buffer sink to which full buffers will be written
+ * @param allocator the allocator from which empty buffers will be allocated
+ */
+ public CharBufferWriter(final ObjectSink<CharBuffer> bufferSink, final
BufferAllocator<CharBuffer> allocator) {
+ this.bufferSink = bufferSink;
+ this.allocator = allocator;
+ }
+
+ private CharBuffer getBuffer() throws IOException {
+ final CharBuffer buffer = current;
+ if (buffer == null) {
+ CharBuffer newbuf = allocator.allocate();
+ if (newbuf == null) {
+ throw new IOException("No buffers available");
+ }
+ current = newbuf;
+ return newbuf;
+ } else {
+ return buffer;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void write(final int b) throws IOException {
+ if (closed) {
+ throw new IOException("Write to closed writer");
+ }
+ final CharBuffer buffer = getBuffer();
+ buffer.put((char)b);
+ if (! buffer.hasRemaining()) {
+ localFlush();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void write(final char[] b, int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Write to closed writer");
+ }
+ do {
+ final CharBuffer buffer = getBuffer();
+ final int rem = Math.min(len, buffer.remaining());
+ buffer.put(b, off, rem);
+ if (! buffer.hasRemaining()) {
+ localFlush();
+ }
+ len -= rem; off += rem;
+ } while (len > 0);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void write(final String str, int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Write to closed writer");
+ }
+ do {
+ final CharBuffer buffer = getBuffer();
+ final int rem = Math.min(len, buffer.remaining());
+ buffer.put(str, off, rem);
+ if (! buffer.hasRemaining()) {
+ localFlush();
+ }
+ len -= rem; off += rem;
+ } while (len > 0);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void flush() throws IOException {
+ localFlush();
+ bufferSink.flush();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void close() throws IOException {
+ if (! closed) try {
+ flush();
+ bufferSink.close();
+ } finally {
+ closed = true;
+ IoUtils.safeClose(bufferSink);
+ }
+ }
+
+ private void localFlush() throws IOException {
+ if (closed) {
+ throw new IOException("Flush on closed writer");
+ }
+ final CharBuffer buffer = current;
+ if (buffer != null) try {
+ bufferSink.accept(Buffers.flip(buffer));
+ } finally {
+ current = null;
+ }
+ }
+}
\ No newline at end of file