Author: ron.sigal(a)jboss.com
Date: 2010-10-04 23:01:44 -0400 (Mon, 04 Oct 2010)
New Revision: 6118
Added:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream/PrimaryExternalizerFactoryTestCase.java
Log:
JBREM-1228: New unit test.
Added:
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream/PrimaryExternalizerFactoryTestCase.java
===================================================================
---
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream/PrimaryExternalizerFactoryTestCase.java
(rev 0)
+++
remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream/PrimaryExternalizerFactoryTestCase.java 2010-10-05
03:01:44 UTC (rev 6118)
@@ -0,0 +1,537 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.test.stream;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.Reader;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import org.jboss.remoting3.Client;
+import org.jboss.remoting3.Connection;
+import org.jboss.remoting3.RemoteExecutionException;
+import org.jboss.remoting3.RequestContext;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.remoting3.stream.ObjectSink;
+import org.jboss.remoting3.stream.ObjectSource;
+import org.jboss.remoting3.stream.Streams;
+import org.jboss.remoting3.test.RemotingTestBase;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.log.Logger;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version $Revision: 1.1 $
+ * <p>
+ * Copyright September 25, 2010
+ */
+@Test(suiteName = "PrimaryExternalizerFactory")
+public class PrimaryExternalizerFactoryTestCase extends RemotingTestBase {
+
+ /** Logger shared by the tests and by the nested request listener. */
+ private static final Logger log = Logger.getLogger(PrimaryExternalizerFactoryTestCase.class);
+
+ /** Service type under which every test registers its listener. */
+ private static final String SERVICE_TYPE = "streamservicetype";
+
+ /** Instance name under which every test registers its listener. */
+ private static final String INSTANCE_NAME = "test";
+
+ /** Text pushed through a Reader by testReader(). */
+ private static final String READER_TEST_STRING = "reader test string";
+
+ /** Text the listener writes into an OutputStream or Writer it receives. */
+ private static final String WRITER_TEST_STRING = "writer test string";
+
+ /**
+  * Source of the ids handed to setupServer(); starts at a time-seeded random
+  * value and is post-incremented twice per test.
+  */
+ private static int counter = new Random(System.currentTimeMillis()).nextInt();
+
+ /** Per-test set-up hook; each test builds its own servers, so nothing happens here. */
+ @BeforeMethod
+ public void setUp() {
+     // Intentionally empty.
+ }
+
+ /** Per-test tear-down hook; each test releases its own resources in a finally block. */
+ @AfterMethod
+ public void tearDown() throws IOException {
+     // Intentionally empty.
+ }
+
+ /**
+  * Passes a {@link PipedInputStream} to {@code Client.invoke()} and checks that
+  * the listener echoes back everything written to the pipe, including bytes
+  * written after the invocation has already started.
+  * <p>
+  * The invocation runs on a helper thread because the listener keeps reading
+  * until it sees end of stream, which only happens once this thread closes the
+  * write end of the pipe.
+  */
+ @Test
+ public void testInputStream() throws Exception {
+     enter();
+
+     ServerPackage sp0 = null;
+     ServerPackage sp1 = null;
+     Connection connection = null;
+     Client<Object, Object> client = null;
+
+     try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+
+         // Write the first half before the invocation starts.
+         final PipedInputStream pis = new PipedInputStream();
+         final PipedOutputStream pos = new PipedOutputStream(pis);
+         pos.write("before invocation".getBytes());
+         pos.flush();
+
+         // Mutable slot guarded by its own monitor; carries the reply across threads.
+         class Holder {
+             Object result;
+         }
+
+         final Holder holder = new Holder();
+         final Client<Object, Object> finalClient = client;
+         new Thread() {
+             @Override
+             public void run() {
+                 try {
+                     log.info(this + " calling invoke()");
+                     Object o = finalClient.invoke(pis);
+                     synchronized (holder) {
+                         holder.result = o;
+                         log.info("set result to: " + holder.result);
+                         holder.notifyAll();
+                     }
+                 } catch (IOException e) {
+                     // Hand the exception to the logger instead of dropping it.
+                     log.error("unable to call Client.invoke()", e);
+                 }
+             }
+         }.start();
+
+         // Give the invocation time to start, then write the second half.
+         Thread.sleep(1000);
+         pos.write(" | after invocation".getBytes());
+         pos.flush();
+
+         // Close the write end BEFORE waiting: the listener only replies after
+         // it has read end of stream, so waiting first could never succeed.
+         pos.close();
+
+         // Wait for the reply with a deadline, re-checking the predicate so a
+         // spurious wake-up cannot end the wait early.
+         final Object result;
+         final long deadline = System.currentTimeMillis() + 4000;
+         synchronized (holder) {
+             long remaining = deadline - System.currentTimeMillis();
+             while (holder.result == null && remaining > 0) {
+                 holder.wait(remaining);
+                 remaining = deadline - System.currentTimeMillis();
+             }
+             result = holder.result;
+         }
+
+         pis.close();
+
+         log.info(this + " holder.result: " + result);
+         // TestNG order is (actual, expected).
+         assertEquals(result, "before invocation | after invocation");
+
+         log.info(getName() + " PASSES");
+     } finally {
+         if (client != null) {
+             client.close();
+         }
+         if (connection != null) {
+             connection.close();
+         }
+         if (sp0 != null) {
+             sp0.close();
+         }
+         if (sp1 != null) {
+             sp1.close();
+         }
+         exit();
+     }
+ }
+
+ /**
+  * Passes a {@link ByteArrayOutputStream} to {@code Client.invoke()} and checks
+  * that the bytes the listener writes into it end up in the local buffer.
+  */
+ @Test
+ public void testOutputStream() throws Exception {
+     enter();
+
+     ServerPackage sp0 = null;
+     ServerPackage sp1 = null;
+     Connection connection = null;
+     Client<Object, Object> client = null;
+
+     try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+
+         // Test OutputStream.
+         ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         client.invoke(baos);
+         // Decoded with the platform default charset, matching the listener's getBytes().
+         String result = new String(baos.toByteArray());
+         // Log before asserting so the received value is visible on failure.
+         log.info("result: " + result);
+         // TestNG order is (actual, expected).
+         assertEquals(result, WRITER_TEST_STRING);
+
+         log.info(getName() + " PASSES");
+     } finally {
+         if (client != null) {
+             client.close();
+         }
+         if (connection != null) {
+             connection.close();
+         }
+         if (sp0 != null) {
+             sp0.close();
+         }
+         if (sp1 != null) {
+             sp1.close();
+         }
+         exit();
+     }
+ }
+
+ /**
+  * Passes a {@link StringReader} to {@code Client.invoke()} and checks that the
+  * listener replies with the text it read from it.
+  */
+ @Test
+ public void testReader() throws Exception {
+     enter();
+
+     ServerPackage sp0 = null;
+     ServerPackage sp1 = null;
+     Connection connection = null;
+     Client<Object, Object> client = null;
+
+     try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+
+         // Test Reader.
+         Reader reader = new StringReader(READER_TEST_STRING);
+         Object response = client.invoke(reader);
+         log.info(this + " response: " + response);
+         // TestNG order is (actual, expected).
+         assertEquals(response, READER_TEST_STRING);
+
+         log.info(getName() + " PASSES");
+     } finally {
+         if (client != null) {
+             client.close();
+         }
+         if (connection != null) {
+             connection.close();
+         }
+         if (sp0 != null) {
+             sp0.close();
+         }
+         if (sp1 != null) {
+             sp1.close();
+         }
+         exit();
+     }
+ }
+
+ /**
+  * Passes a {@link StringWriter} to {@code Client.invoke()} and checks both the
+  * listener's reply and the text the listener wrote into the writer.
+  */
+ @Test
+ public void testWriter() throws Exception {
+     enter();
+
+     ServerPackage sp0 = null;
+     ServerPackage sp1 = null;
+     Connection connection = null;
+     Client<Object, Object> client = null;
+
+     try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+
+         // Test Writer.
+         StringWriter writer = new StringWriter();
+         Object response = client.invoke(writer);
+         log.info(this + " response: " + response);
+         // TestNG order is (actual, expected).
+         assertEquals(response, WRITER_TEST_STRING);
+
+         // StringWriter.toString() returns the buffer's current contents.
+         String writerContents = writer.toString();
+         log.info(this + " returned in writer: " + writerContents);
+         assertEquals(writerContents, WRITER_TEST_STRING);
+
+         log.info(getName() + " PASSES");
+     } finally {
+         if (client != null) {
+             client.close();
+         }
+         if (connection != null) {
+             connection.close();
+         }
+         if (sp0 != null) {
+             sp0.close();
+         }
+         if (sp1 != null) {
+             sp1.close();
+         }
+         exit();
+     }
+ }
+
+ /**
+  * Passes an {@link ObjectSource} over three {@link TestObject}s to
+  * {@code Client.invoke()} and checks that the listener replies with a list
+  * containing equal objects.
+  */
+ @Test
+ public void testObjectSource() throws Exception {
+     enter();
+
+     ServerPackage sp0 = null;
+     ServerPackage sp1 = null;
+     Connection connection = null;
+     Client<Object, Object> client = null;
+
+     try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+
+         // Test ObjectSource.
+         TestObject o1 = new TestObject(1);
+         TestObject o2 = new TestObject(2);
+         TestObject o3 = new TestObject(3);
+         List<Object> sent = new ArrayList<Object>();
+         sent.add(o1);
+         sent.add(o2);
+         sent.add(o3);
+         ObjectSource<Object> source = Streams.getIteratorObjectSource(sent.iterator());
+         Object response = client.invoke(source);
+         log.info(this + " response: " + response);
+         assertTrue(response instanceof List<?>);
+         @SuppressWarnings("unchecked")
+         List<Object> result = (List<Object>) response;
+         // TestNG order is (actual, expected).
+         assertEquals(result.size(), 3);
+         // contains() relies on TestObject.equals(), i.e. on matching seeds.
+         assertTrue(result.contains(o1));
+         assertTrue(result.contains(o2));
+         assertTrue(result.contains(o3));
+
+         log.info(getName() + " PASSES");
+     } finally {
+         if (client != null) {
+             client.close();
+         }
+         if (connection != null) {
+             connection.close();
+         }
+         if (sp0 != null) {
+             sp0.close();
+         }
+         if (sp1 != null) {
+             sp1.close();
+         }
+         exit();
+     }
+ }
+
+ /**
+  * Passes an {@link ObjectSink} backed by a local list to
+  * {@code Client.invoke()} and checks that the three strings the listener
+  * pushes into the sink arrive in that list.
+  */
+ @Test
+ public void testObjectSink() throws Exception {
+     enter();
+
+     ServerPackage sp0 = null;
+     ServerPackage sp1 = null;
+     Connection connection = null;
+     Client<Object, Object> client = null;
+
+     try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+
+         // Test ObjectSink.
+         List<String> received = new ArrayList<String>();
+         ObjectSink<String> objectSink = Streams.getCollectionObjectSink(received);
+         client.invoke(objectSink);
+         // TestNG order is (actual, expected).
+         assertEquals(received.size(), 3);
+         assertTrue(received.contains("sink 1"));
+         assertTrue(received.contains("sink 2"));
+         assertTrue(received.contains("sink 3"));
+
+         log.info(getName() + " PASSES");
+     } finally {
+         if (client != null) {
+             client.close();
+         }
+         if (connection != null) {
+             connection.close();
+         }
+         if (sp0 != null) {
+             sp0.close();
+         }
+         if (sp1 != null) {
+             sp1.close();
+         }
+         exit();
+     }
+ }
+
+ /**
+  * Supplies the options passed to {@code Connection.openClient()} by
+  * {@code setupClient()}.
+  *
+  * @return {@code OptionMap.EMPTY}
+  */
+ protected OptionMap getClientOptionMap() {
+     return OptionMap.EMPTY;
+ }
+
+ /**
+  * Opens an {@code Object}-to-{@code Object} client for the given service on
+  * the given connection and waits for it via {@code getFutureResult()}.
+  *
+  * @param connection   connection on which to open the client
+  * @param serviceType  service type to look up
+  * @param instanceName instance name to look up
+  * @return the opened client
+  */
+ public Client<Object, Object> setupClient(Connection connection, String serviceType, String instanceName) throws IOException, URISyntaxException {
+     final String failureMessage = "unable to open client to " + serviceType + ":" + instanceName;
+     // 1000000 is the timeout argument of getFutureResult(); its unit is defined by that helper.
+     Client<Object, Object> opened = getFutureResult(
+             connection.openClient(serviceType, instanceName, Object.class, Object.class, getClientOptionMap()),
+             1000000,
+             failureMessage);
+     return opened;
+ }
+
+ /**
+  * Request listener shared by all tests. It dispatches on the runtime type of
+  * the request:
+  * <ul>
+  * <li>{@link InputStream} / {@link Reader}: reads up to {@code BUFFER_SIZE}
+  *     elements until end of stream and replies with the text read;</li>
+  * <li>{@link OutputStream}: writes {@code WRITER_TEST_STRING}, closes the
+  *     stream and replies with {@code null};</li>
+  * <li>{@link Writer}: writes {@code WRITER_TEST_STRING}, closes the writer and
+  *     replies with the same string;</li>
+  * <li>{@link ObjectSource}: drains it into a list and replies with the list;</li>
+  * <li>{@link ObjectSink}: pushes "sink 1".."sink 3", closes it and replies
+  *     with {@code null}.</li>
+  * </ul>
+  * Any other request type, and any I/O failure, is reported by throwing
+  * {@link RemoteExecutionException} rather than leaving the request without
+  * an outcome.
+  */
+ static class TestRequestListener implements RequestListener<Object, Object> {
+
+     /** Maximum number of bytes or chars consumed from an incoming stream or reader. */
+     private static final int BUFFER_SIZE = 1024;
+
+     public void handleRequest(RequestContext<Object> context, Object request) throws RemoteExecutionException {
+         try {
+             if (request instanceof InputStream) {
+                 log.info(this + " got InputStream: " + request);
+                 String reply = readAll((InputStream) request);
+                 log.info(this + " sending reply: " + reply);
+                 context.sendReply(reply);
+             } else if (request instanceof OutputStream) {
+                 log.info(this + " got OutputStream: " + request);
+                 OutputStream outputStream = (OutputStream) request;
+                 outputStream.write(WRITER_TEST_STRING.getBytes());
+                 outputStream.flush();
+                 outputStream.close();
+                 context.sendReply(null);
+             } else if (request instanceof Reader) {
+                 log.info(this + " got Reader: " + request);
+                 String reply = readAll((Reader) request);
+                 log.info(this + " sending reply: " + reply);
+                 context.sendReply(reply);
+             } else if (request instanceof Writer) {
+                 log.info(this + " got Writer: " + request);
+                 Writer writer = (Writer) request;
+                 writer.write(WRITER_TEST_STRING);
+                 writer.close();
+                 log.info(this + " sending reply: " + WRITER_TEST_STRING);
+                 context.sendReply(WRITER_TEST_STRING);
+             } else if (request instanceof ObjectSource<?>) {
+                 log.info(this + " got ObjectSource: " + request);
+                 @SuppressWarnings("unchecked")
+                 ObjectSource<Object> source = (ObjectSource<Object>) request;
+                 List<Object> list = new ArrayList<Object>();
+                 while (source.hasNext()) {
+                     Object o = source.next();
+                     log.info(this + " adding " + o + " to list");
+                     list.add(o);
+                 }
+                 context.sendReply(list);
+             } else if (request instanceof ObjectSink<?>) {
+                 log.info(this + " got ObjectSink: " + request);
+                 @SuppressWarnings("unchecked")
+                 ObjectSink<String> sink = (ObjectSink<String>) request;
+                 sink.accept("sink 1");
+                 sink.accept("sink 2");
+                 sink.accept("sink 3");
+                 sink.flush();
+                 sink.close();
+                 context.sendReply(null);
+             } else {
+                 // Previously fell through silently, so no reply was ever sent.
+                 throw new RemoteExecutionException(this + " unsupported request type: "
+                         + (request == null ? "null" : request.getClass().getName()));
+             }
+         } catch (IOException e) {
+             log.error(this + " unable to send reply", e);
+             // Propagate with the cause instead of swallowing the failure.
+             throw new RemoteExecutionException(this + " unable to send reply", e);
+         }
+     }
+
+     /** Reads bytes until end of stream or until BUFFER_SIZE bytes were read; decodes with the platform charset. */
+     private String readAll(InputStream is) throws IOException {
+         byte[] bytes = new byte[BUFFER_SIZE];
+         int pos = 0;
+         while (pos < BUFFER_SIZE) {
+             int n = is.read(bytes, pos, BUFFER_SIZE - pos);
+             log.info(this + " read " + n + " bytes");
+             if (n <= 0) {
+                 break;
+             }
+             pos += n;
+             // Log only the filled part of the buffer, not the unused tail.
+             log.info(this + " read from InputStream: " + new String(bytes, 0, pos));
+         }
+         return new String(bytes, 0, pos);
+     }
+
+     /** Reads chars until end of stream or until BUFFER_SIZE chars were read. */
+     private String readAll(Reader reader) throws IOException {
+         char[] chars = new char[BUFFER_SIZE];
+         int pos = 0;
+         while (pos < BUFFER_SIZE) {
+             int n = reader.read(chars, pos, BUFFER_SIZE - pos);
+             log.info(this + " read " + n + " chars");
+             if (n <= 0) {
+                 break;
+             }
+             pos += n;
+             log.info(this + " read from Reader: " + new String(chars, 0, pos));
+         }
+         return new String(chars, 0, pos);
+     }
+ }
+
+ /**
+  * Serializable value object sent through an ObjectSource by
+  * testObjectSource(). Two instances are equal exactly when their seeds are
+  * equal, so a copy that went through serialization still matches the
+  * original in {@code List.contains()}.
+  */
+ static class TestObject implements Serializable {
+     private static final long serialVersionUID = -6320960380815363629L;
+
+     /** Identity of this object; final so hashCode() cannot change while it sits in a collection. */
+     public final int seed;
+
+     public TestObject(int seed) {
+         this.seed = seed;
+     }
+
+     @Override
+     public int hashCode() {
+         return seed;
+     }
+
+     @Override
+     public boolean equals(Object o) {
+         if (!(o instanceof TestObject)) {
+             return false;
+         }
+         return seed == ((TestObject) o).seed;
+     }
+
+     /** Readable form for the log lines that print received objects. */
+     @Override
+     public String toString() {
+         return "TestObject[" + seed + "]";
+     }
+ }
+}