[jboss-cvs] JBoss Messaging SVN: r3912 - in projects/network-benchmark: lib and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Mar 22 07:43:10 EDT 2008


Author: trustin
Date: 2008-03-22 07:43:10 -0400 (Sat, 22 Mar 2008)
New Revision: 3912

Added:
   projects/network-benchmark/src/network/AllInOneServer.java
   projects/network-benchmark/src/network/ClientSetting.java
   projects/network-benchmark/src/network/CommonSetting.java
   projects/network-benchmark/src/network/MINAAPRServer.java
   projects/network-benchmark/src/network/MINANIOServer.java
Removed:
   projects/network-benchmark/lib/mina-core-2.0.0-M1.jar
   projects/network-benchmark/lib/mina-core-2.0.0-M2-20080317.150334-8.jar
Modified:
   projects/network-benchmark/.classpath
   projects/network-benchmark/src/network/BIOServer.java
   projects/network-benchmark/src/network/MINAServer.java
   projects/network-benchmark/src/network/NIOServer.java
   projects/network-benchmark/src/network/NetworkClientTest.java
Log:
* Extracted configurable parameters into CommonSetting and ClientSetting
* Made sure request count and response count match exactly
* Added support for APR transport
* Fixed non-blocking MINA client
* Added AllInOneServer which launcher all four servers at once

Modified: projects/network-benchmark/.classpath
===================================================================
--- projects/network-benchmark/.classpath	2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/.classpath	2008-03-22 11:43:10 UTC (rev 3912)
@@ -6,7 +6,9 @@
 	<classpathentry kind="lib" path="lib/log4j.jar"/>
 	<classpathentry kind="lib" path="lib/slf4j-api-1.4.3.jar"/>
 	<classpathentry kind="lib" path="lib/slf4j-log4j12.jar"/>
-	<classpathentry kind="lib" path="lib/mina-core-2.0.0-M2-20080317.150334-8.jar"/>
 	<classpathentry kind="lib" path="lib/junit.jar"/>
+	<classpathentry kind="lib" path="lib/mina-core-2.0.0-M2-SNAPSHOT.jar"/>
+	<classpathentry kind="lib" path="lib/mina-transport-apr-2.0.0-M2-SNAPSHOT.jar"/>
+	<classpathentry kind="lib" path="lib/tomcat-apr-5.5.15.jar"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

Deleted: projects/network-benchmark/lib/mina-core-2.0.0-M1.jar
===================================================================
(Binary files differ)

Deleted: projects/network-benchmark/lib/mina-core-2.0.0-M2-20080317.150334-8.jar
===================================================================
(Binary files differ)

