[jboss-cvs] JBoss Messaging SVN: r3594 - in trunk: src/main/org/jboss/messaging/core/remoting/impl/mina and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 18 10:15:19 EST 2008
Author: jmesnil
Date: 2008-01-18 10:15:19 -0500 (Fri, 18 Jan 2008)
New Revision: 3594
Added:
trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/BytesPacket.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
trunk/tests/build.xml
Log:
* added PacketStressTest to stress sending and receiving packets using MINA
Added: trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java 2008-01-18 15:15:19 UTC (rev 3594)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.BYTES;
+
+import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
+
+/**
+ * Codec for {@link BytesPacket}.
+ *
+ * The body is written as the length of the byte array (an int) followed by
+ * the bytes themselves, and is read back in the same order.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class BytesPacketCodec extends AbstractPacketCodec<BytesPacket>
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a codec bound to the {@code BYTES} packet type.
+    */
+   public BytesPacketCodec()
+   {
+      super(BYTES);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   /**
+    * Writes the packet payload as a length prefix followed by the raw bytes.
+    *
+    * @param packet the packet whose bytes are written
+    * @param out the buffer to write into
+    */
+   @Override
+   protected void encodeBody(BytesPacket packet, RemotingBuffer out)
+         throws Exception
+   {
+      byte[] bytes = packet.getBytes();
+      out.putInt(bytes.length);
+      out.put(bytes);
+   }
+
+   /**
+    * Reads a length prefix and then that many bytes from the buffer.
+    *
+    * @param in the buffer to read from
+    * @return the decoded packet, or {@code null} when the buffer holds fewer
+    *         bytes than the length prefix announces
+    *         (NOTE(review): presumably the caller treats {@code null} as
+    *         "more data needed" -- confirm against AbstractPacketCodec)
+    * @throws IllegalArgumentException if the length prefix is negative
+    */
+   @Override
+   protected BytesPacket decodeBody(RemotingBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      // The length comes from the wire: reject a negative value explicitly
+      // instead of failing later with a NegativeArraySizeException.
+      if (bodyLength < 0)
+      {
+         throw new IllegalArgumentException("invalid negative body length: " + bodyLength);
+      }
+      if (bodyLength > in.remaining())
+      {
+         return null;
+      }
+
+      byte[] bytes = new byte[bodyLength];
+      in.get(bytes);
+
+      return new BytesPacket(bytes);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-01-18 14:05:49 UTC (rev 3593)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java 2008-01-18 15:15:19 UTC (rev 3594)
@@ -19,6 +19,7 @@
import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockRequestCodec;
import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockResponseCodec;
import org.jboss.messaging.core.remoting.codec.BrowserNextMessageResponseCodec;
+import org.jboss.messaging.core.remoting.codec.BytesPacketCodec;
import org.jboss.messaging.core.remoting.codec.CancelDeliveriesMessageCodec;
import org.jboss.messaging.core.remoting.codec.CancelDeliveryMessageCodec;
import org.jboss.messaging.core.remoting.codec.ChangeRateMessageCodec;
@@ -59,6 +60,7 @@
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageRequest;
import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
import org.jboss.messaging.core.remoting.wireformat.BrowserResetMessage;
+import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
@@ -119,6 +121,7 @@
// TextPacket are for testing purpose only!
addCodec(TextPacket.class, TextPacketCodec.class);
+ addCodec(BytesPacket.class, BytesPacketCodec.class);
addCodec(CreateConnectionRequest.class,
ConnectionFactoryCreateConnectionRequestCodec.class);
Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/BytesPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/BytesPacket.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/BytesPacket.java 2008-01-18 15:15:19 UTC (rev 3594)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.BYTES;
+
+
+/**
+ * Packet of type {@code BYTES} carrying an opaque byte array.
+ *
+ * The array is neither copied on construction nor on access: the packet
+ * keeps, and hands out, the very reference it was given.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class BytesPacket extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final byte[] bytes;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /**
+    * Creates a packet wrapping the given bytes.
+    *
+    * @param bytes the payload; must not be {@code null}. This is only
+    *        enforced when assertions are enabled ({@code -ea}).
+    */
+   public BytesPacket(byte[] bytes)
+   {
+      super(BYTES);
+
+      assert bytes != null;
+
+      this.bytes = bytes;
+   }
+
+   // Public --------------------------------------------------------
+
+   /**
+    * @return the payload array held by this packet (not a copy)
+    */
+   public byte[] getBytes()
+   {
+      return bytes;
+   }
+
+   /**
+    * @return the parent's string form extended with the payload length
+    */
+   @Override
+   public String toString()
+   {
+      StringBuilder description = new StringBuilder();
+      description.append(getParentString());
+      description.append(", bytes.length=");
+      description.append(bytes.length);
+      description.append("]");
+      return description.toString();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-01-18 14:05:49 UTC (rev 3593)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java 2008-01-18 15:15:19 UTC (rev 3594)
@@ -16,6 +16,7 @@
NULL ((byte) 1),
MSG_JMSEXCEPTION ((byte) 2),
TEXT ((byte) 3),
+ BYTES ((byte) 4),
// Connection factory
REQ_CREATECONNECTION ((byte)10),
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2008-01-18 14:05:49 UTC (rev 3593)
+++ trunk/tests/build.xml 2008-01-18 15:15:19 UTC (rev 3594)
@@ -519,6 +519,7 @@
<exclude name="**/jms/JCAWrapperTest.class"/>
<exclude name="**/jms/server/ServerPeerTest.class"/>
<exclude name="**/jms/SecurityTest.class"/>
+ <exclude name="**/stress/PacketStressTest.class"/>
</fileset>
</batchtest>
</junit>
Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java 2008-01-18 15:15:19 UTC (rev 3594)
@@ -0,0 +1,165 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.stress;
+
+import static java.util.UUID.randomUUID;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.impl.ClientImpl;
+import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
+
+/**
+ * Stress test that sends {@code MANY_MESSAGES} one-way {@link BytesPacket}s
+ * of {@code PAYLOAD} bytes each to a {@link MinaService} and prints the
+ * elapsed times and the resulting throughputs.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class PacketStressTest extends TestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   private static final int MANY_MESSAGES = 100000;
+   private static final int PAYLOAD = 10000; // in bytes
+
+   // Attributes ----------------------------------------------------
+
+   private MinaService service;
+   private NIOConnector connector;
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Converts a byte count and a duration into a throughput in MB/s.
+    *
+    * The division is done in floating point so that the fractional part is
+    * kept and a duration of 0 ms yields Infinity (or NaN for 0 bytes)
+    * instead of an ArithmeticException.
+    *
+    * @param totalBytes number of bytes transferred
+    * @param durationInMillis elapsed time in milliseconds
+    * @return the throughput in MB/s (1 MB = 10^6 bytes)
+    */
+   private static double throughputInMBPerSecond(long totalBytes, long durationInMillis)
+   {
+      // bytes / (ms * 1000) == (bytes / 10^6) / (ms / 10^3) == MB/s
+      return totalBytes / (durationInMillis * 1000.0);
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Starts a MinaService on localhost and creates a connector to it.
+    */
+   @Override
+   protected void setUp() throws Exception
+   {
+      service = new MinaService(TCP, "localhost", PORT);
+      service.start();
+      connector = new MinaConnector(TCP, "localhost", PORT);
+
+   }
+
+   /**
+    * Disconnects the connector and stops the service.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         connector.disconnect();
+      }
+      finally
+      {
+         // stop the service even if the disconnection failed, so that it
+         // is not left running for the following tests
+         service.stop();
+
+         connector = null;
+         service = null;
+      }
+   }
+
+   /**
+    * Sends the same packet MANY_MESSAGES times one-way, waits until the
+    * server-side handler has counted all of them, then prints the durations
+    * and throughputs for sending and for receiving.
+    */
+   public void testManyPackets() throws Exception
+   {
+      int spinner = MANY_MESSAGES / 100;
+      System.out.println("number of messages: " + MANY_MESSAGES);
+      System.out.println("message payload: " + PAYLOAD + " bytes");
+      System.out.println("# => " + spinner + " messages sent (1% of total messages)");
+      System.out.println(". => " + spinner + " messages received (1% of total messages)");
+      System.out.println();
+
+
+      final String handlerID = randomUUID().toString();
+      CountDownLatch latch = new CountDownLatch(1);
+
+      service.getDispatcher().register(new ServerHandler(handlerID, latch, spinner));
+      Client client = new ClientImpl(connector, service.getLocator());
+      client.connect();
+
+      byte[] payloadBytes = generatePayload(PAYLOAD);
+      AbstractPacket packet = new BytesPacket(payloadBytes);
+      packet.setVersion((byte) 19);
+      // the target ID is the ID of the handler registered above
+      packet.setTargetID(handlerID);
+
+      long start = System.currentTimeMillis();
+      for (int i = 0; i < MANY_MESSAGES; i++)
+      {
+         client.sendOneWay(packet);
+         if (i % spinner == 0)
+         {
+            System.out.print('#');
+         }
+      }
+
+      long durationForSending = System.currentTimeMillis() - start;
+      // released by the handler once it has counted MANY_MESSAGES packets
+      latch.await();
+      // measured from the start of sending, not from the end of it
+      long durationForReceiving = System.currentTimeMillis() - start;
+
+      System.out.println();
+      System.out.println(MANY_MESSAGES + " messages of " + PAYLOAD + "B sent one-way in " + durationForSending + "ms");
+      System.out.println(MANY_MESSAGES + " messages of " + PAYLOAD + "B received on the server in " + durationForReceiving + "ms");
+      System.out.println("==============");
+
+      // widen before multiplying so larger constants cannot overflow an int
+      long totalBytes = (long) MANY_MESSAGES * PAYLOAD;
+
+      // in MB/s
+      double sendingThroughput = throughputInMBPerSecond(totalBytes, durationForSending);
+      double receivingThroughput = throughputInMBPerSecond(totalBytes, durationForReceiving);
+
+      System.out.format("sending throughput: %.1f MB/s\n", sendingThroughput);
+      System.out.format("receiving throughput: %.1f MB/s\n", receivingThroughput);
+      System.out.println("==============");
+   }
+
+   /**
+    * Builds a random payload.
+    *
+    * @param payload the size of the array to generate, in bytes
+    * @return a new array of that size filled with random bytes
+    */
+   private byte[] generatePayload(int payload)
+   {
+      Random rand = new Random(System.currentTimeMillis());
+      byte[] bytes = new byte[payload];
+      rand.nextBytes(bytes);
+      return bytes;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   /**
+    * Handler that counts the packets it is given, prints a '.' every
+    * {@code spinner} packets and releases the latch once MANY_MESSAGES
+    * packets have been counted.
+    */
+   private static final class ServerHandler implements PacketHandler
+   {
+      private final String handlerID;
+      private final CountDownLatch latch;
+      private final int spinner;
+      // NOTE(review): plain int, so this assumes handle() is never invoked
+      // concurrently -- confirm against the dispatcher's threading model
+      private int messagesReceived;
+
+      private ServerHandler(String handlerID, CountDownLatch latch, int spinner)
+      {
+         this.handlerID = handlerID;
+         this.latch = latch;
+         this.spinner = spinner;
+         messagesReceived = 0;
+      }
+
+      public String getID()
+      {
+         return handlerID;
+      }
+
+      public void handle(AbstractPacket packet, PacketSender sender)
+      {
+         messagesReceived++;
+         if (messagesReceived % spinner == 0)
+         {
+            System.out.print('.');
+         }
+         if (messagesReceived == MANY_MESSAGES)
+         {
+            latch.countDown();
+         }
+      }
+   }
+}
More information about the jboss-cvs-commits
mailing list