[jboss-cvs] JBoss Messaging SVN: r4329 - in trunk/examples/jms: src/org/jboss/jms/example and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 28 12:32:58 EDT 2008


Author: jmesnil
Date: 2008-05-28 12:32:57 -0400 (Wed, 28 May 2008)
New Revision: 4329

Modified:
   trunk/examples/jms/build.xml
   trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
   trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
Log:
Removed perf sampling; metrics are now displayed after every 10% of the messages that are sent/received.

Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml	2008-05-28 16:28:50 UTC (rev 4328)
+++ trunk/examples/jms/build.xml	2008-05-28 16:32:57 UTC (rev 4329)
@@ -44,8 +44,6 @@
    <property name="message.count" value="200000"/>
    <property name="message.warmup.count" value="10000"/>
    <property name="delivery.mode" value="NON_PERSISTENT"/>
-   <!-- in seconds -->
-   <property name="sample.period" value="1"/>
    <property name="sess.trans" value="false"/>
    <property name="sess.ackmode" value="DUPS_OK"/>
    <property name="sess.trans.size" value="1000"/>
@@ -121,7 +119,6 @@
 * message.count         number of messages               200000                ${message.count}
 * message.warmup.count  number of messages to warm up    10000                 ${message.warmup.count}
 * delivery.mode         PERSISTENT/NON_PERSISTENT        NON_PERSISTENT        ${delivery.mode}
-* sample.period         timing period in seconds         1                     ${sample.period}
 * sess.trans            Is session transacted            false                 ${sess.trans}
 * sess.trans.size       batch size to commit             1000                  ${sess.trans.size}
 * sess.ackmode          Ack mode DUPS_OK/AUTO_ACK        DUPS_OK               ${sess.ackmode}
@@ -145,7 +142,6 @@
          <arg value="${message.count}"/>
          <arg value="${message.warmup.count}"/>
          <arg value="${delivery.mode}"/>
-         <arg value="${sample.period}"/>
          <arg value="${sess.trans}"/>
          <arg value="${sess.trans.size}"/>
          <arg value="${sess.ackmode}"/>
@@ -168,7 +164,6 @@
          <arg value="${message.count}"/>
          <arg value="${message.warmup.count}"/>
          <arg value="${delivery.mode}"/>
-         <arg value="${sample.period}"/>
          <arg value="${sess.trans}"/>
          <arg value="${sess.trans.size}"/>
          <arg value="${sess.ackmode}"/>

Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-05-28 16:28:50 UTC (rev 4328)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-05-28 16:32:57 UTC (rev 4329)
@@ -21,18 +21,26 @@
    */
 package org.jboss.jms.example;
 
