[jboss-cvs] JBoss Messaging SVN: r4284 - in trunk/examples/jms: src/org/jboss/jms/example and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu May 22 11:14:15 EDT 2008


Author: jmesnil
Date: 2008-05-22 11:14:14 -0400 (Thu, 22 May 2008)
New Revision: 4284

Modified:
   trunk/examples/jms/build.xml
   trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
   trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
Log:
the numbers of senders used by perfSender is configurable using Ant prop sender.count (1 by default)

Modified: trunk/examples/jms/build.xml
===================================================================
--- trunk/examples/jms/build.xml	2008-05-22 14:28:16 UTC (rev 4283)
+++ trunk/examples/jms/build.xml	2008-05-22 15:14:14 UTC (rev 4284)
@@ -47,6 +47,7 @@
    <property name="sample.period" value="1"/>
    <property name="sess.trans" value="false"/>
    <property name="sess.trans.size" value="1"/>
+   <property name="sender.count" value="2"/>
 
    <path id="compile.classpath">
       <fileset dir="${lib.dir}">
@@ -138,6 +139,7 @@
       <echo>* available parameters (-Dmessage.count=1000)                                </echo>
       <echo>*                                                                            </echo>
       <echo>*     param            description                default          current</echo>
+ 	  <echo>*  sender.count     number of senders                1             ${sender.count}</echo>
    	  <echo>*  message.count    number of messages            200000           ${message.count}</echo>
       <echo>*  delivery.mode  PERSISTENT/NON_PERSISTENT    NON_PERSISTENT      ${delivery.mode}</echo>
       <echo>*  sample.period  timing period in seconds       1 second          ${sample.period}</echo>
@@ -156,6 +158,7 @@
          <arg value="${sample.period}"/>
          <arg value="${sess.trans}"/>
          <arg value="${sess.trans.size}"/>
+         <arg value="${sender.count}"/>
       </java>
    </target>
 	

Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-05-22 14:28:16 UTC (rev 4283)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-05-22 15:14:14 UTC (rev 4284)
@@ -27,7 +27,9 @@
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 import javax.jms.*;
+
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * a performance example that can be used to gather simple performance figures.
@@ -41,7 +43,7 @@
    private static Logger log = Logger.getLogger(PerfExample.class);
    private Queue queue;
    private Connection connection;
-   private int messageCount = 0;
+   private AtomicLong messageCount = new AtomicLong(0);
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private Session session;
    private Sampler command = new Sampler();
@@ -49,19 +51,26 @@
    public static void main(String[] args)
    {
       PerfExample perfExample = new PerfExample();
-      
+
       int noOfMessages = Integer.parseInt(args[1]);
       int deliveryMode = args[2].equalsIgnoreCase("persistent")? DeliveryMode.PERSISTENT: DeliveryMode.NON_PERSISTENT;
       long samplePeriod = Long.parseLong(args[3]);
       boolean transacted = Boolean.parseBoolean(args[4]);
       log.info("Transacted:" + transacted);
       int transactionBatchSize = Integer.parseInt(args[5]);
+      int numberOfSenders = 1;
+      if (args.length >= 7)
+      {
+         numberOfSenders = Integer.parseInt(args[6]);
+      }
+
       PerfParams perfParams = new PerfParams();
       perfParams.setNoOfMessagesToSend(noOfMessages);
       perfParams.setDeliveryMode(deliveryMode);
       perfParams.setSamplePeriod(samplePeriod);
       perfParams.setSessionTransacted(transacted);
       perfParams.setTransactionBatchSize(transactionBatchSize);
+      perfParams.setNumberOfSender(numberOfSenders);
       
       if (args[0].equalsIgnoreCase("-l"))
       {
@@ -69,8 +78,7 @@
       }
       else
       {
-         
-         perfExample.runSender(perfParams);
+         perfExample.runSenders(perfParams);
       }
 
    }
@@ -85,42 +93,40 @@
       session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.DUPS_OK_ACKNOWLEDGE);
    }
    
-   public void runSender(PerfParams perfParams)
+   public void runSenders(final PerfParams perfParams)
    {
       try
       {
          log.info("params = " + perfParams);
          init(perfParams.isSessionTransacted());
-         MessageProducer producer = session.createProducer(queue);
-         producer.setDisableMessageID(true);
-         producer.setDisableMessageTimestamp(true);
-         producer.setDeliveryMode(perfParams.getDeliveryMode());
-         scheduler.scheduleAtFixedRate(command, perfParams.getSamplePeriod(), perfParams.getSamplePeriod(), TimeUnit.SECONDS);
-         BytesMessage bytesMessage = session.createBytesMessage();
-         byte[] payload = new byte[1024];
-         bytesMessage.writeBytes(payload);
-         boolean committed = false;
-         for (int i = 1; i <= perfParams.getNoOfMessagesToSend(); i++)
+         
+         final CountDownLatch startSignal = new CountDownLatch(1);
+         final CountDownLatch endSignal = new CountDownLatch(perfParams.getNumberOfSenders());
+         
+         for (int i = 0; i < perfParams.getNumberOfSenders(); i++)
          {
-            producer.send(bytesMessage);
-            messageCount++;
-            if (perfParams.isSessionTransacted())
-            {
-               if (messageCount % perfParams.getTransactionBatchSize() == 0)
+            Thread sender = new Thread() {
+               public void run()
                {
-                  session.commit();
-                  committed = true;
+                  try
+                  {
+                     startSignal.await();
+                     sendMessages(perfParams);
+                  } catch (Exception e)
+                  {
+                     e.printStackTrace();
+                  } finally
+                  {
+                     endSignal.countDown();
+                  }
                }
-               else
-               {
-                  committed = false;
-               }
-            }
+            };
+            sender.start();
          }
-         if (perfParams.isSessionTransacted() && !committed)
-         {
-            session.commit();
-         }
+
+         scheduler.scheduleAtFixedRate(command, perfParams.getSamplePeriod(), perfParams.getSamplePeriod(), TimeUnit.SECONDS);
+         startSignal.countDown();
+         endSignal.await();
          scheduler.shutdownNow();
          log.info("average: " + (command.getAverage() / perfParams.getSamplePeriod()) + " msg/s");
       }
@@ -142,6 +148,40 @@
       }
    }
 