Added: projects/network-benchmark/src/network/AllInOneServer.java
===================================================================
--- projects/network-benchmark/src/network/AllInOneServer.java	                        (rev 0)
+++ projects/network-benchmark/src/network/AllInOneServer.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,31 @@
+package network;
+
+public class AllInOneServer {
+    public static void main(String[] args) throws Exception {
+        CommonSetting.print();
+        System.out.println();
+
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    BIOServer.main(new String[0]);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+        new Thread() {
+            @Override
+            public void run() {
+                try {
+                    NIOServer.main(new String[0]);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }.start();
+        new Thread(new MINANIOServer()).start();
+        new Thread(new MINAAPRServer()).start();
+    }
+}


Property changes on: projects/network-benchmark/src/network/AllInOneServer.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Modified: projects/network-benchmark/src/network/BIOServer.java
===================================================================
--- projects/network-benchmark/src/network/BIOServer.java	2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/src/network/BIOServer.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -1,8 +1,14 @@
 package network;
+
+import static network.CommonSetting.*;
+
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 public class BIOServer
 {
@@ -19,42 +25,96 @@
    public static void main(String[] args) throws Exception
    {
       ServerSocket serverSocket = new ServerSocket();
-      if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
-         serverSocket.setReceiveBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
-      serverSocket.bind(NetworkClientTest.BIO_ADDRESS);
-      System.out
-            .println("BIO Server bound to " + NetworkClientTest.BIO_ADDRESS);
+      if (TCP_BUFFER_SIZE != -1)
+      {
+         serverSocket.setReceiveBufferSize(TCP_BUFFER_SIZE);
+      }
+      serverSocket.bind(new InetSocketAddress(BIO_SERVER_PORT));
+      System.out.println(
+              "BIO Server bound to port " + BIO_SERVER_PORT);
 
-      byte[] bytes = new byte[NetworkClientTest.MESSAGE_SIZE];
-      
+      // Create the executor and start one worker thread in advance.
+      Executor executor = Executors.newCachedThreadPool();
+      executor.execute(new Runnable() { public void run() {} });
+
       while (true)
       {
          Socket clientSocket = serverSocket.accept();
-         if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
-            clientSocket.setSendBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
-         System.out.println("new client...");
-         OutputStream os = clientSocket.getOutputStream();
+         executor.execute(new Worker(clientSocket));
+      }
+   }
 
-         InputStream is = clientSocket.getInputStream();
+   // Package protected ---------------------------------------------
 
-         while (is.read(bytes) != -1)
-         {
-            os.write(bytes, 0, NetworkClientTest.RESPONSE_SIZE);
-         }
+   // Protected -----------------------------------------------------
 
-         os.close();
+   // Private -------------------------------------------------------
 
-         is.close();
+   // Inner classes -------------------------------------------------
 
-         clientSocket.close();
+   private static class Worker implements Runnable
+   {
+      private final Socket s;
+
+      private Worker(Socket s)
+      {
+         this.s = s;
       }
-   }
 
-   // Package protected ---------------------------------------------
+      public void run()
+      {
+         byte[] response = new byte[RESPONSE_SIZE];
+         for (int i = 0; i < RESPONSE_SIZE; i ++) {
+             response[i] = (byte) ((i % 10) + '0');
+         }
 
-   // Protected -----------------------------------------------------
+         try
+         {
+            int bufferSize;
+            s.setTcpNoDelay(ENABLE_TCP_NO_DELAY);
+            if (TCP_BUFFER_SIZE != -1)
+            {
+               bufferSize = TCP_BUFFER_SIZE;
+               s.setSendBufferSize(TCP_BUFFER_SIZE);
+            }
+            else
+            {
+               bufferSize = s.getReceiveBufferSize();
+            }
+            byte[] bytes = new byte[bufferSize];
+            System.out.println("new client...");
+            OutputStream os = s.getOutputStream();
+            InputStream is = s.getInputStream();
 
-   // Private -------------------------------------------------------
+            long readBytes = 0;
+            int sentResponses = 0;
 
-   // Inner classes -------------------------------------------------
+            for (;;)
+            {
+               int localReadBytes = is.read(bytes);
+               if (localReadBytes < 0)
+               {
+                  break;
+               }
+
+               readBytes += localReadBytes;
+
+               int receivedRequests = (int) (readBytes / REQUEST_SIZE);
+               for (int i = receivedRequests - sentResponses; i > 0; i --)
+               {
+                  os.write(response, 0, RESPONSE_SIZE);
+                  sentResponses ++;
+               }
+            }
+
+            os.close();
+            is.close();
+            s.close();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
 }

Added: projects/network-benchmark/src/network/ClientSetting.java
===================================================================
--- projects/network-benchmark/src/network/ClientSetting.java	                        (rev 0)
+++ projects/network-benchmark/src/network/ClientSetting.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,14 @@
+package network;
+
+
+
+public class ClientSetting {
+    public static final String SERVER_HOSTNAME = "127.0.0.1";
+    public static final long DURATION = 10000; // in ms
+
+    public static void print()
+    {
+       System.out.println("Host: " + SERVER_HOSTNAME);
+       System.out.println("Duration: " + DURATION + " ms");
+    }
+}


Property changes on: projects/network-benchmark/src/network/ClientSetting.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: projects/network-benchmark/src/network/CommonSetting.java
===================================================================
--- projects/network-benchmark/src/network/CommonSetting.java	                        (rev 0)
+++ projects/network-benchmark/src/network/CommonSetting.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,34 @@
+package network;
+
+import java.net.Socket;
+import java.net.SocketException;
+
+
+public class CommonSetting {
+    public static final int REQUEST_SIZE = 1024; // in bytes
+    public static final int RESPONSE_SIZE = 24; // in bytes
+    public static final boolean ENABLE_TCP_NO_DELAY = true;
+    public static final int TCP_BUFFER_SIZE = -1; // -1 to use the O/S defaults
+    public static final int BIO_SERVER_PORT = 5555;
+    public static final int NIO_SERVER_PORT = 6666;
+    public static final int MINA_NIO_SERVER_PORT= 7777;
+    public static final int MINA_APR_SERVER_PORT= 8888;
+
+    public static void print()
+    {
+       System.out.println("Request size: " + REQUEST_SIZE + " bytes");
+       System.out.println("Response size: " + RESPONSE_SIZE + " bytes");
+       System.out.println("TCP no delay: " + ENABLE_TCP_NO_DELAY);
+       try
+       {
+          System.out.format("TCP send buffer size: %d (default: %d)\n",
+                  TCP_BUFFER_SIZE, (new Socket()).getSendBufferSize());
+          System.out.format("TCP receive buffer size: %d (default: %d)\n",
+                  TCP_BUFFER_SIZE, (new Socket()).getReceiveBufferSize());
+       }
+       catch (SocketException e)
+       {
+          e.printStackTrace();
+       }
+    }
+}


Property changes on: projects/network-benchmark/src/network/CommonSetting.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Added: projects/network-benchmark/src/network/MINAAPRServer.java
===================================================================
--- projects/network-benchmark/src/network/MINAAPRServer.java	                        (rev 0)
+++ projects/network-benchmark/src/network/MINAAPRServer.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,26 @@
+package network;
+
+import static network.CommonSetting.*;
+
+import org.apache.mina.transport.socket.SocketAcceptor;
+import org.apache.mina.transport.socket.apr.AprSocketAcceptor;
+
+public class MINAAPRServer extends MINAServer
+{
+    @Override
+    protected SocketAcceptor newAcceptor()
+    {
+        return new AprSocketAcceptor();
+    }
+
+    @Override
+    protected int getPort()
+    {
+        return MINA_APR_SERVER_PORT;
+    }
+
+    public static void main(String[] args)
+    {
+        new MINAAPRServer().run();
+    }
+}


Property changes on: projects/network-benchmark/src/network/MINAAPRServer.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Copied: projects/network-benchmark/src/network/MINANIOServer.java (from rev 3910, projects/network-benchmark/src/network/MINAServer.java)
===================================================================
--- projects/network-benchmark/src/network/MINANIOServer.java	                        (rev 0)
+++ projects/network-benchmark/src/network/MINANIOServer.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,26 @@
+package network;
+
+import static network.CommonSetting.*;
+
+import org.apache.mina.transport.socket.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+
+public class MINANIOServer extends MINAServer
+{
+    @Override
+    protected SocketAcceptor newAcceptor()
+    {
+        return new NioSocketAcceptor();
+    }
+
+    @Override
+    protected int getPort()
+    {
+        return MINA_NIO_SERVER_PORT;
+    }
+
+    public static void main(String[] args)
+    {
+        new MINANIOServer().run();
+    }
+}

Modified: projects/network-benchmark/src/network/MINAServer.java
===================================================================
--- projects/network-benchmark/src/network/MINAServer.java	2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/src/network/MINAServer.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -1,58 +1,69 @@
 package network;
+import static network.CommonSetting.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
 import org.apache.mina.common.IoBuffer;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.SocketAcceptor;
 
-public class MINAServer
+public abstract class MINAServer implements Runnable
 {
-   // Constants -----------------------------------------------------
+   protected abstract SocketAcceptor newAcceptor();
+   protected abstract int getPort();
 
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public static void main(String[] args) throws Exception
+   public final void run()
    {
-      final NioSocketAcceptor server = new NioSocketAcceptor();
-      if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
+      final SocketAcceptor server = newAcceptor();
+      if (TCP_BUFFER_SIZE != -1)
       {
-         server.getSessionConfig().setSendBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
-         server.getSessionConfig().setReceiveBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
+         server.getSessionConfig().setSendBufferSize(TCP_BUFFER_SIZE);
+         server.getSessionConfig().setReceiveBufferSize(TCP_BUFFER_SIZE);
       }
-      server.setCloseOnDeactivation(false);
+      server.getSessionConfig().setTcpNoDelay(ENABLE_TCP_NO_DELAY);
+
+      final IoBuffer response = IoBuffer.allocate(RESPONSE_SIZE);
+      for (int i = 0; i < RESPONSE_SIZE; i ++) {
+          response.put(i, (byte) ((i % 10) + '0'));
+      }
+
       server.setHandler(new IoHandlerAdapter()
       {
+         @Override
          public void messageReceived(IoSession session, Object message)
                throws Exception
          {
-            IoBuffer buffer = (IoBuffer) message;
-            IoBuffer response = buffer.getSlice(NetworkClientTest.RESPONSE_SIZE);
-            session.write(response);
+
+            int receivedRequests = (int) (session.getReadBytes() / REQUEST_SIZE);
+            int sentResponses = (int) ((session.getWrittenBytes() + session.getScheduledWriteBytes()) / RESPONSE_SIZE);
+            for (int i = receivedRequests - sentResponses; i > 0; i --)
+            {
+               session.write(response.duplicate());
+            }
          }
 
          @Override
-         public void sessionCreated(IoSession session) throws Exception
+         public void sessionOpened(IoSession session) throws Exception
          {
             System.out.println("new client...");
          }
+
+         @Override
+         public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+         {
+            cause.printStackTrace();
+         }
       });
-      server.bind(NetworkClientTest.MINA_ADDRESS);
 
-      System.out.println("MINA Server bound to "
-            + NetworkClientTest.MINA_ADDRESS);
-
+      try {
+         server.bind(new InetSocketAddress(getPort()));
+         System.out.println(
+                 "MINA " + server.getTransportMetadata().getProviderName() +
+                 " Server bound to port " + getPort());
+      } catch (IOException e) {
+         e.printStackTrace();
+      }
    }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
 }

Modified: projects/network-benchmark/src/network/NIOServer.java
===================================================================
--- projects/network-benchmark/src/network/NIOServer.java	2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/src/network/NIOServer.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -1,11 +1,17 @@
 package network;
 
+import static network.CommonSetting.*;
+
+import java.net.InetSocketAddress;
+import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.Set;
 
 public class NIOServer
@@ -24,19 +30,24 @@
    {
       ServerSocketChannel server = ServerSocketChannel.open();
       server.configureBlocking(false);
-      if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
-         server.socket().setReceiveBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
-      server.socket().bind(NetworkClientTest.NIO_ADDRESS);
+      if (TCP_BUFFER_SIZE != -1)
+      {
+         server.socket().setReceiveBufferSize(TCP_BUFFER_SIZE);
+      }
+      server.socket().bind(new InetSocketAddress(NIO_SERVER_PORT));
 
-      System.out
-            .println("NIO Server bound to " + NetworkClientTest.NIO_ADDRESS);
+      System.out.println(
+              "NIO Server bound to port " + NIO_SERVER_PORT);
 
       Selector selector = Selector.open();
       server.register(selector, SelectionKey.OP_ACCEPT);
 
-      ByteBuffer buf = ByteBuffer.allocate(NetworkClientTest.MESSAGE_SIZE - NetworkClientTest.RESPONSE_SIZE);
-      ByteBuffer response = ByteBuffer.allocate(NetworkClientTest.RESPONSE_SIZE);
 
+      ByteBuffer response = ByteBuffer.allocate(RESPONSE_SIZE);
+      for (int i = 0; i < RESPONSE_SIZE; i ++) {
+          response.put(i, (byte) ((i % 10) + '0'));
+      }
+
       // Wait for something of interest to happen
       while (selector.select() > 0)
       {
@@ -47,7 +58,6 @@
          // Walk through set
          while (readyItor.hasNext())
          {
-
             // Get key from set
             SelectionKey key = readyItor.next();
 
@@ -57,38 +67,71 @@
             if (key.isAcceptable())
             {
                // Get channel
-               ServerSocketChannel keyChannel = (ServerSocketChannel) key
-                     .channel();
+               ServerSocketChannel keyChannel = (ServerSocketChannel) key.channel();
 
                // Accept request
                SocketChannel socket = keyChannel.accept();
-               if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
-                  socket.socket().setSendBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
+               socket.socket().setTcpNoDelay(ENABLE_TCP_NO_DELAY);
+               if (TCP_BUFFER_SIZE != -1) {
+                  socket.socket().setSendBufferSize(TCP_BUFFER_SIZE);
+               }
                System.out.println("new client...");
                socket.configureBlocking(false);
-               socket.register(selector, SelectionKey.OP_READ);
-            } else if (key.isReadable())
+               SelectionKey clientKey = socket.register(selector, SelectionKey.OP_READ);
+               clientKey.attach(new Context(socket));
+               continue;
+            }
+
+            SocketChannel channel = (SocketChannel) key.channel();
+            Context ctx = (Context) key.attachment();
+            if (key.isReadable())
             {
-               response.clear();
-               buf.clear();
-               SocketChannel channel = (SocketChannel) key.channel();
-               long readBytes = channel.read(new ByteBuffer[] {buf, response});
-               if (readBytes == 0)
+               ByteBuffer readBuf = ctx.readBuffer;
+               readBuf.clear();
+               int localReadBytes = channel.read(readBuf);
+               if (localReadBytes < 0)
                {
-                  System.err.println("READ BUFFER UNDERRUN");
-               } else if (readBytes < 0)
-               {
                   channel.close();
                   key.cancel();
-                  // server.close();
-                  // selector.close();
-               } else
+               }
+               else
                {
-                  response.flip();
-                  if (channel.write(response) == 0)
+                  ctx.readBytes += localReadBytes;
+               }
+
+               int receivedRequests = (int) (ctx.readBytes / REQUEST_SIZE);
+               int sentResponses = (int) (ctx.writtenBytes / RESPONSE_SIZE) + ctx.writeBufferQueue.size();
+               for (int i = receivedRequests - sentResponses; i > 0; i --)
+               {
+                  ctx.writeBufferQueue.offer(response.duplicate());
+               }
+            }
+
+            if (key.isValid() && (key.isWritable() || (key.interestOps() == SelectionKey.OP_READ && !ctx.writeBufferQueue.isEmpty())))
+            {
+               for (;;) {
+                  ByteBuffer writeBuf = ctx.writeBufferQueue.peek();
+                  if (writeBuf == null) {
+                      if (key.interestOps() != SelectionKey.OP_READ)
+                      {
+                         key.interestOps(SelectionKey.OP_READ);
+                      }
+                      break;
+                  }
+
+                  ctx.writtenBytes += channel.write(writeBuf);
+                  if (writeBuf.hasRemaining())
                   {
-                     System.err.println("WRITE BUFFER FULL");
+                     if (key.interestOps() == SelectionKey.OP_READ)
+                     {
+                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+                     }
+                     break;
                   }
+                  else
+                  {
+                     ctx.writeBufferQueue.poll();
+                  }
                }
             }
          }
@@ -102,4 +145,15 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
+
+   private static class Context {
+       private final ByteBuffer readBuffer;
+       private final Queue<ByteBuffer> writeBufferQueue = new LinkedList<ByteBuffer>();
+       private long readBytes;
+       private long writtenBytes;
+
+       private Context(SocketChannel ch) throws SocketException {
+           readBuffer = ByteBuffer.allocate(ch.socket().getReceiveBufferSize());
+       }
+   }
 }

Modified: projects/network-benchmark/src/network/NetworkClientTest.java
===================================================================
--- projects/network-benchmark/src/network/NetworkClientTest.java	2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/src/network/NetworkClientTest.java	2008-03-22 11:43:10 UTC (rev 3912)
@@ -1,6 +1,7 @@
 package network;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static network.ClientSetting.*;
+import static network.CommonSetting.*;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -10,7 +11,7 @@
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.UnknownHostException;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import junit.framework.TestCase;
@@ -19,114 +20,145 @@
 import org.apache.mina.common.IoBuffer;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.ReadFuture;
+import org.apache.mina.transport.socket.SocketConnector;
+import org.apache.mina.transport.socket.apr.AprSocketConnector;
 import org.apache.mina.transport.socket.nio.NioSocketConnector;
 
 public class NetworkClientTest extends TestCase
 {
-   // Configurable properties:
-   private static final String SERVER_HOST = "192.168.0.4";
-   private static final long DURATION = 10000; // in ms
-   public static final int MESSAGE_SIZE = 1024; // in bytes
-   public static final int RESPONSE_SIZE = 24; // in bytes
-   private static final boolean ENABLE_TCP_NO_DELAY = false;
-   static final int TCP_BUFFER_SIZE = -1; // -1 to not set it
-
-   // Constants -----------------------------------------------------
-
-   public static final SocketAddress BIO_ADDRESS = new InetSocketAddress(
-         SERVER_HOST, 5555);
-   public static final SocketAddress NIO_ADDRESS = new InetSocketAddress(
-         SERVER_HOST, 6666);
-   public static final SocketAddress MINA_ADDRESS = new InetSocketAddress(
-         SERVER_HOST, 7777);
-
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
 
    static {
-      System.out.println("Duration: " + DURATION + " ms");
-      System.out.println("Message size: " + MESSAGE_SIZE + " bytes");
-      System.out.println("Response size: " + RESPONSE_SIZE + " bytes");
-      System.out.println("TCP no delay: " + ENABLE_TCP_NO_DELAY);
-      try
-      {
-         System.out.format("TCP send buffer size: %d (default: %d)\n",
-         		TCP_BUFFER_SIZE, (new Socket()).getSendBufferSize());
-         System.out.format("TCP receive buffer size: %d (default: %d)\n",
-               TCP_BUFFER_SIZE, (new Socket()).getReceiveBufferSize());
-      } catch (SocketException e)
-      {
-         e.printStackTrace();
-      }
+      ClientSetting.print();
+      CommonSetting.print();
       System.out.println();
    }
 
    // Constructors --------------------------------------------------
 
-   // Public --------------------------------------------------------  
-   
+   // Public --------------------------------------------------------
+
    public void test_Blocking_BIOClient_To_BIOServer() throws Exception
    {
-      startBlockingBioClient(BIO_ADDRESS);
+      startBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, BIO_SERVER_PORT));
    }
 
    public void test_NonBlocking_BIOClient_To_BIOServer() throws Exception
    {
-      startNonBlockingBioClient(BIO_ADDRESS);
+      startNonBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, BIO_SERVER_PORT));
    }
 
    public void test_Blocking_BIOClient_To_NIOServer() throws Exception
    {
-      startBlockingBioClient(NIO_ADDRESS);
+      startBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, NIO_SERVER_PORT));
    }
 
    public void test_NonBlocking_BIOClient_To_NIOServer() throws Exception
    {
-      startNonBlockingBioClient(NIO_ADDRESS);
+      startNonBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, NIO_SERVER_PORT));
    }
 