-import org.jboss.jms.util.PerfParams;
-import org.jboss.messaging.core.logging.Logger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 
-import javax.jms.*;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
+import org.jboss.jms.util.PerfParams;
+import org.jboss.messaging.core.logging.Logger;
+
 /**
  * a performance example that can be used to gather simple performance figures.
  *
@@ -45,10 +53,8 @@
    private static Logger log = Logger.getLogger(PerfExample.class);
    private Queue queue;
    private Connection connection;
-   private AtomicLong messageCount = new AtomicLong(0);
-   private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private Session session;
-   private Sampler command = new Sampler();
+   private long start;
 
    public static void main(String[] args)
    {
@@ -57,20 +63,17 @@
       int noOfMessages = Integer.parseInt(args[1]);
       int noOfWarmupMessages = Integer.parseInt(args[2]);
       int deliveryMode = args[3].equalsIgnoreCase("persistent") ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
-      long samplePeriod = Long.parseLong(args[4]);
-      boolean transacted = Boolean.parseBoolean(args[5]);
-      log.info("Transacted:" + transacted);
-      int transactionBatchSize = Integer.parseInt(args[6]);
-      boolean dupsok = "DUPS_OK".equalsIgnoreCase(args[7]);
-      boolean drainQueue = Boolean.parseBoolean(args[8]);
-      String queueLookup = args[9];
-      String connectionFactoryLookup = args[10];
+      boolean transacted = Boolean.parseBoolean(args[4]);
+      int transactionBatchSize = Integer.parseInt(args[5]);
+      boolean dupsok = "DUPS_OK".equalsIgnoreCase(args[6]);
+      boolean drainQueue = Boolean.parseBoolean(args[7]);
+      String queueLookup = args[8];
+      String connectionFactoryLookup = args[9];
 
       PerfParams perfParams = new PerfParams();
       perfParams.setNoOfMessagesToSend(noOfMessages);
       perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
       perfParams.setDeliveryMode(deliveryMode);
-      perfParams.setSamplePeriod(samplePeriod);
       perfParams.setSessionTransacted(transacted);
       perfParams.setTransactionBatchSize(transactionBatchSize);
       perfParams.setDupsOk(dupsok);
@@ -99,22 +102,29 @@
       session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : (dupsOk ? Session.DUPS_OK_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE));
    }
 
+   private void displayAverage(long numberOfMessages, long start, long end)
+   {
+      double duration = (1.0 * end - start) / 1000; // in seconds
+      double average = (1.0 * numberOfMessages / duration);
+      log.info(String.format("average: %.2f msg/s (%d messages in %2.2fs)", average, numberOfMessages, duration));
+   }
+
    public void runSender(final PerfParams perfParams)
    {
       try
       {
          log.info("params = " + perfParams);
          init(perfParams.isSessionTransacted(), perfParams.getQueueLookup(), perfParams.getConnectionFactoryLookup(), perfParams.isDupsOk());
+         start = System.currentTimeMillis();
          log.info("warming up by sending " + perfParams.getNoOfWarmupMessages() + " messages");
-         sendMessages(perfParams.getNoOfWarmupMessages(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());
+         sendMessages(perfParams.getNoOfWarmupMessages(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted(), false);
          log.info("warmed up");
-         messageCount.set(0);
 
-         scheduler.scheduleAtFixedRate(command, perfParams.getSamplePeriod(), perfParams.getSamplePeriod(), TimeUnit.SECONDS);
-         sendMessages(perfParams.getNoOfMessagesToSend(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());
-         scheduler.shutdownNow();
+         start = System.currentTimeMillis();
+         sendMessages(perfParams.getNoOfMessagesToSend(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted(), true);
+         long end = System.currentTimeMillis();
 
-         log.info(String.format("average: %.2f msg/s", (command.getAverage() / perfParams.getSamplePeriod())));
+         displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
       }
       catch (Exception e)
       {
@@ -134,7 +144,7 @@
       }
    }
 
-   private void sendMessages(int numberOfMessages, int txBatchSize, int deliveryMode, boolean transacted) throws JMSException
+   private void sendMessages(int numberOfMessages, int txBatchSize, int deliveryMode, boolean transacted, boolean display) throws JMSException
    {
       MessageProducer producer = session.createProducer(queue);
       producer.setDisableMessageID(true);
@@ -143,15 +153,16 @@
       BytesMessage bytesMessage = session.createBytesMessage();
       byte[] payload = new byte[1024];
       bytesMessage.writeBytes(payload);
+      
+      int modulo = numberOfMessages / 10;
 
       boolean committed = false;
       for (int i = 1; i <= numberOfMessages; i++)
       {
          producer.send(bytesMessage);
-         messageCount.incrementAndGet();
          if (transacted)
          {
-            if (messageCount.longValue() % txBatchSize == 0)
+            if (i % txBatchSize == 0)
             {
                session.commit();
                committed = true;
@@ -161,6 +172,11 @@
                committed = false;
             }
          }
+         if (display && (i % modulo == 0))
+         {
+            double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+            log.info(String.format("sent %6d messages in %2.2fs", i, duration));
+         }
       }
       if (transacted && !committed)
       {
@@ -186,7 +202,9 @@
          CountDownLatch countDownLatch = new CountDownLatch(1);
          messageConsumer.setMessageListener(new PerfListener(countDownLatch, perfParams));
          countDownLatch.await();
-
+         long end = System.currentTimeMillis();
+         // start was set on the first received message
+         displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
       }
       catch (Exception e)
       {
@@ -211,7 +229,7 @@
       log.info("draining queue");
       while (true)
       {
-         Message m = consumer.receive(500);
+         Message m = consumer.receive(5000);
          if (m == null)
          {
             log.info("queue is drained");
@@ -232,11 +250,15 @@
       private boolean warmingUp = true;
       private boolean started = false;
 
+      private int modulo;
+      private AtomicLong count = new AtomicLong(0);
+
       public PerfListener(CountDownLatch countDownLatch, PerfParams perfParams)
       {
          this.countDownLatch = countDownLatch;
          this.perfParams = perfParams;
          warmingUp = perfParams.getNoOfWarmupMessages() > 0;
+         this.modulo = perfParams.getNoOfMessagesToSend() / 10;
       }
 
       public void onMessage(Message message)
@@ -246,16 +268,14 @@
             if (warmingUp)
             {
                boolean committed = checkCommit();
-               if (messageCount.incrementAndGet() == perfParams.getNoOfWarmupMessages())
+               if (count.incrementAndGet() == perfParams.getNoOfWarmupMessages())
                {
-                  log.info("warmed up after receiving " + messageCount.longValue() + " msgs");
+                  log.info("warmed up after receiving " + count.longValue() + " msgs");
                   if (!committed)
                   {
                      checkCommit();
                   }
                   warmingUp = false;
-                  // reset messageCount to take stats
-                  messageCount.set(0);
                }
                return;
             }
@@ -263,21 +283,26 @@
             if (!started)
             {
                started = true;
-               scheduler.scheduleAtFixedRate(command, 1, 1, TimeUnit.SECONDS);
+               // reset count to take stats
+               count.set(0);
+               start = System.currentTimeMillis();
             }
 
-            messageCount.incrementAndGet();
+            long currentCount = count.incrementAndGet();
             boolean committed = checkCommit();
-            if (messageCount.longValue() == perfParams.getNoOfMessagesToSend())
+            if (currentCount == perfParams.getNoOfMessagesToSend())
             {
                if (!committed)
                {
                   checkCommit();
                }
                countDownLatch.countDown();
-               scheduler.shutdownNow();
-               log.info(String.format("average: %.2f msg/s", command.getAverage()));
             }
+            if (currentCount % modulo == 0)
+            {
+               double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+               log.info(String.format("received %6d messages in %2.2fs", currentCount, duration));
+            }
          }
          catch (Exception e)
          {
@@ -289,7 +314,7 @@
       {
          if (perfParams.isSessionTransacted())
          {
-            if (messageCount.longValue() % perfParams.getTransactionBatchSize() == 0)
+            if (count.longValue() % perfParams.getTransactionBatchSize() == 0)
             {
                session.commit();
 
@@ -299,35 +324,4 @@
          return false;
       }
    }
-
-   /**
-    * simple class to gather performance figures
-    */
-   class Sampler implements Runnable
-   {
-      long sampleCount = 0;
-
-      long startTime = 0;
-      long samplesTaken = 0;
-
-      public void run()
-      {
-         if (startTime == 0)
-         {
-            startTime = System.currentTimeMillis();
-         }
-         long elapsedTime = (System.currentTimeMillis() - startTime) / 1000; // in s
-         long lastCount = sampleCount;
-         sampleCount = messageCount.longValue();
-         log.info(String.format("time elapsed: %2ds, message count: %7d, this period: %5d",
-                 elapsedTime, sampleCount, sampleCount - lastCount));
-         samplesTaken++;
-      }
-
-      public double getAverage()
-      {
-         return (1.0 * sampleCount)/samplesTaken;
-      }
-
-   }
 }