+   private void sendMessages(PerfParams perfParams) throws JMSException
+   {
+      MessageProducer producer = session.createProducer(queue);
+      producer.setDisableMessageID(true);
+      producer.setDisableMessageTimestamp(true);
+      producer.setDeliveryMode(perfParams.getDeliveryMode());
+      BytesMessage bytesMessage = session.createBytesMessage();
+      byte[] payload = new byte[1024];
+      bytesMessage.writeBytes(payload);
+
+      boolean committed = false;
+      for (int i = 1; i <= (perfParams.getNoOfMessagesToSend() / perfParams.getNumberOfSenders()); i++)
+      {
+         producer.send(bytesMessage);
+         messageCount.incrementAndGet();
+         if (perfParams.isSessionTransacted())
+         {
+            if (messageCount.longValue() % perfParams.getTransactionBatchSize() == 0)
+            {
+               session.commit();
+               committed = true;
+            }
+            else
+            {
+               committed = false;
+            }
+         }
+      }
+      if (perfParams.isSessionTransacted() && !committed)
+      {
+         session.commit();
+      }
+   }
+
    public void runListener(final PerfParams perfParams)
    {
       try
@@ -212,9 +252,9 @@
          try
          {
             BytesMessage bm = (BytesMessage) message;
-            messageCount++;      
+            messageCount.incrementAndGet();      
             boolean committed = checkCommit();
-            if (messageCount == perfParams.getNoOfMessagesToSend())
+            if (messageCount.longValue() == perfParams.getNoOfMessagesToSend())
             {
                if (!committed)
                {
@@ -235,7 +275,7 @@
       {
          if (perfParams.isSessionTransacted())
          {
-            if (messageCount % perfParams.getTransactionBatchSize() == 0)
+            if (messageCount.longValue() % perfParams.getTransactionBatchSize() == 0)
             {
                session.commit();
                
@@ -253,11 +293,10 @@
    {
       private static final int IGNORED_SAMPLES = 4;
       
-      int sampleCount = 0;
-      int ignoredCount = 0;
+      long sampleCount = 0;
+      AtomicLong ignoredCount = new AtomicLong(0);
       
       long startTime = 0;
-
       long samplesTaken = 0;
 
       public void run()
@@ -267,19 +306,19 @@
             startTime = System.currentTimeMillis();
          }
          long elapsedTime = (System.currentTimeMillis() - startTime) / 1000; // in s
-         int lastCount = sampleCount;
-         sampleCount = messageCount;
+         long lastCount = sampleCount;
+         sampleCount = messageCount.longValue();
          if (samplesTaken >= IGNORED_SAMPLES)
          {
             info(elapsedTime, sampleCount, sampleCount - lastCount, false);
          } else {
             info(elapsedTime, sampleCount, sampleCount - lastCount, true);
-            ignoredCount += (sampleCount - lastCount);            
+            ignoredCount.addAndGet(sampleCount - lastCount);            
          }
          samplesTaken++;
       }
 
-      public void info(long elapsedTime, int totalCount, int sampleCount, boolean ignored)
+      public void info(long elapsedTime, long totalCount, long sampleCount, boolean ignored)
       {
          String message = String.format("time elapsed: %2ds, message count: %7d, this period: %5d %s", 
                elapsedTime, totalCount, sampleCount, ignored ? "[IGNORED]" : "");
@@ -288,7 +327,7 @@
       
       public long getAverage()
       {
-         return (sampleCount - ignoredCount)/(samplesTaken - IGNORED_SAMPLES);
+         return (sampleCount - ignoredCount.longValue())/(samplesTaken - IGNORED_SAMPLES);
       }
 
    }

Modified: trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java	2008-05-22 14:28:16 UTC (rev 4283)
+++ trunk/examples/jms/src/org/jboss/jms/util/PerfParams.java	2008-05-22 15:14:14 UTC (rev 4284)
@@ -29,12 +29,23 @@
 */
 public class PerfParams implements Serializable
 {
+   int numberOfSender = 1;
    int noOfMessagesToSend = 1000;
    long samplePeriod = 1; // in seconds
    int deliveryMode  = DeliveryMode.NON_PERSISTENT;
    boolean isSessionTransacted = false;
    int transactionBatchSize = 5000;
 
+   public int getNumberOfSenders()
+   {
+      return numberOfSender;
+   }
+   
+   public void setNumberOfSender(int numberOfSender)
+   {
+      this.numberOfSender = numberOfSender;
+   }
+   
    public int getNoOfMessagesToSend()
    {
       return noOfMessagesToSend;
@@ -88,7 +99,7 @@
 
    public String toString()
    {
-      return "message to send = " + noOfMessagesToSend + " samplePeriod = " + samplePeriod + "s" + " DeliveryMode = " +
+      return "number of senders=" + numberOfSender + ", message to send = " + noOfMessagesToSend + " samplePeriod = " + samplePeriod + "s" + " DeliveryMode = " +
               (deliveryMode == DeliveryMode.PERSISTENT?"PERSISTENT":"NON_PERSISTENT") + " session transacted = " + isSessionTransacted +
               (isSessionTransacted?" transaction batch size = " + transactionBatchSize:"");
    }




More information about the jboss-cvs-commits mailing list