Author: david.lloyd(a)jboss.com
Date: 2009-02-26 18:06:03 -0500 (Thu, 26 Feb 2009)
New Revision: 4863
Added:
remoting3/trunk/api/src/main/java/org/jboss/remoting3/stream/ObjectPipe.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting3/stream/ObjectSource.java
Log:
Clarify ObjectSource semantics. Implement an object pipe for connecting a remote
ObjectSink to a local ObjectSource.
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting3/stream/ObjectPipe.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting3/stream/ObjectPipe.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting3/stream/ObjectPipe.java 2009-02-26
23:06:03 UTC (rev 4863)
@@ -0,0 +1,211 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.EOFException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.Queue;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+/**
+ * A pipe for objects. Typically, data is written to the sink side of the pipe from one
thread while being read from
+ * the source side in another thread. Object pipes are useful in the case that you send
an {@link
+ * org.jboss.remoting3.stream.ObjectSink ObjectSink} to a remote system in a request, but
you want to read the objects
+ * from an {@link org.jboss.remoting3.stream.ObjectSource ObjectSource}.
+ */
+public final class ObjectPipe<T> {
+
+ private final Lock queueLock = new ReentrantLock();
+ // signal on write, await on read
+ private final Condition writeCondition = queueLock.newCondition();
+ // signal on read, await on write
+ private final Condition readCondition = queueLock.newCondition();
+
+ private final Queue<T> queue = new LinkedList<T>();
+
+ private final Source source = new Source();
+ private final Sink sink = new Sink();
+ private final int max;
+ private boolean open = true;
+
+ /**
+ * Create an object pipe with the given maximum buffer size.
+ *
+ * @param max the maximum number of buffered objects
+ */
+ public ObjectPipe(int max) {
+ this.max = max;
+ }
+
+ /**
+ * Get the source end of the pipe, from which objects may be read.
+ *
+ * @return the source end
+ */
+ public ObjectSource<T> getSource() {
+ return source;
+ }
+
+ /**
+ * Get the sink end of the pipe, to which objects may be written.
+ *
+ * @return the sink end
+ */
+ public ObjectSink<T> getSink() {
+ return sink;
+ }
+
+ private class Source implements ObjectSource<T> {
+
+ public boolean hasNext() throws IOException {
+ final Lock queueLock = ObjectPipe.this.queueLock;
+ final Condition writeCondition = ObjectPipe.this.writeCondition;
+ final Queue<T> queue = ObjectPipe.this.queue;
+ try {
+ queueLock.lockInterruptibly();
+ try {
+ while (open && queue.isEmpty()) {
+ writeCondition.await();
+ }
+ return open || !queue.isEmpty();
+ } finally {
+ queueLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("hasNext() was interrupted");
+ }
+ }
+
+ public T next() throws IOException {
+ final Lock queueLock = ObjectPipe.this.queueLock;
+ final Queue<T> queue = ObjectPipe.this.queue;
+ try {
+ queueLock.lockInterruptibly();
+ try {
+ final T t = queue.poll();
+ if (t == null) {
+ if (open) {
+ throw new NoSuchElementException();
+ } else {
+ throw new EOFException("EOF on next()");
+ }
+ }
+ readCondition.signal();
+ return t;
+ } finally {
+ queueLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("hasNext() was interrupted");
+ }
+ }
+
+ /**
+ * Closing the reader breaks everything. All unread items are discarded, all
waiters are woken up.
+ */
+ public void close() {
+ final Lock queueLock = ObjectPipe.this.queueLock;
+ queueLock.lock();
+ try {
+ if (open) {
+ open = false;
+ queue.clear();
+ writeCondition.signalAll();
+ readCondition.signalAll();
+ }
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ protected void finalize() throws Throwable {
+ close();
+ super.finalize();
+ }
+ }
+
+ private class Sink implements ObjectSink<T> {
+
+ public void accept(final T instance) throws IOException {
+ final int max = ObjectPipe.this.max;
+ final Queue<T> queue = ObjectPipe.this.queue;
+ final Lock queueLock = ObjectPipe.this.queueLock;
+ try {
+ queueLock.lockInterruptibly();
+ try {
+ while (open && queue.size() == max) {
+ readCondition.await();
+ }
+ if (!open) {
+ throw new EOFException("pipe closed");
+ }
+ } finally {
+ queueLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("accept(T) was interrupted");
+ }
+ }
+
+ /**
+ * Do nothing since nothing can be done - clearing our internal state depends on
the reader.
+ */
+ public void flush() {
+ }
+
+ /**
+ * Closing the writer just clears the open flag and notifies waiters.
+ */
+ public void close() {
+ final Lock queueLock = ObjectPipe.this.queueLock;
+ queueLock.lock();
+ try {
+ if (!open) return;
+ open = false;
+ // readers might be waiting
+ if (queue.isEmpty()) {
+ readCondition.signalAll();
+ } else {
+ readCondition.signal();
+ }
+ // other writers might also be waiting - they should be killed
+ writeCondition.signalAll();
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ protected void finalize() throws Throwable {
+ close();
+ super.finalize();
+ }
+ }
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting3/stream/ObjectSource.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/remoting3/stream/ObjectSource.java 2009-02-26
21:56:19 UTC (rev 4862)
+++
remoting3/trunk/api/src/main/java/org/jboss/remoting3/stream/ObjectSource.java 2009-02-26
23:06:03 UTC (rev 4863)
@@ -2,6 +2,7 @@
import java.io.Closeable;
import java.io.IOException;
+import java.util.NoSuchElementException;
/**
* A streaming source for objects.
@@ -12,29 +13,31 @@
/**
* Indicate whether there are more objects to retrieve. If this method returns
{@code true}, an object is
- * guaranteed to be available. If this method returns {@code false}, there is no
object available; however, an
- * object may become available at a later time, depending on the implementation.
+ * guaranteed to be available. If this method returns {@code false}, the end of
stream has been reached.
* <p/>
* If this method returns {@code true}, it will continue to return {@code true} on
every subsequent invocation until
- * the next object is pulled using the {@code next()} method, or until the object
source is closed.
+ * the next object is pulled using the {@code next()} method, or until the object
source is closed. This method
+ * may block until the presence of the next object in the stream has been
ascertained.
*
* @return {@code true} if there are more objects in this stream
*/
boolean hasNext() throws IOException;
/**
- * Get the next object in the stream.
+ * Get the next object in the stream. The {@code hasNext()} method should be called
before this method is called
+ * to avoid receiving a {@code NoSuchElementException}.
*
* @return the next object
*
- * @throws IOException if the stream can no longer be read
+ * @throws NoSuchElementException if no object is available
+ * @throws IOException if an I/O error occurs
*/
- T next() throws IOException;
+ T next() throws NoSuchElementException, IOException;
/**
* Close the stream. No more objects may be read from this stream after it has been
closed.
*
- * @throws IOException if an error occurs
+ * @throws IOException if an I/O error occurs
*/
void close() throws IOException;
}