-   public void test_Blocking_BIOClient_To_MINAServer() throws Exception
+   public void test_Blocking_BIOClient_To_MINANIOServer() throws Exception
    {
-      startBlockingBioClient(MINA_ADDRESS);
+      startBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, MINA_NIO_SERVER_PORT));
    }
 
-   public void test_NonBlocking_BIOClient_To_MINAServer() throws Exception
+   public void test_NonBlocking_BIOClient_To_MINANIOServer() throws Exception
    {
-      startNonBlockingBioClient(MINA_ADDRESS);
+      startNonBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, MINA_NIO_SERVER_PORT));
    }
 
-   public void test_Blocking_MINAClient_To_BIOServer() throws Exception
+   public void test_Blocking_BIOClient_To_MINAAPRServer() throws Exception
    {
-      startBlockingMINAClient(BIO_ADDRESS);
+      startBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, MINA_APR_SERVER_PORT));
    }
 
-   public void _test_NonBlocking_MINAClient_To_BIOServer() throws Exception
+   public void test_NonBlocking_BIOClient_To_MINAAPRServer() throws Exception
    {
-      startNonBlockingMINAClient(BIO_ADDRESS);
+      startNonBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, MINA_APR_SERVER_PORT));
    }
 
-   public void test_Blocking_MINAClient_To_NIOServer() throws Exception
+   public void test_Blocking_MINANIOClient_To_BIOServer() throws Exception
    {
-      startBlockingMINAClient(NIO_ADDRESS);
+      startBlockingMINAClient(new NioSocketConnector(), BIO_SERVER_PORT);
    }
 
