[jboss-cvs] JBoss Messaging SVN: r4329 - in trunk/examples/jms: src/org/jboss/jms/example and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 28 12:32:58 EDT 2008
Author: jmesnil
Date: 2008-05-28 12:32:57 -0400 (Wed, 28 May 2008)
New Revision: 4329
Modified:
trunk/examples/jms/build.xml
trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
Log:
Removed perf sampling; metrics are now displayed after every 10% of the messages sent/received.
Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml 2008-05-28 16:28:50 UTC (rev 4328)
+++ trunk/examples/jms/build.xml 2008-05-28 16:32:57 UTC (rev 4329)
@@ -44,8 +44,6 @@
<property name="message.count" value="200000"/>
<property name="message.warmup.count" value="10000"/>
<property name="delivery.mode" value="NON_PERSISTENT"/>
- <!-- in seconds -->
- <property name="sample.period" value="1"/>
<property name="sess.trans" value="false"/>
<property name="sess.ackmode" value="DUPS_OK"/>
<property name="sess.trans.size" value="1000"/>
@@ -121,7 +119,6 @@
* message.count number of messages 200000 ${message.count}
* message.warmup.count number of messages to warm up 10000 ${message.warmup.count}
* delivery.mode PERSISTENT/NON_PERSISTENT NON_PERSISTENT ${delivery.mode}
-* sample.period timing period in seconds 1 ${sample.period}
* sess.trans Is session transacted false ${sess.trans}
* sess.trans.size batch size to commit 1000 ${sess.trans.size}
* sess.ackmode Ack mode DUPS_OK/AUTO_ACK DUPS_OK ${sess.ackmode}
@@ -145,7 +142,6 @@
<arg value="${message.count}"/>
<arg value="${message.warmup.count}"/>
<arg value="${delivery.mode}"/>
- <arg value="${sample.period}"/>
<arg value="${sess.trans}"/>
<arg value="${sess.trans.size}"/>
<arg value="${sess.ackmode}"/>
@@ -168,7 +164,6 @@
<arg value="${message.count}"/>
<arg value="${message.warmup.count}"/>
<arg value="${delivery.mode}"/>
- <arg value="${sample.period}"/>
<arg value="${sess.trans}"/>
<arg value="${sess.trans.size}"/>
<arg value="${sess.ackmode}"/>
Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-05-28 16:28:50 UTC (rev 4328)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-05-28 16:32:57 UTC (rev 4329)
@@ -21,18 +21,26 @@
*/
package org.jboss.jms.example;
-import org.jboss.jms.util.PerfParams;
-import org.jboss.messaging.core.logging.Logger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
-import javax.jms.*;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.jms.util.PerfParams;
+import org.jboss.messaging.core.logging.Logger;
+
/**
* a performance example that can be used to gather simple performance figures.
*
@@ -45,10 +53,8 @@
private static Logger log = Logger.getLogger(PerfExample.class);
private Queue queue;
private Connection connection;
- private AtomicLong messageCount = new AtomicLong(0);
- private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private Session session;
- private Sampler command = new Sampler();
+ private long start;
public static void main(String[] args)
{
@@ -57,20 +63,17 @@
int noOfMessages = Integer.parseInt(args[1]);
int noOfWarmupMessages = Integer.parseInt(args[2]);
int deliveryMode = args[3].equalsIgnoreCase("persistent") ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
- long samplePeriod = Long.parseLong(args[4]);
- boolean transacted = Boolean.parseBoolean(args[5]);
- log.info("Transacted:" + transacted);
- int transactionBatchSize = Integer.parseInt(args[6]);
- boolean dupsok = "DUPS_OK".equalsIgnoreCase(args[7]);
- boolean drainQueue = Boolean.parseBoolean(args[8]);
- String queueLookup = args[9];
- String connectionFactoryLookup = args[10];
+ boolean transacted = Boolean.parseBoolean(args[4]);
+ int transactionBatchSize = Integer.parseInt(args[5]);
+ boolean dupsok = "DUPS_OK".equalsIgnoreCase(args[6]);
+ boolean drainQueue = Boolean.parseBoolean(args[7]);
+ String queueLookup = args[8];
+ String connectionFactoryLookup = args[9];
PerfParams perfParams = new PerfParams();
perfParams.setNoOfMessagesToSend(noOfMessages);
perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
perfParams.setDeliveryMode(deliveryMode);
- perfParams.setSamplePeriod(samplePeriod);
perfParams.setSessionTransacted(transacted);
perfParams.setTransactionBatchSize(transactionBatchSize);
perfParams.setDupsOk(dupsok);
@@ -99,22 +102,29 @@
session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : (dupsOk ? Session.DUPS_OK_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE));
}
+ private void displayAverage(long numberOfMessages, long start, long end)
+ {
+ double duration = (1.0 * end - start) / 1000; // in seconds
+ double average = (1.0 * numberOfMessages / duration);
+ log.info(String.format("average: %.2f msg/s (%d messages in %2.2fs)", average, numberOfMessages, duration));
+ }
+
public void runSender(final PerfParams perfParams)
{
try
{
log.info("params = " + perfParams);
init(perfParams.isSessionTransacted(), perfParams.getQueueLookup(), perfParams.getConnectionFactoryLookup(), perfParams.isDupsOk());
+ start = System.currentTimeMillis();
log.info("warming up by sending " + perfParams.getNoOfWarmupMessages() + " messages");
- sendMessages(perfParams.getNoOfWarmupMessages(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());
+ sendMessages(perfParams.getNoOfWarmupMessages(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted(), false);
log.info("warmed up");
- messageCount.set(0);
- scheduler.scheduleAtFixedRate(command, perfParams.getSamplePeriod(), perfParams.getSamplePeriod(), TimeUnit.SECONDS);
- sendMessages(perfParams.getNoOfMessagesToSend(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted());
- scheduler.shutdownNow();
+ start = System.currentTimeMillis();
+ sendMessages(perfParams.getNoOfMessagesToSend(), perfParams.getTransactionBatchSize(), perfParams.getDeliveryMode(), perfParams.isSessionTransacted(), true);
+ long end = System.currentTimeMillis();
- log.info(String.format("average: %.2f msg/s", (command.getAverage() / perfParams.getSamplePeriod())));
+ displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
}
catch (Exception e)
{
@@ -134,7 +144,7 @@
}
}
- private void sendMessages(int numberOfMessages, int txBatchSize, int deliveryMode, boolean transacted) throws JMSException
+ private void sendMessages(int numberOfMessages, int txBatchSize, int deliveryMode, boolean transacted, boolean display) throws JMSException
{
MessageProducer producer = session.createProducer(queue);
producer.setDisableMessageID(true);
@@ -143,15 +153,16 @@
BytesMessage bytesMessage = session.createBytesMessage();
byte[] payload = new byte[1024];
bytesMessage.writeBytes(payload);
+
+ int modulo = numberOfMessages / 10;
boolean committed = false;
for (int i = 1; i <= numberOfMessages; i++)
{
producer.send(bytesMessage);
- messageCount.incrementAndGet();
if (transacted)
{
- if (messageCount.longValue() % txBatchSize == 0)
+ if (i % txBatchSize == 0)
{
session.commit();
committed = true;
@@ -161,6 +172,11 @@
committed = false;
}
}
+ if (display && (i % modulo == 0))
+ {
+ double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+ log.info(String.format("sent %6d messages in %2.2fs", i, duration));
+ }
}
if (transacted && !committed)
{
@@ -186,7 +202,9 @@
CountDownLatch countDownLatch = new CountDownLatch(1);
messageConsumer.setMessageListener(new PerfListener(countDownLatch, perfParams));
countDownLatch.await();
-
+ long end = System.currentTimeMillis();
+ // start was set on the first received message
+ displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
}
catch (Exception e)
{
@@ -211,7 +229,7 @@
log.info("draining queue");
while (true)
{
- Message m = consumer.receive(500);
+ Message m = consumer.receive(5000);
if (m == null)
{
log.info("queue is drained");
@@ -232,11 +250,15 @@
private boolean warmingUp = true;
private boolean started = false;
+ private int modulo;
+ private AtomicLong count = new AtomicLong(0);
+
public PerfListener(CountDownLatch countDownLatch, PerfParams perfParams)
{
this.countDownLatch = countDownLatch;
this.perfParams = perfParams;
warmingUp = perfParams.getNoOfWarmupMessages() > 0;
+ this.modulo = perfParams.getNoOfMessagesToSend() / 10;
}
public void onMessage(Message message)
@@ -246,16 +268,14 @@
if (warmingUp)
{
boolean committed = checkCommit();
- if (messageCount.incrementAndGet() == perfParams.getNoOfWarmupMessages())
+ if (count.incrementAndGet() == perfParams.getNoOfWarmupMessages())
{
- log.info("warmed up after receiving " + messageCount.longValue() + " msgs");
+ log.info("warmed up after receiving " + count.longValue() + " msgs");
if (!committed)
{
checkCommit();
}
warmingUp = false;
- // reset messageCount to take stats
- messageCount.set(0);
}
return;
}
@@ -263,21 +283,26 @@
if (!started)
{
started = true;
- scheduler.scheduleAtFixedRate(command, 1, 1, TimeUnit.SECONDS);
+ // reset count to take stats
+ count.set(0);
+ start = System.currentTimeMillis();
}
- messageCount.incrementAndGet();
+ long currentCount = count.incrementAndGet();
boolean committed = checkCommit();
- if (messageCount.longValue() == perfParams.getNoOfMessagesToSend())
+ if (currentCount == perfParams.getNoOfMessagesToSend())
{
if (!committed)
{
checkCommit();
}
countDownLatch.countDown();
- scheduler.shutdownNow();
- log.info(String.format("average: %.2f msg/s", command.getAverage()));
}
+ if (currentCount % modulo == 0)
+ {
+ double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+ log.info(String.format("received %6d messages in %2.2fs", currentCount, duration));
+ }
}
catch (Exception e)
{
@@ -289,7 +314,7 @@
{
if (perfParams.isSessionTransacted())
{
- if (messageCount.longValue() % perfParams.getTransactionBatchSize() == 0)
+ if (count.longValue() % perfParams.getTransactionBatchSize() == 0)
{
session.commit();
@@ -299,35 +324,4 @@
return false;
}
}
-
- /**
- * simple class to gather performance figures
- */
- class Sampler implements Runnable
- {
- long sampleCount = 0;
-
- long startTime = 0;
- long samplesTaken = 0;
-
- public void run()
- {
- if (startTime == 0)
- {
- startTime = System.currentTimeMillis();
- }
- long elapsedTime = (System.currentTimeMillis() - startTime) / 1000; // in s
- long lastCount = sampleCount;
- sampleCount = messageCount.longValue();
- log.info(String.format("time elapsed: %2ds, message count: %7d, this period: %5d",
- elapsedTime, sampleCount, sampleCount - lastCount));
- samplesTaken++;
- }
-
- public double getAverage()
- {
- return (1.0 * sampleCount)/samplesTaken;
- }
-
- }
}
Modified: trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java 2008-05-28 16:28:50 UTC (rev 4328)
+++ trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java 2008-05-28 16:32:57 UTC (rev 4329)
@@ -31,7 +31,6 @@
{
int noOfMessagesToSend = 1000;
int noOfWarmupMessages;
- long samplePeriod = 1; // in seconds
int deliveryMode = DeliveryMode.NON_PERSISTENT;
boolean isSessionTransacted = false;
int transactionBatchSize = 5000;
@@ -60,16 +59,6 @@
this.noOfWarmupMessages = noOfWarmupMessages;
}
- public long getSamplePeriod()
- {
- return samplePeriod;
- }
-
- public void setSamplePeriod(long samplePeriod)
- {
- this.samplePeriod = samplePeriod;
- }
-
public int getDeliveryMode()
{
return deliveryMode;
@@ -143,7 +132,7 @@
public String toString()
{
- return "message to send = " + noOfMessagesToSend + ", samplePeriod = " + samplePeriod + "s" + ", DeliveryMode = " +
+ return "message to send = " + noOfMessagesToSend + ", DeliveryMode = " +
(deliveryMode == DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON_PERSISTENT") + ", session transacted = " + isSessionTransacted +
(isSessionTransacted ? ", transaction batch size = " + transactionBatchSize : "") + ", drain queue = " + drainQueue +
", queue lookup = " + queueLookup + ", connection factory lookup = " + connectionFactoryLookup +
More information about the jboss-cvs-commits
mailing list