Modified: trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java	2008-05-28 16:28:50 UTC (rev 4328)
+++ trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java	2008-05-28 16:32:57 UTC (rev 4329)
@@ -31,7 +31,6 @@
 {
    int noOfMessagesToSend = 1000;
    int noOfWarmupMessages;
-   long samplePeriod = 1; // in seconds
    int deliveryMode = DeliveryMode.NON_PERSISTENT;
    boolean isSessionTransacted = false;
    int transactionBatchSize = 5000;
@@ -60,16 +59,6 @@
       this.noOfWarmupMessages = noOfWarmupMessages;
    }
 
-   public long getSamplePeriod()
-   {
-      return samplePeriod;
-   }
-
-   public void setSamplePeriod(long samplePeriod)
-   {
-      this.samplePeriod = samplePeriod;
-   }
-
    public int getDeliveryMode()
    {
       return deliveryMode;
@@ -143,7 +132,7 @@
 
    public String toString()
    {
-      return "message to send = " + noOfMessagesToSend + ", samplePeriod = " + samplePeriod + "s" + ", DeliveryMode = " +
+      return "message to send = " + noOfMessagesToSend + ", DeliveryMode = " +
               (deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT") + ", session transacted = " + isSessionTransacted +
               (isSessionTransacted ? ", transaction batch size = " + transactionBatchSize : "") + ", drain queue = " + drainQueue +
               ", queue lookup = " + queueLookup + ", connection factory lookup = " + connectionFactoryLookup +




More information about the jboss-cvs-commits mailing list