[jboss-remoting-commits] JBoss Remoting SVN: r6118 - remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Mon Oct 4 23:01:45 EDT 2010


Author: ron.sigal at jboss.com
Date: 2010-10-04 23:01:44 -0400 (Mon, 04 Oct 2010)
New Revision: 6118

Added:
   remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream/PrimaryExternalizerFactoryTestCase.java
Log:
JBREM-1228: New unit test.

Added: remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream/PrimaryExternalizerFactoryTestCase.java
===================================================================
--- remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream/PrimaryExternalizerFactoryTestCase.java	                        (rev 0)
+++ remoting3/trunk/jboss-remoting/src/test/java/org/jboss/remoting3/test/stream/PrimaryExternalizerFactoryTestCase.java	2010-10-05 03:01:44 UTC (rev 6118)
@@ -0,0 +1,537 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.test.stream;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.Reader;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import org.jboss.remoting3.Client;
+import org.jboss.remoting3.Connection;
+import org.jboss.remoting3.RemoteExecutionException;
+import org.jboss.remoting3.RequestContext;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.remoting3.stream.ObjectSink;
+import org.jboss.remoting3.stream.ObjectSource;
+import org.jboss.remoting3.stream.Streams;
+import org.jboss.remoting3.test.RemotingTestBase;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.log.Logger;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
+ * @version $Revision: 1.1 $
+ * <p>
+ * Copyright September 25, 2010
+ */
+ at Test(suiteName = "PrimaryExternalizerFactory")
+public class PrimaryExternalizerFactoryTestCase extends RemotingTestBase {
+   
+   private static final Logger log = Logger.getLogger(PrimaryExternalizerFactoryTestCase.class);
+   private static final String SERVICE_TYPE = "streamservicetype";
+   private static final String INSTANCE_NAME = "test";
+   private static final String READER_TEST_STRING = "reader test string";
+   private static final String WRITER_TEST_STRING = "writer test string";
+   
+   private static int counter = new Random(new Date().getTime()).nextInt();
+
+   @BeforeMethod
+   public void setUp() {
+   }
+
+   @AfterMethod
+   public void tearDown() throws IOException {
+   }
+
+   @Test
+   public void testInputStream() throws Exception {
+      enter();
+      
+      ServerPackage sp0 = null;
+      ServerPackage sp1 = null;
+      Connection connection = null;
+      Client<Object, Object> client = null;
+      
+      try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+         
+         // Write to streams.
+         final PipedInputStream pis = new PipedInputStream();
+         final PipedOutputStream pos = new PipedOutputStream(pis);
+         pos.write("before invocation".getBytes());
+         pos.flush();
+         
+         class Holder {
+            public Object result;
+         };
+
+         final Holder holder = new Holder();
+         final Client<Object, Object> finalClient = client;
+         new Thread() {
+            public void run() {
+               try {
+                  log.info(this + " calling invoke()");
+                  Object o = finalClient.invoke(pis);
+                  synchronized(holder) {
+                     holder.result = o;
+                     log.info("set result to: " + holder.result);
+                     holder.notifyAll();
+                  }
+               } catch (IOException e) {
+                  log.error("unable to call Client.invoke()");
+               }
+            }
+         }.start();
+         
+         Thread.sleep(1000);
+         pos.write(" | after invocation".getBytes());
+         pos.flush();
+         
+         synchronized (holder) {
+            if (holder.result == null) {
+               holder.wait(3000);
+            }
+         }
+         
+         pos.close();
+         pis.close();
+
+         Thread.sleep(1000);
+         
+         log.info(this + " holder.result: " + holder.result);
+         assertEquals("before invocation | after invocation", holder.result);
+         
+         log.info(getName() + " PASSES");
+      } finally {
+         if (client != null) {
+            client.close();
+         }
+         if (connection != null) {
+            connection.close();
+         }
+         if (sp0 != null) {
+            sp0.close();
+         }
+         if (sp1 != null) {
+            sp1.close();
+         }
+         exit();
+      }
+   }  
+ 
+   @Test
+   public void testOutputStream() throws Exception {
+      enter();
+      
+      ServerPackage sp0 = null;
+      ServerPackage sp1 = null;
+      Connection connection = null;
+      Client<Object, Object> client = null;
+      
+      try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+         
+         // Test OutputStream.
+         ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         client.invoke(baos);
+         String result = new String(baos.toByteArray());
+         assertEquals(WRITER_TEST_STRING, result);
+         log.info("result: " + result);
+         
+         log.info(getName() + " PASSES");
+      } finally {
+         if (client != null) {
+            client.close();
+         }
+         if (connection != null) {
+            connection.close();
+         }
+         if (sp0 != null) {
+            sp0.close();
+         }
+         if (sp1 != null) {
+            sp1.close();
+         }
+         exit();
+      }
+   }  
+   
+   @Test
+   public void testReader() throws Exception {
+      enter();
+      
+      ServerPackage sp0 = null;
+      ServerPackage sp1 = null;
+      Connection connection = null;
+      Client<Object, Object> client = null;
+      
+      try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+   
+         // Test Reader.
+         Reader reader = new StringReader(READER_TEST_STRING);
+         Object o = client.invoke(reader);
+         log.info(this + " response: " + o);
+         assertEquals(READER_TEST_STRING, o);
+         
+         log.info(getName() + " PASSES");
+      } finally {
+         if (client != null) {
+            client.close();
+         }
+         if (connection != null) {
+            connection.close();
+         }
+         if (sp0 != null) {
+            sp0.close();
+         }
+         if (sp1 != null) {
+            sp1.close();
+         }
+         exit();
+      }
+   }  
+   
+   @Test
+   public void testWriter() throws Exception {
+      enter();
+      
+      ServerPackage sp0 = null;
+      ServerPackage sp1 = null;
+      Connection connection = null;
+      Client<Object, Object> client = null;
+      
+      try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+         
+         // Test Writer.
+         StringWriter writer = new StringWriter();
+         Object o = client.invoke(writer);
+         log.info(this + " response: " + o);
+         assertEquals(WRITER_TEST_STRING, o);
+         StringBuffer buffer = writer.getBuffer();
+         char[] chars = new char[buffer.length()];
+         buffer.getChars(0, buffer.length(), chars, 0);
+         String writerContents = new String(chars);
+         log.info(this + " returned in writer: " + writerContents);
+         assertEquals(WRITER_TEST_STRING, writerContents);
+         
+         log.info(getName() + " PASSES");
+      } finally {
+         if (client != null) {
+            client.close();
+         }
+         if (connection != null) {
+            connection.close();
+         }
+         if (sp0 != null) {
+            sp0.close();
+         }
+         if (sp1 != null) {
+            sp1.close();
+         }
+         exit();
+      }
+   }  
+   
+   @Test
+   public void testObjectSource() throws Exception {
+      enter();
+      
+      ServerPackage sp0 = null;
+      ServerPackage sp1 = null;
+      Connection connection = null;
+      Client<Object, Object> client = null;
+      
+      try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+         
+         // Test ObjectSource.
+         ArrayList<Object> list = new ArrayList<Object>();
+         TestObject o1 = new TestObject(1);
+         TestObject o2 = new TestObject(2);
+         TestObject o3 = new TestObject(3);
+         list.add(o1);
+         list.add(o2);
+         list.add(o3);
+         ObjectSource<Object> source = Streams.getIteratorObjectSource(list.iterator());
+         Object o = client.invoke(source);
+         log.info(this + " response: " + o);
+         assertTrue(o instanceof List<?>);
+         @SuppressWarnings("unchecked")
+         List<Object> result = (List<Object>) o;
+         assertEquals(3, result.size());
+         assertTrue(result.contains(o1));
+         assertTrue(result.contains(o2));
+         assertTrue(result.contains(o3));
+         
+         log.info(getName() + " PASSES");
+      } finally {
+         if (client != null) {
+            client.close();
+         }
+         if (connection != null) {
+            connection.close();
+         }
+         if (sp0 != null) {
+            sp0.close();
+         }
+         if (sp1 != null) {
+            sp1.close();
+         }
+         exit();
+      }
+   }  
+   
+   @Test
+   public void testObjectSink() throws Exception {
+      enter();
+      
+      ServerPackage sp0 = null;
+      ServerPackage sp1 = null;
+      Connection connection = null;
+      Client<Object, Object> client = null;
+      
+      try {
+         int id0 = counter++;
+         int id1 = counter++;
+
+         // Set up services.
+         sp0 = setupServer(null, id0, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+         sp1 = setupServer(null, id1, new TestRequestListener(), SERVICE_TYPE, INSTANCE_NAME);
+
+         // Set up connection and client.
+         connection = setupConnection(sp0, sp1);
+         client = setupClient(connection, SERVICE_TYPE, INSTANCE_NAME);
+         
+         // Test ObjectSink.
+         List<String> set = new ArrayList<String>();
+         ObjectSink<String> objectSink = Streams.getCollectionObjectSink(set);
+         client.invoke(objectSink);
+         assertEquals(3, set.size());
+         assertTrue(set.contains("sink 1"));
+         assertTrue(set.contains("sink 2"));
+         assertTrue(set.contains("sink 3"));
+         
+         log.info(getName() + " PASSES");
+      } finally {
+         if (client != null) {
+            client.close();
+         }
+         if (connection != null) {
+            connection.close();
+         }
+         if (sp0 != null) {
+            sp0.close();
+         }
+         if (sp1 != null) {
+            sp1.close();
+         }
+         exit();
+      }
+   }  
+   
+   protected OptionMap getClientOptionMap() {
+      return OptionMap.EMPTY;
+   }
+   
+   public Client<Object, Object> setupClient(Connection connection, String serviceType, String instanceName) throws IOException, URISyntaxException {
+      Client<Object, Object> client = getFutureResult(connection.openClient(serviceType, instanceName, Object.class, Object.class, getClientOptionMap()), 1000000, "unable to open client to " + serviceType + ":" + instanceName);
+      return client;
+   }
+   
+   static class TestRequestListener implements RequestListener<Object, Object> {
+      public void handleRequest(RequestContext<Object> context, Object request) throws RemoteExecutionException {
+         try {
+            if (request instanceof InputStream) {
+               log.info(this + " got InputStream: " + request);
+               InputStream is = (InputStream) request;
+               byte[] bytes = new byte[1024];
+               int n = 0;
+               int pos = 0;
+               do {
+                  n = is.read(bytes, pos, 1024 - pos);
+                  log.info(this + " read " + n + " bytes");
+                  if (n <= 0) {
+                     break;
+                  }
+                  pos += n;
+                  log.info(this + " read from InputStream: " + new String(bytes));
+               } while (true);
+               byte[] trimmedBytes = Arrays.copyOfRange(bytes, 0, pos);
+               log.info(this + " sending reply: " + new String(trimmedBytes));
+               context.sendReply(new String(trimmedBytes));
+            } 
+            
+            else if (request instanceof OutputStream) {
+               log.info(this + " got OutputStream: " + request);
+               OutputStream outputStream = (OutputStream) request;
+               outputStream.write(WRITER_TEST_STRING.getBytes());
+               outputStream.flush();
+               outputStream.close();
+               context.sendReply(null);
+            }
+            
+            else if (request instanceof Reader) {
+               log.info(this + " got Reader: " + request);
+               Reader reader = (Reader) request;
+               char[] chars = new char[1024];
+               int n = 0;
+               int pos = 0;
+               do {
+                  n =  reader.read(chars, pos, 1024 - pos);
+                  log.info(this + " read " + n + " chars");
+                  if (n <= 0) {
+                     break;
+                  }
+                  pos += n;
+                  log.info(this + " read from InputStream: " + new String(chars));
+               } while (true);
+               char[] trimmedChars = Arrays.copyOfRange(chars, 0, pos);
+               log.info(this + " sending reply: " + new String(trimmedChars));
+               context.sendReply(new String(trimmedChars));
+            }
+            
+            else if (request instanceof Writer) {
+               log.info(this + " got Writer: " + request);
+               Writer writer = (Writer) request;
+               writer.write(WRITER_TEST_STRING);
+               writer.close();
+               log.info(this + " sending reply: " + WRITER_TEST_STRING);
+               context.sendReply(WRITER_TEST_STRING);
+            }
+            
+            else if (request instanceof ObjectSource<?>) {
+               log.info(this + " got ObjectSource: " + request);
+               @SuppressWarnings("unchecked")
+               ObjectSource<Object> source = (ObjectSource<Object>) request;
+               ArrayList<Object> list = new ArrayList<Object>();
+               while (source.hasNext()) {
+                  Object o = source.next();
+                  log.info(this + " adding " + o + " to list");
+                  list.add(o);
+               }
+               context.sendReply(list);
+            }
+            
+            else if (request instanceof ObjectSink<?>) {
+               log.info(this + " got ObjectSink: " + request);
+               @SuppressWarnings("unchecked")
+               ObjectSink<String> sink = (ObjectSink<String>) request;
+               sink.accept("sink 1");
+               sink.accept("sink 2");
+               sink.accept("sink 3");
+               sink.flush();
+               sink.close();
+               context.sendReply(null);
+            }
+         } catch (IOException e) {
+            log.error(this + " unable to send reply", e);
+            e.printStackTrace();
+         }
+      }
+   }
+   
+   static class TestObject implements Serializable {
+      public int seed;
+      private static final long serialVersionUID = -6320960380815363629L;
+      
+      public TestObject(int seed) {
+         this.seed = seed;
+      }
+      public int hashCode() {
+         return seed;
+      }
+      public boolean equals(Object o) {
+         if (! (o instanceof TestObject)) {
+            return false;
+         }
+         return seed == ((TestObject) o).seed;
+      }
+   }
+}



More information about the jboss-remoting-commits mailing list