JBoss Remoting SVN: r5580 - remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2009-11-03 21:11:18 -0500 (Tue, 03 Nov 2009)
New Revision: 5580
Added:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Pair.java
Modified:
remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
Log:
Interlude: beef up streams convenience methods
Added: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Pair.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Pair.java (rev 0)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Pair.java 2009-11-04 02:11:18 UTC (rev 5580)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.stream;
+
+import java.io.Serializable;
+
+/**
+ * A serializable pair of values.
+ * <p>
+ * Both values are held in {@code final} fields, so the references are fixed at construction time; the values
+ * themselves are not copied.  Neither {@code equals} nor {@code hashCode} is overridden, so two pairs compare
+ * equal only when they are the same instance.  No custom serialization is defined, so a pair can only be
+ * serialized successfully when both of its values are themselves serializable.
+ *
+ * @param <A> the type of the first value
+ * @param <B> the type of the second value
+ */
+public final class Pair<A, B> implements Serializable {
+
+ private static final long serialVersionUID = -1812076980977921946L;
+
+ private final A a;
+ private final B b;
+
+ /**
+ * Construct a new instance.  No validation is performed on either argument, so {@code null} is stored as given.
+ *
+ * @param a the first value
+ * @param b the second value
+ */
+ public Pair(final A a, final B b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ /**
+ * Get the first value.
+ *
+ * @return the first value, exactly as passed to the constructor
+ */
+ public A getA() {
+ return a;
+ }
+
+ /**
+ * Get the second value.
+ *
+ * @return the second value, exactly as passed to the constructor
+ */
+ public B getB() {
+ return b;
+ }
+}
Modified: remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java 2009-11-03 18:45:12 UTC (rev 5579)
+++ remoting3/trunk/jboss-remoting/src/main/java/org/jboss/remoting3/stream/Streams.java 2009-11-04 02:11:18 UTC (rev 5580)
@@ -6,6 +6,12 @@
import java.util.Enumeration;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.FutureResult;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.IoUtils;
/**
* Handy utility methods for stream types.
@@ -37,6 +43,193 @@
}
/**
+ * Get an object source which reads from a collection.
+ *
+ * @param collection the collection to read from
+ * @param <T> the collection member type
+ * @return an object source
+ */
+ public static <T> ObjectSource<T> getCollectionObjectSource(Collection<T> collection) {
+     // A collection source is simply an iterator source over the collection's own iterator.
+     final Iterator<T> elements = collection.iterator();
+     return getIteratorObjectSource(elements);
+ }
+
+ /**
+ * Get an object sink that appends to a map.
+ *
+ * @param target the target map
+ * @param <K> the key type
+ * @param <V> the value type
+ * @return an object sink
+ */
+ public static <K, V> ObjectSink<Pair<K, V>> getMapObjectSink(Map<K, V> target) {
+     // The sink stores each accepted pair into the target map (first value as key, second as value).
+     final ObjectSink<Pair<K, V>> sink = new MapObjectSink<K, V>(target);
+     return sink;
+ }
+
+ /**
+ * Get an object source that reads from an iterator over map entries.
+ *
+ * @param iterator the iterator over map entries to read from
+ * @param <K> the key type
+ * @param <V> the value type
+ * @return an object source of key/value pairs
+ */
+ public static <K, V> ObjectSource<Pair<K, V>> getMapEntryIteratorObjectSource(Iterator<Map.Entry<K, V>> iterator) {
+ return new MapEntryIteratorObjectSource<K, V>(iterator);
+ }
+
+ /**
+ * Get an object source that reads from a map.
+ *
+ * @param map the map
+ * @param <K> the key type
+ * @param <V> the value type
+ * @return an object source
+ */
+ public static <K, V> ObjectSource<Pair<K, V>> getMapObjectSource(Map<K, V> map) {
+     // Reading a map is reading the iterator over its entry set.
+     final Iterator<Map.Entry<K, V>> entries = map.entrySet().iterator();
+     return getMapEntryIteratorObjectSource(entries);
+ }
+
+ /**
+ * Populate a new collection from an object source. Since the collection may be only partially populated on error,
+ * it is recommended that the instance be discarded if an exception is thrown.
+ * <p>
+ * An example usage which meets this requirement would be: <code><pre>
+ * final List<Foo> fooList = getCollection(new ArrayList<Foo>(), fooSource);
+ * </pre></code>
+ *
+ * @param newCollection the new collection to populate
+ * @param objectSource the object source to fill the collection from
+ * @param <C> the collection type
+ * @param <T> the collection value type
+ * @return the new collection, populated
+ * @throws IOException if an error occurs
+ */
+ public static <C extends Collection<T>, T> C getCollection(C newCollection, ObjectSource<T> objectSource) throws IOException {
+     // Drain the source into the collection; the source is not closed by this method.
+     for (;;) {
+         if (! objectSource.hasNext()) {
+             return newCollection;
+         }
+         final T item = objectSource.next();
+         newCollection.add(item);
+     }
+ }
+
+ /**
+ * Populate a new map from an object source. Since the map may be only partially populated on error,
+ * it is recommended that the instance be discarded if an exception is thrown.
+ * <p>
+ * An example usage which meets this requirement would be: <code><pre>
+ * final Map<Foo, Bar> fooBarMap = getMap(new HashMap<Foo, Bar>(), fooBarSource);
+ * </pre></code>
+ *
+ * @param newMap the new map to populate
+ * @param objectSource the object source to fill the map from
+ * @param <M> the map type
+ * @param <K> the map key type
+ * @param <V> the map value type
+ * @return the new map, populated
+ * @throws IOException if an error occurs
+ */
+ public static <M extends Map<K, V>, K, V> M getMap(M newMap, ObjectSource<Pair<K, V>> objectSource) throws IOException {
+     // Drain the source into the map; the source is not closed by this method.
+     while (objectSource.hasNext()) {
+         final Pair<K, V> entry = objectSource.next();
+         final K key = entry.getA();
+         final V value = entry.getB();
+         newMap.put(key, value);
+     }
+     return newMap;
+ }
+
+ /**
+  * Populate a new collection from an object source asynchronously.  Since the collection may be only partially
+  * populated on error, it is recommended that the instance be discarded if the returned future fails or is
+  * cancelled.
+  * <p>
+  * An example usage which meets this requirement would be: <pre>
+  * final IoFuture&lt;List&lt;Foo&gt;&gt; futureFooList = getFutureCollection(executor, new ArrayList&lt;Foo&gt;(), fooSource);
+  * </pre>
+  * <p>
+  * This method does not throw on failure.  An {@code IOException} raised while reading the source, a runtime
+  * exception raised while populating the collection, and a failure to submit the task to the executor are all
+  * reported through the returned future.  When cancellation of the future takes effect, the object source is closed.
+  *
+  * @param executor the executor in which to run asynchronous tasks
+  * @param newCollection the new collection to populate
+  * @param objectSource the object source to fill the collection from
+  * @param <C> the collection type
+  * @param <T> the collection value type
+  * @return a future for the populated collection
+  */
+ public static <C extends Collection<T>, T> IoFuture<C> getFutureCollection(Executor executor, final C newCollection, final ObjectSource<T> objectSource) {
+     final FutureResult<C> futureResult = new FutureResult<C>(executor);
+     futureResult.addCancelHandler(new Cancellable() {
+         public Cancellable cancel() {
+             // Only close the source if this call actually moved the future to the cancelled state.
+             if (futureResult.setCancelled()) {
+                 IoUtils.safeClose(objectSource);
+             }
+             return this;
+         }
+     });
+     try {
+         executor.execute(new Runnable() {
+             public void run() {
+                 try {
+                     while (objectSource.hasNext()) {
+                         newCollection.add(objectSource.next());
+                     }
+                     futureResult.setResult(newCollection);
+                 } catch (IOException e) {
+                     futureResult.setException(e);
+                 } catch (RuntimeException e) {
+                     // Without this, an unchecked failure (e.g. from newCollection.add) would escape into the
+                     // executor thread and the future would never complete, leaving waiters blocked.
+                     final IOException ioe = new IOException("Failed to populate a collection asynchronously");
+                     ioe.initCause(e);
+                     futureResult.setException(ioe);
+                 }
+             }
+         });
+     } catch (RuntimeException e) {
+         // The executor refused the task (or failed while accepting it); report that through the future.
+         final IOException ioe = new IOException("Failed to initiate asynchronous population of a collection");
+         ioe.initCause(e);
+         futureResult.setException(ioe);
+     }
+     return futureResult.getIoFuture();
+ }
+
+ /**
+  * Populate a new map from an object source asynchronously.  Since the map may be only partially populated on
+  * error, it is recommended that the instance be discarded if the returned future fails or is cancelled.
+  * <p>
+  * An example usage which meets this requirement would be: <pre>
+  * final IoFuture&lt;Map&lt;Foo, Bar&gt;&gt; futureFooBarMap = getFutureMap(executor, new HashMap&lt;Foo, Bar&gt;(), fooBarSource);
+  * </pre>
+  * <p>
+  * This method does not throw on failure.  An {@code IOException} raised while reading the source, a runtime
+  * exception raised while populating the map, and a failure to submit the task to the executor are all reported
+  * through the returned future.  When cancellation of the future takes effect, the object source is closed.
+  *
+  * @param executor the executor in which to run asynchronous tasks
+  * @param newMap the new map to populate
+  * @param objectSource the object source to fill the map from
+  * @param <M> the map type
+  * @param <K> the map key type
+  * @param <V> the map value type
+  * @return a future for the populated map
+  */
+ public static <M extends Map<K, V>, K, V> IoFuture<M> getFutureMap(Executor executor, final M newMap, final ObjectSource<Pair<K, V>> objectSource) {
+     final FutureResult<M> futureResult = new FutureResult<M>(executor);
+     futureResult.addCancelHandler(new Cancellable() {
+         public Cancellable cancel() {
+             // Only close the source if this call actually moved the future to the cancelled state.
+             if (futureResult.setCancelled()) {
+                 IoUtils.safeClose(objectSource);
+             }
+             return this;
+         }
+     });
+     try {
+         executor.execute(new Runnable() {
+             public void run() {
+                 try {
+                     while (objectSource.hasNext()) {
+                         final Pair<K, V> pair = objectSource.next();
+                         newMap.put(pair.getA(), pair.getB());
+                     }
+                     futureResult.setResult(newMap);
+                 } catch (IOException e) {
+                     futureResult.setException(e);
+                 } catch (RuntimeException e) {
+                     // Without this, an unchecked failure (e.g. from newMap.put) would escape into the
+                     // executor thread and the future would never complete, leaving waiters blocked.
+                     final IOException ioe = new IOException("Failed to populate a map asynchronously");
+                     ioe.initCause(e);
+                     futureResult.setException(ioe);
+                 }
+             }
+         });
+     } catch (RuntimeException e) {
+         // The executor refused the task (or failed while accepting it); report that through the future.
+         final IOException ioe = new IOException("Failed to initiate asynchronous population of a map");
+         ioe.initCause(e);
+         futureResult.setException(ioe);
+     }
+     return futureResult.getIoFuture();
+ }
+
+ /**
* Get an object source that reads from an enumeration.
*
* @param <T> the enumeration object type
@@ -50,7 +243,7 @@
private static final class CollectionObjectSink<T> implements ObjectSink<T> {
private final Collection<T> target;
- public CollectionObjectSink(final Collection<T> target) {
+ private CollectionObjectSink(final Collection<T> target) {
this.target = target;
}
@@ -68,7 +261,7 @@
private static final class IteratorObjectSource<T> implements ObjectSource<T> {
private final Iterator<T> src;
- public IteratorObjectSource(final Iterator<T> src) {
+ private IteratorObjectSource(final Iterator<T> src) {
this.src = src;
}
@@ -94,7 +287,7 @@
private static final class EnumerationObjectSource<T> implements ObjectSource<T> {
private final Enumeration<T> src;
- public EnumerationObjectSource(final Enumeration<T> src) {
+ private EnumerationObjectSource(final Enumeration<T> src) {
this.src = src;
}
@@ -116,4 +309,44 @@
// empty
}
}
+
+ /**
+  * An object sink which stores every accepted pair into a map, using the pair's first value as the key and its
+  * second value as the mapped value.
+  */
+ private static final class MapObjectSink<K, V> implements ObjectSink<Pair<K, V>> {
+
+     private final Map<K, V> map;
+
+     private MapObjectSink(final Map<K, V> target) {
+         map = target;
+     }
+
+     public void accept(final Pair<K, V> instance) throws IOException {
+         final K key = instance.getA();
+         final V value = instance.getB();
+         map.put(key, value);
+     }
+
+     public void flush() throws IOException {
+         // pairs are written straight to the map, so there is nothing to flush
+     }
+
+     public void close() throws IOException {
+         // nothing to release
+     }
+ }
+
+ /**
+  * An object source which reads from an iterator over map entries, presenting each entry as a {@link Pair} of
+  * the entry's key and value.
+  */
+ private static final class MapEntryIteratorObjectSource<K, V> implements ObjectSource<Pair<K, V>> {
+     private final Iterator<Map.Entry<K, V>> source;
+
+     private MapEntryIteratorObjectSource(final Iterator<Map.Entry<K, V>> source) {
+         this.source = source;
+     }
+
+     public boolean hasNext() throws IOException {
+         // Delegate to the wrapped iterator rather than always reporting an empty source.
+         return source.hasNext();
+     }
+
+     public Pair<K, V> next() throws NoSuchElementException, IOException {
+         // Iterator.next() itself throws NoSuchElementException once the entries are exhausted.
+         final Map.Entry<K, V> entry = source.next();
+         return new Pair<K, V>(entry.getKey(), entry.getValue());
+     }
+
+     public void close() throws IOException {
+         // empty
+     }
+ }
}