[jboss-cvs] JBoss Messaging SVN: r4284 - in trunk/examples/jms: src/org/jboss/jms/example and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu May 22 11:14:15 EDT 2008
Author: jmesnil
Date: 2008-05-22 11:14:14 -0400 (Thu, 22 May 2008)
New Revision: 4284
Modified:
trunk/examples/jms/build.xml
trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
Log:
The number of senders used by perfSender is configurable using the Ant property sender.count (1 by default).
Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml 2008-05-22 14:28:16 UTC (rev 4283)
+++ trunk/examples/jms/build.xml 2008-05-22 15:14:14 UTC (rev 4284)
@@ -47,6 +47,7 @@
<property name="sample.period" value="1"/>
<property name="sess.trans" value="false"/>
<property name="sess.trans.size" value="1"/>
+ <property name="sender.count" value="2"/>
<path id="compile.classpath">
<fileset dir="${lib.dir}">
@@ -138,6 +139,7 @@
<echo>* available parameters (-Dmessage.count=1000) </echo>
<echo>* </echo>
<echo>* param description default current</echo>
+ <echo>* sender.count number of senders 1 ${sender.count}</echo>
<echo>* message.count number of messages 200000 ${message.count}</echo>
<echo>* delivery.mode PERSISTENT/NON_PERSISTENT NON_PERSISTENT ${delivery.mode}</echo>
<echo>* sample.period timing period in seconds 1 second ${sample.period}</echo>
@@ -156,6 +158,7 @@
<arg value="${sample.period}"/>
<arg value="${sess.trans}"/>
<arg value="${sess.trans.size}"/>
+ <arg value="${sender.count}"/>
</java>
</target>
Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-05-22 14:28:16 UTC (rev 4283)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java 2008-05-22 15:14:14 UTC (rev 4284)
@@ -27,7 +27,9 @@
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.*;
+
import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
/**
* a performance example that can be used to gather simple performance figures.
@@ -41,7 +43,7 @@
private static Logger log = Logger.getLogger(PerfExample.class);
private Queue queue;
private Connection connection;
- private int messageCount = 0;
+ private AtomicLong messageCount = new AtomicLong(0);
private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private Session session;
private Sampler command = new Sampler();
@@ -49,19 +51,26 @@
public static void main(String[] args)
{
PerfExample perfExample = new PerfExample();
-
+
int noOfMessages = Integer.parseInt(args[1]);
int deliveryMode = args[2].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
long samplePeriod = Long.parseLong(args[3]);
boolean transacted = Boolean.parseBoolean(args[4]);
log.info("Transacted:" + transacted);
int transactionBatchSize = Integer.parseInt(args[5]);
+ int numberOfSenders = 1;
+ if (args.length >= 7)
+ {
+ numberOfSenders = Integer.parseInt(args[6]);
+ }
+
PerfParams perfParams = new PerfParams();
perfParams.setNoOfMessagesToSend(noOfMessages);
perfParams.setDeliveryMode(deliveryMode);
perfParams.setSamplePeriod(samplePeriod);
perfParams.setSessionTransacted(transacted);
perfParams.setTransactionBatchSize(transactionBatchSize);
+ perfParams.setNumberOfSender(numberOfSenders);
if (args[0].equalsIgnoreCase("-l"))
{
@@ -69,8 +78,7 @@
}
else
{
-
- perfExample.runSender(perfParams);
+ perfExample.runSenders(perfParams);
}
}
@@ -85,42 +93,40 @@
session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.DUPS_OK_ACKNOWLEDGE);
}
- public void runSender(PerfParams perfParams)
+ public void runSenders(final PerfParams perfParams)
{
try
{
log.info("params = " + perfParams);
init(perfParams.isSessionTransacted());
- MessageProducer producer = session.createProducer(queue);
- producer.setDisableMessageID(true);
- producer.setDisableMessageTimestamp(true);
- producer.setDeliveryMode(perfParams.getDeliveryMode());
- scheduler.scheduleAtFixedRate(command, perfParams.getSamplePeriod(), perfParams.getSamplePeriod(), TimeUnit.SECONDS);
- BytesMessage bytesMessage = session.createBytesMessage();
- byte[] payload = new byte[1024];
- bytesMessage.writeBytes(payload);
- boolean committed = false;
- for (int i = 1; i <= perfParams.getNoOfMessagesToSend(); i++)
+
+ final CountDownLatch startSignal = new CountDownLatch(1);
+ final CountDownLatch endSignal = new CountDownLatch(perfParams.getNumberOfSenders());
+
+ for (int i = 0; i < perfParams.getNumberOfSenders(); i++)
{
- producer.send(bytesMessage);
- messageCount++;
- if (perfParams.isSessionTransacted())
- {
- if (messageCount % perfParams.getTransactionBatchSize() == 0)
+ Thread sender = new Thread() {
+ public void run()
{
- session.commit();
- committed = true;
+ try
+ {
+ startSignal.await();
+ sendMessages(perfParams);
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ } finally
+ {
+ endSignal.countDown();
+ }
}
- else
- {
- committed = false;
- }
- }
+ };
+ sender.start();
}
- if (perfParams.isSessionTransacted() && !committed)
- {
- session.commit();
- }
+
+ scheduler.scheduleAtFixedRate(command, perfParams.getSamplePeriod(), perfParams.getSamplePeriod(), TimeUnit.SECONDS);
+ startSignal.countDown();
+ endSignal.await();
scheduler.shutdownNow();
log.info("average: " + (command.getAverage() / perfParams.getSamplePeriod()) + " msg/s");
}
@@ -142,6 +148,40 @@
}
}
+ private void sendMessages(PerfParams perfParams) throws JMSException
+ {
+ MessageProducer producer = session.createProducer(queue);
+ producer.setDisableMessageID(true);
+ producer.setDisableMessageTimestamp(true);
+ producer.setDeliveryMode(perfParams.getDeliveryMode());
+ BytesMessage bytesMessage = session.createBytesMessage();
+ byte[] payload = new byte[1024];
+ bytesMessage.writeBytes(payload);
+
+ boolean committed = false;
+ for (int i = 1; i <= (perfParams.getNoOfMessagesToSend() / perfParams.getNumberOfSenders()); i++)
+ {
+ producer.send(bytesMessage);
+ messageCount.incrementAndGet();
+ if (perfParams.isSessionTransacted())
+ {
+ if (messageCount.longValue() % perfParams.getTransactionBatchSize() == 0)
+ {
+ session.commit();
+ committed = true;
+ }
+ else
+ {
+ committed = false;
+ }
+ }
+ }
+ if (perfParams.isSessionTransacted() && !committed)
+ {
+ session.commit();
+ }
+ }
+
public void runListener(final PerfParams perfParams)
{
try
@@ -212,9 +252,9 @@
try
{
BytesMessage bm = (BytesMessage) message;
- messageCount++;
+ messageCount.incrementAndGet();
boolean committed = checkCommit();
- if (messageCount == perfParams.getNoOfMessagesToSend())
+ if (messageCount.longValue() == perfParams.getNoOfMessagesToSend())
{
if (!committed)
{
@@ -235,7 +275,7 @@
{
if (perfParams.isSessionTransacted())
{
- if (messageCount % perfParams.getTransactionBatchSize() == 0)
+ if (messageCount.longValue() % perfParams.getTransactionBatchSize() == 0)
{
session.commit();
@@ -253,11 +293,10 @@
{
private static final int IGNORED_SAMPLES = 4;
- int sampleCount = 0;
- int ignoredCount = 0;
+ long sampleCount = 0;
+ AtomicLong ignoredCount = new AtomicLong(0);
long startTime = 0;
-
long samplesTaken = 0;
public void run()
@@ -267,19 +306,19 @@
startTime = System.currentTimeMillis();
}
long elapsedTime = (System.currentTimeMillis() - startTime) / 1000; // in s
- int lastCount = sampleCount;
- sampleCount = messageCount;
+ long lastCount = sampleCount;
+ sampleCount = messageCount.longValue();
if (samplesTaken >= IGNORED_SAMPLES)
{
info(elapsedTime, sampleCount, sampleCount - lastCount, false);
} else {
info(elapsedTime, sampleCount, sampleCount - lastCount, true);
- ignoredCount += (sampleCount - lastCount);
+ ignoredCount.addAndGet(sampleCount - lastCount);
}
samplesTaken++;
}
- public void info(long elapsedTime, int totalCount, int sampleCount, boolean ignored)
+ public void info(long elapsedTime, long totalCount, long sampleCount, boolean ignored)
{
String message = String.format("time elapsed: %2ds, message count: %7d, this period: %5d %s",
elapsedTime, totalCount, sampleCount, ignored ? "[IGNORED]" : "");
@@ -288,7 +327,7 @@
public long getAverage()
{
- return (sampleCount - ignoredCount)/(samplesTaken - IGNORED_SAMPLES);
+ return (sampleCount - ignoredCount.longValue())/(samplesTaken - IGNORED_SAMPLES);
}
}
Modified: trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java 2008-05-22 14:28:16 UTC (rev 4283)
+++ trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java 2008-05-22 15:14:14 UTC (rev 4284)
@@ -29,12 +29,23 @@
*/
public class PerfParams implements Serializable
{
+ int numberOfSender = 1;
int noOfMessagesToSend = 1000;
long samplePeriod = 1; // in seconds
int deliveryMode = DeliveryMode.NON_PERSISTENT;
boolean isSessionTransacted = false;
int transactionBatchSize = 5000;
+ public int getNumberOfSenders()
+ {
+ return numberOfSender;
+ }
+
+ public void setNumberOfSender(int numberOfSender)
+ {
+ this.numberOfSender = numberOfSender;
+ }
+
public int getNoOfMessagesToSend()
{
return noOfMessagesToSend;
@@ -88,7 +99,7 @@
public String toString()
{
- return "message to send = " + noOfMessagesToSend + " samplePeriod = " + samplePeriod + "s" + " DeliveryMode = " +
+ return "number of senders=" + numberOfSender + ", message to send = " + noOfMessagesToSend + " samplePeriod = " + samplePeriod + "s" + " DeliveryMode = " +
(deliveryMode == DeliveryMode.PERSISTENT?"PERSISTENT":"NON_PERSISTENT") + " session transacted = " + isSessionTransacted +
(isSessionTransacted?" transaction batch size = " + transactionBatchSize:"");
}
More information about the jboss-cvs-commits
mailing list