-   public void _test_NonBlocking_MINAClient_To_NIOServer() throws Exception
+   public void test_NonBlocking_MINANIOClient_To_BIOServer() throws Exception
    {
-      startNonBlockingMINAClient(NIO_ADDRESS);
+      startNonBlockingMINAClient(new NioSocketConnector(), BIO_SERVER_PORT);
    }
 
-   public void test_Blocking_MINAClient_To_MINAServer() throws Exception
+   public void test_Blocking_MINAAPRClient_To_BIOServer() throws Exception
    {
-      startBlockingMINAClient(MINA_ADDRESS);
+      startBlockingMINAClient(new AprSocketConnector(), BIO_SERVER_PORT);
    }
 
-   public void _test_NonBlocking_MINAClient_To_MINAServer() throws Exception
+   public void test_NonBlocking_MINAAPRClient_To_BIOServer() throws Exception
    {
-      startNonBlockingMINAClient(MINA_ADDRESS);
+      startNonBlockingMINAClient(new AprSocketConnector(), BIO_SERVER_PORT);
    }
 
+   public void test_Blocking_MINANIOClient_To_NIOServer() throws Exception
+   {
+      startBlockingMINAClient(new NioSocketConnector(), NIO_SERVER_PORT);
+   }
+
+   public void test_NonBlocking_MINANIOClient_To_NIOServer() throws Exception
+   {
+      startNonBlockingMINAClient(new NioSocketConnector(), NIO_SERVER_PORT);
+   }
+
+   public void test_Blocking_MINAAPRClient_To_NIOServer() throws Exception
+   {
+      startBlockingMINAClient(new AprSocketConnector(), NIO_SERVER_PORT);
+   }
+
+   public void test_NonBlocking_MINAAPRClient_To_NIOServer() throws Exception
+   {
+      startNonBlockingMINAClient(new AprSocketConnector(), NIO_SERVER_PORT);
+   }
+
+   public void test_Blocking_MINANIOClient_To_MINANIOServer() throws Exception
+   {
+      startBlockingMINAClient(new NioSocketConnector(), MINA_NIO_SERVER_PORT);
+   }
+
+   public void test_NonBlocking_MINANIOClient_To_MINANIOServer() throws Exception
+   {
+      startNonBlockingMINAClient(new NioSocketConnector(), MINA_NIO_SERVER_PORT);
+   }
+
+   public void test_Blocking_MINANIOClient_To_MINAAPRServer() throws Exception
+   {
+      startBlockingMINAClient(new NioSocketConnector(), MINA_APR_SERVER_PORT);
+   }
+
+   public void test_NonBlocking_MINANIOClient_To_MINAAPRServer() throws Exception
+   {
+      startNonBlockingMINAClient(new NioSocketConnector(), MINA_APR_SERVER_PORT);
+   }
+
+   public void test_Blocking_MINAAPRClient_To_MINANIOServer() throws Exception
+   {
+      startBlockingMINAClient(new AprSocketConnector(), MINA_NIO_SERVER_PORT);
+   }
+
+   public void test_NonBlocking_MINAAPRClient_To_MINANIOServer() throws Exception
+   {
+      startNonBlockingMINAClient(new AprSocketConnector(), MINA_NIO_SERVER_PORT);
+   }
+
+   public void test_Blocking_MINAAPRClient_To_MINAAPRServer() throws Exception
+   {
+      startBlockingMINAClient(new AprSocketConnector(), MINA_APR_SERVER_PORT);
+   }
+
+   public void test_NonBlocking_MINAAprClient_To_MINAAPRServer() throws Exception
+   {
+      startNonBlockingMINAClient(new AprSocketConnector(), MINA_APR_SERVER_PORT);
+   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -147,67 +179,64 @@
       clientSocket.connect(address);
       return clientSocket;
    }
