[jboss-cvs] JBoss Messaging SVN: r3594 - in trunk: src/main/org/jboss/messaging/core/remoting/impl/mina and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 18 10:15:19 EST 2008


Author: jmesnil
Date: 2008-01-18 10:15:19 -0500 (Fri, 18 Jan 2008)
New Revision: 3594

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/BytesPacket.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
   trunk/tests/build.xml
Log:
* added PacketStressTest to stress sending and receiving packets using MINA

Added: trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/BytesPacketCodec.java	2008-01-18 15:15:19 UTC (rev 3594)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.codec;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.BYTES;
+
+import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ */
+public class BytesPacketCodec extends AbstractPacketCodec<BytesPacket>
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public BytesPacketCodec()
+   {
+      super(BYTES);
+   }
+
+   // Public --------------------------------------------------------
+
+   // AbstractPacketCodec overrides ---------------------------------
+
+   @Override
+   protected void encodeBody(BytesPacket packet, RemotingBuffer out)
+         throws Exception
+   {
+      byte[] bytes = packet.getBytes();
+      out.putInt(bytes.length);
+      out.put(bytes);
+   }
+
+   @Override
+   protected BytesPacket decodeBody(RemotingBuffer in)
+         throws Exception
+   {
+      int bodyLength = in.getInt();
+      if (bodyLength > in.remaining())
+      {
+         return null;
+      }
+      
+      byte[] bytes = new byte[bodyLength];
+      in.get(bytes);
+
+      return new BytesPacket(bytes);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java	2008-01-18 14:05:49 UTC (rev 3593)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/PacketCodecFactory.java	2008-01-18 15:15:19 UTC (rev 3594)
@@ -19,6 +19,7 @@
 import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockRequestCodec;
 import org.jboss.messaging.core.remoting.codec.BrowserNextMessageBlockResponseCodec;
 import org.jboss.messaging.core.remoting.codec.BrowserNextMessageResponseCodec;
+import org.jboss.messaging.core.remoting.codec.BytesPacketCodec;
 import org.jboss.messaging.core.remoting.codec.CancelDeliveriesMessageCodec;
 import org.jboss.messaging.core.remoting.codec.CancelDeliveryMessageCodec;
 import org.jboss.messaging.core.remoting.codec.ChangeRateMessageCodec;
@@ -59,6 +60,7 @@
 import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageRequest;
 import org.jboss.messaging.core.remoting.wireformat.BrowserNextMessageResponse;
 import org.jboss.messaging.core.remoting.wireformat.BrowserResetMessage;
+import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
 import org.jboss.messaging.core.remoting.wireformat.CancelDeliveriesMessage;
 import org.jboss.messaging.core.remoting.wireformat.CancelDeliveryMessage;
 import org.jboss.messaging.core.remoting.wireformat.ChangeRateMessage;
@@ -119,6 +121,7 @@
 
       // TextPacket are for testing purpose only!
       addCodec(TextPacket.class, TextPacketCodec.class);
+      addCodec(BytesPacket.class, BytesPacketCodec.class);
 
       addCodec(CreateConnectionRequest.class,
             ConnectionFactoryCreateConnectionRequestCodec.class);

Added: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/BytesPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/BytesPacket.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/BytesPacket.java	2008-01-18 15:15:19 UTC (rev 3594)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.wireformat;
+
+import static org.jboss.messaging.core.remoting.wireformat.PacketType.BYTES;
+
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>.
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class BytesPacket extends AbstractPacket
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final byte[] bytes;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public BytesPacket(byte[] bytes)
+   {
+      super(BYTES);
+
+      assert bytes != null;
+
+      this.bytes = bytes;
+   }
+   
+   // Public --------------------------------------------------------
+
+   public byte[] getBytes()
+   {
+      return bytes;
+   }
+
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", bytes.length=" + bytes.length + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------   
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-01-18 14:05:49 UTC (rev 3593)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/PacketType.java	2008-01-18 15:15:19 UTC (rev 3594)
@@ -16,6 +16,7 @@
    NULL                           ((byte) 1),
    MSG_JMSEXCEPTION               ((byte) 2),
    TEXT                           ((byte) 3),
+   BYTES                          ((byte) 4),
 
    // Connection factory
    REQ_CREATECONNECTION           ((byte)10),

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2008-01-18 14:05:49 UTC (rev 3593)
+++ trunk/tests/build.xml	2008-01-18 15:15:19 UTC (rev 3594)
@@ -519,6 +519,7 @@
                <exclude name="**/jms/JCAWrapperTest.class"/>
                <exclude name="**/jms/server/ServerPeerTest.class"/>
                <exclude name="**/jms/SecurityTest.class"/>
+               <exclude name="**/stress/PacketStressTest.class"/>
             </fileset>
          </batchtest>
       </junit>

Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/stress/PacketStressTest.java	2008-01-18 15:15:19 UTC (rev 3594)
@@ -0,0 +1,165 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.stress;
+
+import static java.util.UUID.randomUUID;
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+import static org.jboss.messaging.core.remoting.impl.mina.integration.test.TestSupport.PORT;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.Client;
+import org.jboss.messaging.core.remoting.NIOConnector;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketSender;
+import org.jboss.messaging.core.remoting.impl.ClientImpl;
+import org.jboss.messaging.core.remoting.impl.mina.MinaConnector;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.wireformat.BytesPacket;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class PacketStressTest extends TestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   private static final int MANY_MESSAGES = 100000;
+   private static final int PAYLOAD = 10000; // in bytes
+   
+   // Attributes ----------------------------------------------------
+
+   private MinaService service;
+   private NIOConnector connector;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      service = new MinaService(TCP, "localhost", PORT);
+      service.start();
+      connector = new MinaConnector(TCP, "localhost", PORT);
+      
+   }
+   
+   @Override
+   protected void tearDown() throws Exception
+   {
+      connector.disconnect();
+      service.stop();
+      
+      connector = null;
+      service = null;
+   }
+   
+   public void testManyPackets() throws Exception
+   {
+      int spinner = MANY_MESSAGES / 100;
+      System.out.println("number of messages: " + MANY_MESSAGES);
+      System.out.println("message payload: " + MANY_MESSAGES + " bytes");
+      System.out.println("# => " + spinner + " messages sent (1% of total messages)");
+      System.out.println(". => " + spinner + " messages received (1% of total messages)");
+      System.out.println();
+      
+      
+      final String handlerID = randomUUID().toString();
+      CountDownLatch latch = new CountDownLatch(1);
+      
+      service.getDispatcher().register(new ServerHandler(handlerID, latch, spinner));
+      Client client = new ClientImpl(connector, service.getLocator());
+      client.connect();
+      
+      byte[] payloadBytes = generatePayload(PAYLOAD);
+      AbstractPacket packet = new BytesPacket(payloadBytes);
+      packet.setVersion((byte) 19);
+      packet.setTargetID(handlerID);
+
+      long start = System.currentTimeMillis();
+      for (int i = 0; i < MANY_MESSAGES; i++)
+      {
+       client.sendOneWay(packet); 
+       if (i % spinner == 0)
+          System.out.print('#');
+      }
+      
+      long durationForSending = System.currentTimeMillis() - start;      
+      latch.await();
+      long durationForReceiving = System.currentTimeMillis() - start;
+
+      System.out.println();
+      System.out.println(MANY_MESSAGES + " messages of " + PAYLOAD + "B sent one-way in " + durationForSending + "ms");
+      System.out.println(MANY_MESSAGES + " messages of " + PAYLOAD + "B received on the server in " + durationForReceiving + "ms");
+      System.out.println("==============");
+      
+      // in MB/s
+      double sendingThroughput =  (MANY_MESSAGES * PAYLOAD) / (durationForSending * 1000); 
+      double receivingThroughput =  (MANY_MESSAGES * PAYLOAD) / (durationForReceiving * 1000); 
+      
+      System.out.format("sending throughput: %.1f MB/s\n", sendingThroughput);
+      System.out.format("receiving throughput: %.1f MB/s\n", receivingThroughput);
+      System.out.println("==============");
+   }
+
+   private byte[] generatePayload(int payload)
+   {
+      Random rand = new Random(System.currentTimeMillis());
+      byte[] bytes = new byte[payload];
+      rand.nextBytes(bytes);
+      return bytes;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private final class ServerHandler implements PacketHandler
+   {
+      private final String handlerID;
+      private CountDownLatch latch;
+      private int messagesReceived;
+      private int spinner;
+
+      private ServerHandler(String handlerID, CountDownLatch latch, int spinner)
+      {
+         this.handlerID = handlerID;
+         this.latch = latch;
+         this.spinner = spinner;
+         messagesReceived = 0;
+      }
+
+      public String getID()
+      {
+         return handlerID;
+      }
+
+      public void handle(AbstractPacket packet, PacketSender sender)
+      {
+         messagesReceived++;
+         if (messagesReceived % spinner == 0)
+            System.out.print('.');
+         if (messagesReceived == MANY_MESSAGES)
+         {
+            latch.countDown();
+         }
+      }
+   }
+}




More information about the jboss-cvs-commits mailing list