[jboss-cvs] JBoss Messaging SVN: r3912 - in projects/network-benchmark: lib and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Mar 22 07:43:10 EDT 2008
Author: trustin
Date: 2008-03-22 07:43:10 -0400 (Sat, 22 Mar 2008)
New Revision: 3912
Added:
projects/network-benchmark/src/network/AllInOneServer.java
projects/network-benchmark/src/network/ClientSetting.java
projects/network-benchmark/src/network/CommonSetting.java
projects/network-benchmark/src/network/MINAAPRServer.java
projects/network-benchmark/src/network/MINANIOServer.java
Removed:
projects/network-benchmark/lib/mina-core-2.0.0-M1.jar
projects/network-benchmark/lib/mina-core-2.0.0-M2-20080317.150334-8.jar
Modified:
projects/network-benchmark/.classpath
projects/network-benchmark/src/network/BIOServer.java
projects/network-benchmark/src/network/MINAServer.java
projects/network-benchmark/src/network/NIOServer.java
projects/network-benchmark/src/network/NetworkClientTest.java
Log:
* Extracted configurable parameters into CommonSetting and ClientSetting
* Made sure request count and response count match exactly
* Added support for APR transport
* Fixed non-blocking MINA client
* Added AllInOneServer which launcher all four servers at once
Modified: projects/network-benchmark/.classpath
===================================================================
--- projects/network-benchmark/.classpath 2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/.classpath 2008-03-22 11:43:10 UTC (rev 3912)
@@ -6,7 +6,9 @@
<classpathentry kind="lib" path="lib/log4j.jar"/>
<classpathentry kind="lib" path="lib/slf4j-api-1.4.3.jar"/>
<classpathentry kind="lib" path="lib/slf4j-log4j12.jar"/>
- <classpathentry kind="lib" path="lib/mina-core-2.0.0-M2-20080317.150334-8.jar"/>
<classpathentry kind="lib" path="lib/junit.jar"/>
+ <classpathentry kind="lib" path="lib/mina-core-2.0.0-M2-SNAPSHOT.jar"/>
+ <classpathentry kind="lib" path="lib/mina-transport-apr-2.0.0-M2-SNAPSHOT.jar"/>
+ <classpathentry kind="lib" path="lib/tomcat-apr-5.5.15.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Deleted: projects/network-benchmark/lib/mina-core-2.0.0-M1.jar
===================================================================
(Binary files differ)
Deleted: projects/network-benchmark/lib/mina-core-2.0.0-M2-20080317.150334-8.jar
===================================================================
(Binary files differ)
Added: projects/network-benchmark/src/network/AllInOneServer.java
===================================================================
--- projects/network-benchmark/src/network/AllInOneServer.java (rev 0)
+++ projects/network-benchmark/src/network/AllInOneServer.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,31 @@
+package network;
+
+public class AllInOneServer {
+ public static void main(String[] args) throws Exception {
+ CommonSetting.print();
+ System.out.println();
+
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ BIOServer.main(new String[0]);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ NIOServer.main(new String[0]);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ new Thread(new MINANIOServer()).start();
+ new Thread(new MINAAPRServer()).start();
+ }
+}
Property changes on: projects/network-benchmark/src/network/AllInOneServer.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Modified: projects/network-benchmark/src/network/BIOServer.java
===================================================================
--- projects/network-benchmark/src/network/BIOServer.java 2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/src/network/BIOServer.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -1,8 +1,14 @@
package network;
+
+import static network.CommonSetting.*;
+
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
public class BIOServer
{
@@ -19,42 +25,96 @@
public static void main(String[] args) throws Exception
{
ServerSocket serverSocket = new ServerSocket();
- if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
- serverSocket.setReceiveBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
- serverSocket.bind(NetworkClientTest.BIO_ADDRESS);
- System.out
- .println("BIO Server bound to " + NetworkClientTest.BIO_ADDRESS);
+ if (TCP_BUFFER_SIZE != -1)
+ {
+ serverSocket.setReceiveBufferSize(TCP_BUFFER_SIZE);
+ }
+ serverSocket.bind(new InetSocketAddress(BIO_SERVER_PORT));
+ System.out.println(
+ "BIO Server bound to port " + BIO_SERVER_PORT);
- byte[] bytes = new byte[NetworkClientTest.MESSAGE_SIZE];
-
+ // Create the executor and start one worker thread in advance.
+ Executor executor = Executors.newCachedThreadPool();
+ executor.execute(new Runnable() { public void run() {} });
+
while (true)
{
Socket clientSocket = serverSocket.accept();
- if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
- clientSocket.setSendBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
- System.out.println("new client...");
- OutputStream os = clientSocket.getOutputStream();
+ executor.execute(new Worker(clientSocket));
+ }
+ }
- InputStream is = clientSocket.getInputStream();
+ // Package protected ---------------------------------------------
- while (is.read(bytes) != -1)
- {
- os.write(bytes, 0, NetworkClientTest.RESPONSE_SIZE);
- }
+ // Protected -----------------------------------------------------
- os.close();
+ // Private -------------------------------------------------------
- is.close();
+ // Inner classes -------------------------------------------------
- clientSocket.close();
+ private static class Worker implements Runnable
+ {
+ private final Socket s;
+
+ private Worker(Socket s)
+ {
+ this.s = s;
}
- }
- // Package protected ---------------------------------------------
+ public void run()
+ {
+ byte[] response = new byte[RESPONSE_SIZE];
+ for (int i = 0; i < RESPONSE_SIZE; i ++) {
+ response[i] = (byte) ((i % 10) + '0');
+ }
- // Protected -----------------------------------------------------
+ try
+ {
+ int bufferSize;
+ s.setTcpNoDelay(ENABLE_TCP_NO_DELAY);
+ if (TCP_BUFFER_SIZE != -1)
+ {
+ bufferSize = TCP_BUFFER_SIZE;
+ s.setSendBufferSize(TCP_BUFFER_SIZE);
+ }
+ else
+ {
+ bufferSize = s.getReceiveBufferSize();
+ }
+ byte[] bytes = new byte[bufferSize];
+ System.out.println("new client...");
+ OutputStream os = s.getOutputStream();
+ InputStream is = s.getInputStream();
- // Private -------------------------------------------------------
+ long readBytes = 0;
+ int sentResponses = 0;
- // Inner classes -------------------------------------------------
+ for (;;)
+ {
+ int localReadBytes = is.read(bytes);
+ if (localReadBytes < 0)
+ {
+ break;
+ }
+
+ readBytes += localReadBytes;
+
+ int receivedRequests = (int) (readBytes / REQUEST_SIZE);
+ for (int i = receivedRequests - sentResponses; i > 0; i --)
+ {
+ os.write(response, 0, RESPONSE_SIZE);
+ sentResponses ++;
+ }
+ }
+
+ os.close();
+ is.close();
+ s.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
}
Added: projects/network-benchmark/src/network/ClientSetting.java
===================================================================
--- projects/network-benchmark/src/network/ClientSetting.java (rev 0)
+++ projects/network-benchmark/src/network/ClientSetting.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,14 @@
+package network;
+
+
+
+public class ClientSetting {
+ public static final String SERVER_HOSTNAME = "127.0.0.1";
+ public static final long DURATION = 10000; // in ms
+
+ public static void print()
+ {
+ System.out.println("Host: " + SERVER_HOSTNAME);
+ System.out.println("Duration: " + DURATION + " ms");
+ }
+}
Property changes on: projects/network-benchmark/src/network/ClientSetting.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: projects/network-benchmark/src/network/CommonSetting.java
===================================================================
--- projects/network-benchmark/src/network/CommonSetting.java (rev 0)
+++ projects/network-benchmark/src/network/CommonSetting.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,34 @@
+package network;
+
+import java.net.Socket;
+import java.net.SocketException;
+
+
+public class CommonSetting {
+ public static final int REQUEST_SIZE = 1024; // in bytes
+ public static final int RESPONSE_SIZE = 24; // in bytes
+ public static final boolean ENABLE_TCP_NO_DELAY = true;
+ public static final int TCP_BUFFER_SIZE = -1; // -1 to use the O/S defaults
+ public static final int BIO_SERVER_PORT = 5555;
+ public static final int NIO_SERVER_PORT = 6666;
+ public static final int MINA_NIO_SERVER_PORT= 7777;
+ public static final int MINA_APR_SERVER_PORT= 8888;
+
+ public static void print()
+ {
+ System.out.println("Request size: " + REQUEST_SIZE + " bytes");
+ System.out.println("Response size: " + RESPONSE_SIZE + " bytes");
+ System.out.println("TCP no delay: " + ENABLE_TCP_NO_DELAY);
+ try
+ {
+ System.out.format("TCP send buffer size: %d (default: %d)\n",
+ TCP_BUFFER_SIZE, (new Socket()).getSendBufferSize());
+ System.out.format("TCP receive buffer size: %d (default: %d)\n",
+ TCP_BUFFER_SIZE, (new Socket()).getReceiveBufferSize());
+ }
+ catch (SocketException e)
+ {
+ e.printStackTrace();
+ }
+ }
+}
Property changes on: projects/network-benchmark/src/network/CommonSetting.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Added: projects/network-benchmark/src/network/MINAAPRServer.java
===================================================================
--- projects/network-benchmark/src/network/MINAAPRServer.java (rev 0)
+++ projects/network-benchmark/src/network/MINAAPRServer.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,26 @@
+package network;
+
+import static network.CommonSetting.*;
+
+import org.apache.mina.transport.socket.SocketAcceptor;
+import org.apache.mina.transport.socket.apr.AprSocketAcceptor;
+
+public class MINAAPRServer extends MINAServer
+{
+ @Override
+ protected SocketAcceptor newAcceptor()
+ {
+ return new AprSocketAcceptor();
+ }
+
+ @Override
+ protected int getPort()
+ {
+ return MINA_APR_SERVER_PORT;
+ }
+
+ public static void main(String[] args)
+ {
+ new MINAAPRServer().run();
+ }
+}
Property changes on: projects/network-benchmark/src/network/MINAAPRServer.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Copied: projects/network-benchmark/src/network/MINANIOServer.java (from rev 3910, projects/network-benchmark/src/network/MINAServer.java)
===================================================================
--- projects/network-benchmark/src/network/MINANIOServer.java (rev 0)
+++ projects/network-benchmark/src/network/MINANIOServer.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -0,0 +1,26 @@
+package network;
+
+import static network.CommonSetting.*;
+
+import org.apache.mina.transport.socket.SocketAcceptor;
+import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+
+public class MINANIOServer extends MINAServer
+{
+ @Override
+ protected SocketAcceptor newAcceptor()
+ {
+ return new NioSocketAcceptor();
+ }
+
+ @Override
+ protected int getPort()
+ {
+ return MINA_NIO_SERVER_PORT;
+ }
+
+ public static void main(String[] args)
+ {
+ new MINANIOServer().run();
+ }
+}
Modified: projects/network-benchmark/src/network/MINAServer.java
===================================================================
--- projects/network-benchmark/src/network/MINAServer.java 2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/src/network/MINAServer.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -1,58 +1,69 @@
package network;
+import static network.CommonSetting.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.apache.mina.transport.socket.SocketAcceptor;
-public class MINAServer
+public abstract class MINAServer implements Runnable
{
- // Constants -----------------------------------------------------
+ protected abstract SocketAcceptor newAcceptor();
+ protected abstract int getPort();
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public static void main(String[] args) throws Exception
+ public final void run()
{
- final NioSocketAcceptor server = new NioSocketAcceptor();
- if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
+ final SocketAcceptor server = newAcceptor();
+ if (TCP_BUFFER_SIZE != -1)
{
- server.getSessionConfig().setSendBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
- server.getSessionConfig().setReceiveBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
+ server.getSessionConfig().setSendBufferSize(TCP_BUFFER_SIZE);
+ server.getSessionConfig().setReceiveBufferSize(TCP_BUFFER_SIZE);
}
- server.setCloseOnDeactivation(false);
+ server.getSessionConfig().setTcpNoDelay(ENABLE_TCP_NO_DELAY);
+
+ final IoBuffer response = IoBuffer.allocate(RESPONSE_SIZE);
+ for (int i = 0; i < RESPONSE_SIZE; i ++) {
+ response.put(i, (byte) ((i % 10) + '0'));
+ }
+
server.setHandler(new IoHandlerAdapter()
{
+ @Override
public void messageReceived(IoSession session, Object message)
throws Exception
{
- IoBuffer buffer = (IoBuffer) message;
- IoBuffer response = buffer.getSlice(NetworkClientTest.RESPONSE_SIZE);
- session.write(response);
+
+ int receivedRequests = (int) (session.getReadBytes() / REQUEST_SIZE);
+ int sentResponses = (int) ((session.getWrittenBytes() + session.getScheduledWriteBytes()) / RESPONSE_SIZE);
+ for (int i = receivedRequests - sentResponses; i > 0; i --)
+ {
+ session.write(response.duplicate());
+ }
}
@Override
- public void sessionCreated(IoSession session) throws Exception
+ public void sessionOpened(IoSession session) throws Exception
{
System.out.println("new client...");
}
+
+ @Override
+ public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+ {
+ cause.printStackTrace();
+ }
});
- server.bind(NetworkClientTest.MINA_ADDRESS);
- System.out.println("MINA Server bound to "
- + NetworkClientTest.MINA_ADDRESS);
-
+ try {
+ server.bind(new InetSocketAddress(getPort()));
+ System.out.println(
+ "MINA " + server.getTransportMetadata().getProviderName() +
+ " Server bound to port " + getPort());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
}
Modified: projects/network-benchmark/src/network/NIOServer.java
===================================================================
--- projects/network-benchmark/src/network/NIOServer.java 2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/src/network/NIOServer.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -1,11 +1,17 @@
package network;
+import static network.CommonSetting.*;
+
+import java.net.InetSocketAddress;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.Set;
public class NIOServer
@@ -24,19 +30,24 @@
{
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
- if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
- server.socket().setReceiveBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
- server.socket().bind(NetworkClientTest.NIO_ADDRESS);
+ if (TCP_BUFFER_SIZE != -1)
+ {
+ server.socket().setReceiveBufferSize(TCP_BUFFER_SIZE);
+ }
+ server.socket().bind(new InetSocketAddress(NIO_SERVER_PORT));
- System.out
- .println("NIO Server bound to " + NetworkClientTest.NIO_ADDRESS);
+ System.out.println(
+ "NIO Server bound to port " + NIO_SERVER_PORT);
Selector selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
- ByteBuffer buf = ByteBuffer.allocate(NetworkClientTest.MESSAGE_SIZE - NetworkClientTest.RESPONSE_SIZE);
- ByteBuffer response = ByteBuffer.allocate(NetworkClientTest.RESPONSE_SIZE);
+ ByteBuffer response = ByteBuffer.allocate(RESPONSE_SIZE);
+ for (int i = 0; i < RESPONSE_SIZE; i ++) {
+ response.put(i, (byte) ((i % 10) + '0'));
+ }
+
// Wait for something of interest to happen
while (selector.select() > 0)
{
@@ -47,7 +58,6 @@
// Walk through set
while (readyItor.hasNext())
{
-
// Get key from set
SelectionKey key = readyItor.next();
@@ -57,38 +67,71 @@
if (key.isAcceptable())
{
// Get channel
- ServerSocketChannel keyChannel = (ServerSocketChannel) key
- .channel();
+ ServerSocketChannel keyChannel = (ServerSocketChannel) key.channel();
// Accept request
SocketChannel socket = keyChannel.accept();
- if (NetworkClientTest.TCP_BUFFER_SIZE != -1)
- socket.socket().setSendBufferSize(NetworkClientTest.TCP_BUFFER_SIZE);
+ socket.socket().setTcpNoDelay(ENABLE_TCP_NO_DELAY);
+ if (TCP_BUFFER_SIZE != -1) {
+ socket.socket().setSendBufferSize(TCP_BUFFER_SIZE);
+ }
System.out.println("new client...");
socket.configureBlocking(false);
- socket.register(selector, SelectionKey.OP_READ);
- } else if (key.isReadable())
+ SelectionKey clientKey = socket.register(selector, SelectionKey.OP_READ);
+ clientKey.attach(new Context(socket));
+ continue;
+ }
+
+ SocketChannel channel = (SocketChannel) key.channel();
+ Context ctx = (Context) key.attachment();
+ if (key.isReadable())
{
- response.clear();
- buf.clear();
- SocketChannel channel = (SocketChannel) key.channel();
- long readBytes = channel.read(new ByteBuffer[] {buf, response});
- if (readBytes == 0)
+ ByteBuffer readBuf = ctx.readBuffer;
+ readBuf.clear();
+ int localReadBytes = channel.read(readBuf);
+ if (localReadBytes < 0)
{
- System.err.println("READ BUFFER UNDERRUN");
- } else if (readBytes < 0)
- {
channel.close();
key.cancel();
- // server.close();
- // selector.close();
- } else
+ }
+ else
{
- response.flip();
- if (channel.write(response) == 0)
+ ctx.readBytes += localReadBytes;
+ }
+
+ int receivedRequests = (int) (ctx.readBytes / REQUEST_SIZE);
+ int sentResponses = (int) (ctx.writtenBytes / RESPONSE_SIZE) + ctx.writeBufferQueue.size();
+ for (int i = receivedRequests - sentResponses; i > 0; i --)
+ {
+ ctx.writeBufferQueue.offer(response.duplicate());
+ }
+ }
+
+ if (key.isValid() && (key.isWritable() || (key.interestOps() == SelectionKey.OP_READ && !ctx.writeBufferQueue.isEmpty())))
+ {
+ for (;;) {
+ ByteBuffer writeBuf = ctx.writeBufferQueue.peek();
+ if (writeBuf == null) {
+ if (key.interestOps() != SelectionKey.OP_READ)
+ {
+ key.interestOps(SelectionKey.OP_READ);
+ }
+ break;
+ }
+
+ ctx.writtenBytes += channel.write(writeBuf);
+ if (writeBuf.hasRemaining())
{
- System.err.println("WRITE BUFFER FULL");
+ if (key.interestOps() == SelectionKey.OP_READ)
+ {
+ key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ }
+ break;
}
+ else
+ {
+ ctx.writeBufferQueue.poll();
+ }
}
}
}
@@ -102,4 +145,15 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
+ private static class Context {
+ private final ByteBuffer readBuffer;
+ private final Queue<ByteBuffer> writeBufferQueue = new LinkedList<ByteBuffer>();
+ private long readBytes;
+ private long writtenBytes;
+
+ private Context(SocketChannel ch) throws SocketException {
+ readBuffer = ByteBuffer.allocate(ch.socket().getReceiveBufferSize());
+ }
+ }
}
Modified: projects/network-benchmark/src/network/NetworkClientTest.java
===================================================================
--- projects/network-benchmark/src/network/NetworkClientTest.java 2008-03-22 10:52:42 UTC (rev 3911)
+++ projects/network-benchmark/src/network/NetworkClientTest.java 2008-03-22 11:43:10 UTC (rev 3912)
@@ -1,6 +1,7 @@
package network;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static network.ClientSetting.*;
+import static network.CommonSetting.*;
import java.io.IOException;
import java.io.InputStream;
@@ -10,7 +11,7 @@
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
@@ -19,114 +20,145 @@
import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.ReadFuture;
+import org.apache.mina.transport.socket.SocketConnector;
+import org.apache.mina.transport.socket.apr.AprSocketConnector;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class NetworkClientTest extends TestCase
{
- // Configurable properties:
- private static final String SERVER_HOST = "192.168.0.4";
- private static final long DURATION = 10000; // in ms
- public static final int MESSAGE_SIZE = 1024; // in bytes
- public static final int RESPONSE_SIZE = 24; // in bytes
- private static final boolean ENABLE_TCP_NO_DELAY = false;
- static final int TCP_BUFFER_SIZE = -1; // -1 to not set it
-
- // Constants -----------------------------------------------------
-
- public static final SocketAddress BIO_ADDRESS = new InetSocketAddress(
- SERVER_HOST, 5555);
- public static final SocketAddress NIO_ADDRESS = new InetSocketAddress(
- SERVER_HOST, 6666);
- public static final SocketAddress MINA_ADDRESS = new InetSocketAddress(
- SERVER_HOST, 7777);
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
static {
- System.out.println("Duration: " + DURATION + " ms");
- System.out.println("Message size: " + MESSAGE_SIZE + " bytes");
- System.out.println("Response size: " + RESPONSE_SIZE + " bytes");
- System.out.println("TCP no delay: " + ENABLE_TCP_NO_DELAY);
- try
- {
- System.out.format("TCP send buffer size: %d (default: %d)\n",
- TCP_BUFFER_SIZE, (new Socket()).getSendBufferSize());
- System.out.format("TCP receive buffer size: %d (default: %d)\n",
- TCP_BUFFER_SIZE, (new Socket()).getReceiveBufferSize());
- } catch (SocketException e)
- {
- e.printStackTrace();
- }
+ ClientSetting.print();
+ CommonSetting.print();
System.out.println();
}
// Constructors --------------------------------------------------
- // Public --------------------------------------------------------
-
+ // Public --------------------------------------------------------
+
public void test_Blocking_BIOClient_To_BIOServer() throws Exception
{
- startBlockingBioClient(BIO_ADDRESS);
+ startBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, BIO_SERVER_PORT));
}
public void test_NonBlocking_BIOClient_To_BIOServer() throws Exception
{
- startNonBlockingBioClient(BIO_ADDRESS);
+ startNonBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, BIO_SERVER_PORT));
}
public void test_Blocking_BIOClient_To_NIOServer() throws Exception
{
- startBlockingBioClient(NIO_ADDRESS);
+ startBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, NIO_SERVER_PORT));
}
public void test_NonBlocking_BIOClient_To_NIOServer() throws Exception
{
- startNonBlockingBioClient(NIO_ADDRESS);
+ startNonBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, NIO_SERVER_PORT));
}
- public void test_Blocking_BIOClient_To_MINAServer() throws Exception
+ public void test_Blocking_BIOClient_To_MINANIOServer() throws Exception
{
- startBlockingBioClient(MINA_ADDRESS);
+ startBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, MINA_NIO_SERVER_PORT));
}
- public void test_NonBlocking_BIOClient_To_MINAServer() throws Exception
+ public void test_NonBlocking_BIOClient_To_MINANIOServer() throws Exception
{
- startNonBlockingBioClient(MINA_ADDRESS);
+ startNonBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, MINA_NIO_SERVER_PORT));
}
- public void test_Blocking_MINAClient_To_BIOServer() throws Exception
+ public void test_Blocking_BIOClient_To_MINAAPRServer() throws Exception
{
- startBlockingMINAClient(BIO_ADDRESS);
+ startBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, MINA_APR_SERVER_PORT));
}
- public void _test_NonBlocking_MINAClient_To_BIOServer() throws Exception
+ public void test_NonBlocking_BIOClient_To_MINAAPRServer() throws Exception
{
- startNonBlockingMINAClient(BIO_ADDRESS);
+ startNonBlockingBioClient(new InetSocketAddress(SERVER_HOSTNAME, MINA_APR_SERVER_PORT));
}
- public void test_Blocking_MINAClient_To_NIOServer() throws Exception
+ public void test_Blocking_MINANIOClient_To_BIOServer() throws Exception
{
- startBlockingMINAClient(NIO_ADDRESS);
+ startBlockingMINAClient(new NioSocketConnector(), BIO_SERVER_PORT);
}
- public void _test_NonBlocking_MINAClient_To_NIOServer() throws Exception
+ public void test_NonBlocking_MINANIOClient_To_BIOServer() throws Exception
{
- startNonBlockingMINAClient(NIO_ADDRESS);
+ startNonBlockingMINAClient(new NioSocketConnector(), BIO_SERVER_PORT);
}
- public void test_Blocking_MINAClient_To_MINAServer() throws Exception
+ public void test_Blocking_MINAAPRClient_To_BIOServer() throws Exception
{
- startBlockingMINAClient(MINA_ADDRESS);
+ startBlockingMINAClient(new AprSocketConnector(), BIO_SERVER_PORT);
}
- public void _test_NonBlocking_MINAClient_To_MINAServer() throws Exception
+ public void test_NonBlocking_MINAAPRClient_To_BIOServer() throws Exception
{
- startNonBlockingMINAClient(MINA_ADDRESS);
+ startNonBlockingMINAClient(new AprSocketConnector(), BIO_SERVER_PORT);
}
+ public void test_Blocking_MINANIOClient_To_NIOServer() throws Exception
+ {
+ startBlockingMINAClient(new NioSocketConnector(), NIO_SERVER_PORT);
+ }
+
+ public void test_NonBlocking_MINANIOClient_To_NIOServer() throws Exception
+ {
+ startNonBlockingMINAClient(new NioSocketConnector(), NIO_SERVER_PORT);
+ }
+
+ public void test_Blocking_MINAAPRClient_To_NIOServer() throws Exception
+ {
+ startBlockingMINAClient(new AprSocketConnector(), NIO_SERVER_PORT);
+ }
+
+ public void test_NonBlocking_MINAAPRClient_To_NIOServer() throws Exception
+ {
+ startNonBlockingMINAClient(new AprSocketConnector(), NIO_SERVER_PORT);
+ }
+
+ public void test_Blocking_MINANIOClient_To_MINANIOServer() throws Exception
+ {
+ startBlockingMINAClient(new NioSocketConnector(), MINA_NIO_SERVER_PORT);
+ }
+
+ public void test_NonBlocking_MINANIOClient_To_MINANIOServer() throws Exception
+ {
+ startNonBlockingMINAClient(new NioSocketConnector(), MINA_NIO_SERVER_PORT);
+ }
+
+ public void test_Blocking_MINANIOClient_To_MINAAPRServer() throws Exception
+ {
+ startBlockingMINAClient(new NioSocketConnector(), MINA_APR_SERVER_PORT);
+ }
+
+ public void test_NonBlocking_MINANIOClient_To_MINAAPRServer() throws Exception
+ {
+ startNonBlockingMINAClient(new NioSocketConnector(), MINA_APR_SERVER_PORT);
+ }
+
+ public void test_Blocking_MINAAPRClient_To_MINANIOServer() throws Exception
+ {
+ startBlockingMINAClient(new AprSocketConnector(), MINA_NIO_SERVER_PORT);
+ }
+
+ public void test_NonBlocking_MINAAPRClient_To_MINANIOServer() throws Exception
+ {
+ startNonBlockingMINAClient(new AprSocketConnector(), MINA_NIO_SERVER_PORT);
+ }
+
+ public void test_Blocking_MINAAPRClient_To_MINAAPRServer() throws Exception
+ {
+ startBlockingMINAClient(new AprSocketConnector(), MINA_APR_SERVER_PORT);
+ }
+
+ public void test_NonBlocking_MINAAprClient_To_MINAAPRServer() throws Exception
+ {
+ startNonBlockingMINAClient(new AprSocketConnector(), MINA_APR_SERVER_PORT);
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -147,67 +179,64 @@
clientSocket.connect(address);
return clientSocket;
}
-
- private static NioSocketConnector newConfiguredConnector()
+
+ private static void configureConnector(SocketConnector connector)
{
- NioSocketConnector client = new NioSocketConnector();
- client.getSessionConfig().setTcpNoDelay(ENABLE_TCP_NO_DELAY);
+ connector.getSessionConfig().setTcpNoDelay(ENABLE_TCP_NO_DELAY);
if (TCP_BUFFER_SIZE != -1)
{
- client.getSessionConfig().setSendBufferSize(TCP_BUFFER_SIZE);
- client.getSessionConfig().setReceiveBufferSize(TCP_BUFFER_SIZE);
+ connector.getSessionConfig().setSendBufferSize(TCP_BUFFER_SIZE);
+ connector.getSessionConfig().setReceiveBufferSize(TCP_BUFFER_SIZE);
}
- client.getSessionConfig().setReuseAddress(false);
- client.getSessionConfig().setUseReadOperation(true);
- return client;
}
private byte[] createMessage()
{
- byte[] b = new byte[MESSAGE_SIZE];
+ byte[] b = new byte[REQUEST_SIZE];
for (int i = 0; i < b.length; i++)
{
- b[i] = 66;
+ b[i] = (byte) ((i % 10) + '0');
}
return b;
}
-
- private byte[] lastMessage()
- {
- byte[] b = new byte[MESSAGE_SIZE];
- for (int i = 0; i < b.length; i++)
- {
- b[i] = 99;
- }
- return b;
- }
-
+
private void startBlockingBioClient(SocketAddress address)
throws UnknownHostException, IOException
{
Socket clientSocket = newConfiguredSocket(address);
OutputStream os = clientSocket.getOutputStream();
InputStream is = clientSocket.getInputStream();
-
+
long start = System.currentTimeMillis();
int count = 0;
byte[] message = createMessage();
byte[] response = new byte[RESPONSE_SIZE];
+
+ outerLoop:
while (System.currentTimeMillis() - start < DURATION)
{
os.write(message);
-
- int size = is.read(response);
- if (size != RESPONSE_SIZE)
- {
- throw new IllegalStateException("Wrong message size");
+ int readBytes = 0;
+ for (;;) {
+ int localReadBytes = is.read(response, readBytes, response.length - readBytes);
+ if (localReadBytes < 0) {
+ System.out.println("Connection closed by server.");
+ break outerLoop;
+ }
+ readBytes += localReadBytes;
+ if (readBytes == RESPONSE_SIZE) {
+ count ++;
+ break;
+ }
}
- count++;
}
+
+ is.close();
+ os.close();
clientSocket.close();
long periodInMs = System.currentTimeMillis() - start;
- display(count, periodInMs);
+ display(count, count, periodInMs);
}
private void startNonBlockingBioClient(SocketAddress address)
@@ -216,164 +245,175 @@
final Socket clientSocket = newConfiguredSocket(address);
OutputStream os = clientSocket.getOutputStream();
final InputStream is = clientSocket.getInputStream();
-
- final AtomicLong receivedCount = new AtomicLong(0);
- final CountDownLatch latch = new CountDownLatch(1);
+
+ final AtomicLong receivedBytes = new AtomicLong(0);
Thread receiver = new Thread()
{
@Override
public void run()
{
- byte[] b = new byte[RESPONSE_SIZE];
-
- while (true)
+ try
{
- try
+ byte[] response = new byte[clientSocket.getReceiveBufferSize()];
+
+ while (true)
{
- int size = is.read(b);
- if (size != RESPONSE_SIZE)
- throw new IllegalStateException("Wrong size: " + size);
- if (b[0] == 99)
- {
- latch.countDown();
- return;
+ int readBytes = is.read(response);
+ if (readBytes < 0) {
+ System.out.println("Connection closed by server.");
+ break;
}
- if (b[0] != 66 && b[0] != 99)
- {
- throw new IllegalStateException("Wrong byte");
- }
- receivedCount.incrementAndGet();
- } catch (IOException e)
- {
+ receivedBytes.addAndGet(readBytes);
+ }
+ }
+ catch (SocketException e)
+ {
+ if (e.getMessage().indexOf("closed") < 0) {
e.printStackTrace();
- return;
}
}
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
}
};
receiver.start();
- byte[] message = createMessage();
+ byte[] request = createMessage();
long start = System.currentTimeMillis();
+ int sentRequests = 0;
while (System.currentTimeMillis() - start < DURATION)
{
- os.write(message);
+ os.write(request);
+ sentRequests ++;
}
- os.write(lastMessage());
-
- assertTrue("did not receive all responses", latch.await(4 * DURATION, MILLISECONDS));
- long periodInMs = System.currentTimeMillis() - start;
-
+
+ while (receivedBytes.get() / RESPONSE_SIZE < sentRequests) {
+ Thread.yield();
+ }
+
+ is.close();
+ os.close();
clientSocket.close();
- display(receivedCount.longValue(), periodInMs);
+ long periodInMs = System.currentTimeMillis() - start;
+
+ display(sentRequests, receivedBytes.get() / RESPONSE_SIZE, periodInMs);
}
- private void startBlockingMINAClient(SocketAddress address)
+ private void startBlockingMINAClient(SocketConnector client, int port)
{
- NioSocketConnector client = newConfiguredConnector();
-
- final AtomicLong receivedCount = new AtomicLong(0);
+ configureConnector(client);
+ InetSocketAddress address = new InetSocketAddress(SERVER_HOSTNAME, port);
+ final IoBuffer request = IoBuffer.wrap(createMessage());
+ final AtomicBoolean shutdown = new AtomicBoolean();
client.setHandler(new IoHandlerAdapter()
{
@Override
+ public void sessionOpened(IoSession session) {
+ // Send the first message
+ session.write(request.duplicate());
+ }
+ @Override
public void messageReceived(IoSession session, Object message)
throws Exception
{
- receivedCount.incrementAndGet();
+ if (!shutdown.get())
+ {
+ int sentRequests = (int) (session.getWrittenBytes() / REQUEST_SIZE);
+ int receivedResponses = (int) (session.getReadBytes() / RESPONSE_SIZE);
+ if (receivedResponses != 0 && receivedResponses == sentRequests) {
+ session.write(request.duplicate());
+ }
+ }
}
});
ConnectFuture future = client.connect(address).awaitUninterruptibly();
IoSession session = future.getSession();
- IoBuffer buffer = IoBuffer.allocate(MESSAGE_SIZE);
- buffer.put(createMessage());
- buffer.flip();
-
- long sentCount = 0;
long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < DURATION)
- {
- session.write(buffer.duplicate());
- sentCount++;
- ReadFuture readFuture = session.read();
- readFuture.awaitUninterruptibly();
- readFuture.getMessage();
+ try {
+ Thread.sleep(DURATION);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- session.close().awaitUninterruptibly(DURATION, MILLISECONDS);
+ shutdown.set(true);
- assertEquals(sentCount, receivedCount.longValue());
+ while (client.getReadBytes() / RESPONSE_SIZE < (client.getWrittenBytes() + client.getScheduledWriteBytes()) / REQUEST_SIZE) {
+ Thread.yield();
+ }
+
+ session.closeOnFlush().awaitUninterruptibly();
long periodInMs = System.currentTimeMillis() - start;
- display(receivedCount.longValue(), periodInMs);
+ display(client.getWrittenBytes() / REQUEST_SIZE, client.getReadBytes() / RESPONSE_SIZE, periodInMs);
client.dispose();
}
- private void startNonBlockingMINAClient(SocketAddress address)
- throws InterruptedException
+ private void startNonBlockingMINAClient(SocketConnector client, int port)
{
- NioSocketConnector client = newConfiguredConnector();
+ configureConnector(client);
+ InetSocketAddress address = new InetSocketAddress(SERVER_HOSTNAME, port);
+ final IoBuffer request = IoBuffer.wrap(createMessage());
+ final AtomicBoolean shutdown = new AtomicBoolean();
+ final int maxScheduledWriteBytes;
+ if (TCP_BUFFER_SIZE != -1)
+ {
+ maxScheduledWriteBytes = TCP_BUFFER_SIZE * 4;
+ }
+ else
+ {
+ maxScheduledWriteBytes = client.getSessionConfig().getSendBufferSize() * 4;
+ }
- final AtomicLong receivedCount = new AtomicLong(0);
- final CountDownLatch latch = new CountDownLatch(1);
+ client.setHandler(new IoHandlerAdapter()
+ {
+ @Override
+ public void sessionOpened(IoSession session) {
+ // Push the requests in advance.
+ do {
+ session.write(request.duplicate());
+ } while (session.getScheduledWriteBytes() < maxScheduledWriteBytes);
+ }
- client.setHandler(new IoHandlerAdapter()
- {
- byte[] b = new byte[RESPONSE_SIZE];
-
- @Override
- public void messageReceived(IoSession session, Object message)
- throws Exception
- {
- IoBuffer buffer = (IoBuffer) message;
- buffer.get(b);
- if (b[0] == 99)
- {
- latch.countDown();
- }
- }
- });
- ConnectFuture future = client.connect(address).awaitUninterruptibly();
- IoSession session = future.getSession();
+ @Override
+ public void messageSent(IoSession session, Object message) {
+ if (!shutdown.get()) {
+ do {
+ session.write(request.duplicate());
+ } while (session.getScheduledWriteBytes() < maxScheduledWriteBytes);
+ }
+ }
+ });
+ ConnectFuture future = client.connect(address).awaitUninterruptibly();
+ IoSession session = future.getSession();
- IoBuffer buffer = IoBuffer.allocate(MESSAGE_SIZE);
- buffer.put(createMessage());
- buffer.flip();
+ long start = System.currentTimeMillis();
+ try {
+ Thread.sleep(DURATION);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ shutdown.set(true);
- long sentCount = 0;
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < DURATION)
- {
- session.write(buffer.duplicate());
- sentCount++;
- }
+ while (client.getReadBytes() / RESPONSE_SIZE < (client.getWrittenBytes() + client.getScheduledWriteBytes()) / REQUEST_SIZE) {
+ Thread.yield();
+ }
- buffer = IoBuffer.allocate(MESSAGE_SIZE);
- buffer.put(lastMessage());
- buffer.flip();
- session.write(buffer).awaitUninterruptibly(DURATION, MILLISECONDS);
- sentCount++;
+ session.closeOnFlush().awaitUninterruptibly();
+ long periodInMs = System.currentTimeMillis() - start;
+ display(client.getWrittenBytes() / REQUEST_SIZE, client.getReadBytes() / RESPONSE_SIZE, periodInMs);
- boolean receivedLastResponse = latch.await(4 * DURATION, MILLISECONDS);
- if (!receivedLastResponse)
- {
- fail("received " + receivedCount.longValue() + " responses on " + sentCount + "expected");
- }
- session.close().awaitUninterruptibly(DURATION, MILLISECONDS);
-
- assertEquals(sentCount, receivedCount.longValue());
- long periodInMs = System.currentTimeMillis() - start;
- display(receivedCount.longValue(), periodInMs);
-
- client.dispose();
+ client.dispose();
}
- private void display(long count, long periodInMs)
+ private void display(long requestCount, long responseCount, long periodInMs)
{
String name = getName().replace("test_", "").replace('_', ' ');
- double rate = 1000 * (double) count / periodInMs;
- System.out.format("%-36s: %6.0f inv./s (%d inv. in %dms)\n", name, rate, count, periodInMs);
+ double rate = 1000 * (double) responseCount / periodInMs;
+ System.out.format("%41s: %6.0f inv./s (%8d inv. in %d ms, sent %8d)\n", name, rate, responseCount, periodInMs, requestCount);
}
// Inner classes -------------------------------------------------
More information about the jboss-cvs-commits
mailing list