-   
-   private static NioSocketConnector newConfiguredConnector()
+
+   private static void configureConnector(SocketConnector connector)
    {
-      NioSocketConnector client = new NioSocketConnector();
-      client.getSessionConfig().setTcpNoDelay(ENABLE_TCP_NO_DELAY);
+      connector.getSessionConfig().setTcpNoDelay(ENABLE_TCP_NO_DELAY);
       if (TCP_BUFFER_SIZE != -1)
       {
-         client.getSessionConfig().setSendBufferSize(TCP_BUFFER_SIZE);
-         client.getSessionConfig().setReceiveBufferSize(TCP_BUFFER_SIZE);
+         connector.getSessionConfig().setSendBufferSize(TCP_BUFFER_SIZE);
+         connector.getSessionConfig().setReceiveBufferSize(TCP_BUFFER_SIZE);
       }
-      client.getSessionConfig().setReuseAddress(false);
-      client.getSessionConfig().setUseReadOperation(true);
-      return client;
    }
 
    private byte[] createMessage()
    {
-      byte[] b = new byte[MESSAGE_SIZE];
+      byte[] b = new byte[REQUEST_SIZE];
       for (int i = 0; i < b.length; i++)
       {
-         b[i] = 66;
+         b[i] = (byte) ((i % 10) + '0');
       }
       return b;
    }
-   
-   private byte[] lastMessage()
-   {
-      byte[] b = new byte[MESSAGE_SIZE];
-      for (int i = 0; i < b.length; i++)
-      {
-         b[i] = 99;
-      }
-      return b;
-   }
-   
+
    private void startBlockingBioClient(SocketAddress address)
          throws UnknownHostException, IOException
    {
       Socket clientSocket = newConfiguredSocket(address);
       OutputStream os = clientSocket.getOutputStream();
       InputStream is = clientSocket.getInputStream();
-      
+
       long start = System.currentTimeMillis();
       int count = 0;
       byte[] message = createMessage();
       byte[] response = new byte[RESPONSE_SIZE];
+
+      outerLoop:
       while (System.currentTimeMillis() - start < DURATION)
       {
          os.write(message);
-         
-         int size = is.read(response);
-         if (size != RESPONSE_SIZE)
-         {
-            throw new IllegalStateException("Wrong message size");
+         int readBytes = 0;
+         for (;;) {
+            int localReadBytes = is.read(response, readBytes, response.length - readBytes);
+            if (localReadBytes < 0) {
+                System.out.println("Connection closed by server.");
+                break outerLoop;
+            }
+            readBytes += localReadBytes;
+            if (readBytes == RESPONSE_SIZE) {
+                count ++;
+                break;
+            }
          }
-         count++;
       }
+
+      is.close();
+      os.close();
       clientSocket.close();
 
       long periodInMs = System.currentTimeMillis() - start;
-      display(count, periodInMs);
+      display(count, count, periodInMs);
    }
 
    private void startNonBlockingBioClient(SocketAddress address)
@@ -216,164 +245,175 @@
       final Socket clientSocket = newConfiguredSocket(address);
       OutputStream os = clientSocket.getOutputStream();
       final InputStream is = clientSocket.getInputStream();
-      
-      final AtomicLong receivedCount = new AtomicLong(0);
-      final CountDownLatch latch = new CountDownLatch(1);
+
+      final AtomicLong receivedBytes = new AtomicLong(0);
       Thread receiver = new Thread()
       {
          @Override
          public void run()
          {
-            byte[] b = new byte[RESPONSE_SIZE];
-            
-            while (true)
+            try
             {
-               try
+               byte[] response = new byte[clientSocket.getReceiveBufferSize()];
+
+               while (true)
                {
-                  int size = is.read(b);
-                  if (size != RESPONSE_SIZE)
-                     throw new IllegalStateException("Wrong size: " + size);
-                  if (b[0] == 99)
-                  {
-                     latch.countDown();
-                     return;
+                  int readBytes = is.read(response);
+                  if (readBytes < 0) {
+                      System.out.println("Connection closed by server.");
+                      break;
                   }
-                  if (b[0] != 66 && b[0] != 99)
-                  {
-                     throw new IllegalStateException("Wrong byte");
-                  }
-                  receivedCount.incrementAndGet();
-               } catch (IOException e)
-               {
+                  receivedBytes.addAndGet(readBytes);
+               }
+            }
+            catch (SocketException e)
+            {
+               if (e.getMessage().indexOf("closed") < 0) {
                   e.printStackTrace();
-                  return;
                }
             }
+            catch (IOException e)
+            {
+               e.printStackTrace();
+            }
          }
       };
 
       receiver.start();
 
-      byte[] message = createMessage();
+      byte[] request = createMessage();
       long start = System.currentTimeMillis();
+      int sentRequests = 0;
       while (System.currentTimeMillis() - start < DURATION)
       {
-         os.write(message);
+         os.write(request);
+         sentRequests ++;
       }
-      os.write(lastMessage());
-      
-      assertTrue("did not receive all responses", latch.await(4 * DURATION, MILLISECONDS));
-      long periodInMs = System.currentTimeMillis() - start;
-      
+
+      while (receivedBytes.get() / RESPONSE_SIZE < sentRequests) {
+          Thread.yield();
+      }
+
+      is.close();
+      os.close();
       clientSocket.close();
 
-      display(receivedCount.longValue(), periodInMs);
+      long periodInMs = System.currentTimeMillis() - start;
+
+      display(sentRequests, receivedBytes.get() / RESPONSE_SIZE, periodInMs);
    }
 
-   private void startBlockingMINAClient(SocketAddress address)
+   private void startBlockingMINAClient(SocketConnector client, int port)
    {
-      NioSocketConnector client = newConfiguredConnector();
-      
-      final AtomicLong receivedCount = new AtomicLong(0);
+      configureConnector(client);
+      InetSocketAddress address = new InetSocketAddress(SERVER_HOSTNAME, port);
+      final IoBuffer request = IoBuffer.wrap(createMessage());
+      final AtomicBoolean shutdown = new AtomicBoolean();
       client.setHandler(new IoHandlerAdapter()
       {
          @Override
+         public void sessionOpened(IoSession session) {
+            // Send the first message
+            session.write(request.duplicate());
+         }
+         @Override
          public void messageReceived(IoSession session, Object message)
                throws Exception
          {
-            receivedCount.incrementAndGet();
+            if (!shutdown.get())
+            {
+               int sentRequests = (int) (session.getWrittenBytes() / REQUEST_SIZE);
+               int receivedResponses = (int) (session.getReadBytes() / RESPONSE_SIZE);
+               if (receivedResponses != 0 && receivedResponses == sentRequests) {
+                   session.write(request.duplicate());
+               }
+            }
          }
       });
       ConnectFuture future = client.connect(address).awaitUninterruptibly();
       IoSession session = future.getSession();
 
-      IoBuffer buffer = IoBuffer.allocate(MESSAGE_SIZE);
-      buffer.put(createMessage());
-      buffer.flip();
-
-      long sentCount = 0;
       long start = System.currentTimeMillis();
-      while (System.currentTimeMillis() - start < DURATION)
-      {
-         session.write(buffer.duplicate());
-         sentCount++;
-         ReadFuture readFuture = session.read();
-         readFuture.awaitUninterruptibly();
-         readFuture.getMessage();
+      try {
+         Thread.sleep(DURATION);
+      } catch (InterruptedException e) {
+         e.printStackTrace();
       }
-      session.close().awaitUninterruptibly(DURATION, MILLISECONDS);
+      shutdown.set(true);
 
-      assertEquals(sentCount, receivedCount.longValue());
+      while (client.getReadBytes() / RESPONSE_SIZE < (client.getWrittenBytes() + client.getScheduledWriteBytes()) / REQUEST_SIZE) {
+          Thread.yield();
+      }
+
+      session.closeOnFlush().awaitUninterruptibly();
       long periodInMs = System.currentTimeMillis() - start;
-      display(receivedCount.longValue(), periodInMs);
+      display(client.getWrittenBytes() / REQUEST_SIZE, client.getReadBytes() / RESPONSE_SIZE, periodInMs);
 
       client.dispose();
    }
 
-   private void startNonBlockingMINAClient(SocketAddress address)
-         throws InterruptedException
+   private void startNonBlockingMINAClient(SocketConnector client, int port)
    {
-      NioSocketConnector client = newConfiguredConnector();
+       configureConnector(client);
+       InetSocketAddress address = new InetSocketAddress(SERVER_HOSTNAME, port);
+       final IoBuffer request = IoBuffer.wrap(createMessage());
+       final AtomicBoolean shutdown = new AtomicBoolean();
+       final int maxScheduledWriteBytes;
+       if (TCP_BUFFER_SIZE != -1)
+       {
+          maxScheduledWriteBytes = TCP_BUFFER_SIZE * 4;
+       }
+       else
+       {
+          maxScheduledWriteBytes = client.getSessionConfig().getSendBufferSize() * 4;
+       }
 
-      final AtomicLong receivedCount = new AtomicLong(0);
-      final CountDownLatch latch = new CountDownLatch(1);
+       client.setHandler(new IoHandlerAdapter()
+       {
+          @Override
+          public void sessionOpened(IoSession session) {
+             // Push the requests in advance.
+             do {
+                 session.write(request.duplicate());
+             } while (session.getScheduledWriteBytes() < maxScheduledWriteBytes);
+          }
 
-      client.setHandler(new IoHandlerAdapter()
-      {
-         byte[] b = new byte[RESPONSE_SIZE];
-         
-         @Override
-         public void messageReceived(IoSession session, Object message)
-               throws Exception
-         {
-            IoBuffer buffer = (IoBuffer) message;
-            buffer.get(b);
-            if (b[0] == 99)
-            {
-               latch.countDown();
-            }
-         }
-      });
-      ConnectFuture future = client.connect(address).awaitUninterruptibly();
-      IoSession session = future.getSession();
+          @Override
+          public void messageSent(IoSession session, Object message) {
+             if (!shutdown.get()) {
+                do {
+                   session.write(request.duplicate());
+                } while (session.getScheduledWriteBytes() < maxScheduledWriteBytes);
+             }
+          }
+       });
+       ConnectFuture future = client.connect(address).awaitUninterruptibly();
+       IoSession session = future.getSession();
 
-      IoBuffer buffer = IoBuffer.allocate(MESSAGE_SIZE);
-      buffer.put(createMessage());
-      buffer.flip();
+       long start = System.currentTimeMillis();
+       try {
+          Thread.sleep(DURATION);
+       } catch (InterruptedException e) {
+          e.printStackTrace();
+       }
+       shutdown.set(true);
 
-      long sentCount = 0;
-      long start = System.currentTimeMillis();
-      while (System.currentTimeMillis() - start < DURATION)
-      {
-         session.write(buffer.duplicate());
-         sentCount++;
-      }
+       while (client.getReadBytes() / RESPONSE_SIZE < (client.getWrittenBytes() + client.getScheduledWriteBytes()) / REQUEST_SIZE) {
+           Thread.yield();
+       }
 
-      buffer = IoBuffer.allocate(MESSAGE_SIZE);
-      buffer.put(lastMessage());
-      buffer.flip();
-      session.write(buffer).awaitUninterruptibly(DURATION, MILLISECONDS);
-      sentCount++;
+       session.closeOnFlush().awaitUninterruptibly();
+       long periodInMs = System.currentTimeMillis() - start;
+       display(client.getWrittenBytes() / REQUEST_SIZE, client.getReadBytes() / RESPONSE_SIZE, periodInMs);
 
-      boolean receivedLastResponse = latch.await(4 * DURATION, MILLISECONDS);
-      if (!receivedLastResponse)
-      {
-         fail("received " + receivedCount.longValue() + " responses on " + sentCount + "expected");
-      }
-      session.close().awaitUninterruptibly(DURATION, MILLISECONDS);
-
-      assertEquals(sentCount, receivedCount.longValue());
-      long periodInMs = System.currentTimeMillis() - start;
-      display(receivedCount.longValue(), periodInMs);
-
-      client.dispose();
+       client.dispose();
    }
 
-   private void display(long count, long periodInMs)
+   private void display(long requestCount, long responseCount, long periodInMs)
    {
       String name = getName().replace("test_", "").replace('_', ' ');
-      double rate = 1000 * (double) count / periodInMs;
-      System.out.format("%-36s: %6.0f inv./s (%d inv. in %dms)\n", name, rate, count, periodInMs);
+      double rate = 1000 * (double) responseCount / periodInMs;
+      System.out.format("%41s: %6.0f inv./s (%8d inv. in %d ms, sent %8d)\n", name, rate, responseCount, periodInMs, requestCount);
    }
 
    // Inner classes -------------------------------------------------




More information about the jboss-cvs-commits mailing list