[hornetq-commits] JBoss hornetq SVN: r8618 - in trunk/examples/core/perf: server0 and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 8 05:36:45 EST 2009


Author: timfox
Date: 2009-12-08 05:36:45 -0500 (Tue, 08 Dec 2009)
New Revision: 8618

Modified:
   trunk/examples/core/perf/perf.properties
   trunk/examples/core/perf/server0/hornetq-configuration.xml
   trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
   trunk/examples/core/perf/src/org/hornetq/core/example/PerfParams.java
Log:
added send acks option to perf test

Modified: trunk/examples/core/perf/perf.properties
===================================================================
--- trunk/examples/core/perf/perf.properties	2009-12-08 10:23:43 UTC (rev 8617)
+++ trunk/examples/core/perf/perf.properties	2009-12-08 10:36:45 UTC (rev 8618)
@@ -1,5 +1,5 @@
-num-messages=1000000
-num-warmup-messages=0
+num-messages=100000
+num-warmup-messages=1000
 message-size=1024
 durable=false
 transacted=false
@@ -12,9 +12,10 @@
 port=5445
 tcp-buffer=2048576
 tcp-no-delay=false
-confirmation-window=-1
+confirmation-window=1048576
 producer-window=1048576
 consumer-window=1048576
 pre-ack=false
 block-ack=false
 block-persistent=false
+use-send-acks=true

Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml	2009-12-08 10:23:43 UTC (rev 8617)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml	2009-12-08 10:36:45 UTC (rev 8618)
@@ -15,10 +15,10 @@
    
    <security-enabled>false</security-enabled>
    
-   <persistence-enabled>false</persistence-enabled>
+   <persistence-enabled>true</persistence-enabled>
 
-   <journal-sync-non-transactional>false</journal-sync-non-transactional>
-   <journal-sync-transactional>false</journal-sync-transactional>
+   <journal-sync-non-transactional>true</journal-sync-non-transactional>
+   <journal-sync-transactional>true</journal-sync-transactional>
    <journal-type>ASYNCIO</journal-type>
    <journal-min-files>20</journal-min-files>
    <journal-buffer-timeout>20000</journal-buffer-timeout>
@@ -33,11 +33,13 @@
 	   </queue>
    </queues>
 
+<!--
    <address-settings>      
       <address-setting match="perfAddress">
          <max-size-bytes>10485760</max-size-bytes>
          <address-full-policy>BLOCK</address-full-policy>
       </address-setting>
    </address-settings>
+-->
 
 </configuration>

Modified: trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
===================================================================
--- trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java	2009-12-08 10:23:43 UTC (rev 8617)
+++ trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java	2009-12-08 10:36:45 UTC (rev 8618)
@@ -28,8 +28,10 @@
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
 import org.hornetq.core.client.MessageHandler;
+import org.hornetq.core.client.SendAcknowledgementHandler;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.message.Message;
 import org.hornetq.integration.transports.netty.NettyConnectorFactory;
 import org.hornetq.integration.transports.netty.TransportConstants;
 import org.hornetq.utils.TokenBucketLimiter;
@@ -123,6 +125,7 @@
       int consumerWindowSize = Integer.valueOf(props.getProperty("consumer-window"));
       boolean blockOnACK = Boolean.valueOf(props.getProperty("block-ack", "false"));
       boolean blockOnPersistent = Boolean.valueOf(props.getProperty("block-persistent", "false"));
+      boolean useSendAcks = Boolean.valueOf(props.getProperty("use-send-acks", "false"));
 
       PerfBase.log.info("num-messages: " + noOfMessages);
       PerfBase.log.info("num-warmup-messages: " + noOfWarmupMessages);
@@ -144,7 +147,13 @@
       PerfBase.log.info("consumer-window: " + consumerWindowSize);
       PerfBase.log.info("block-ack:" + blockOnACK);
       PerfBase.log.info("block-persistent:" + blockOnPersistent);
+      PerfBase.log.info("use-send-acks:" + useSendAcks);
 
+      if (useSendAcks && confirmationWindowSize < 1)
+      {
+         throw new IllegalArgumentException("If you use send acks, then need to set confirmation-window-size to a positive integer");
+      }
+
       PerfParams perfParams = new PerfParams();
       perfParams.setNoOfMessagesToSend(noOfMessages);
       perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
@@ -166,6 +175,7 @@
       perfParams.setConsumerWindow(consumerWindowSize);
       perfParams.setBlockOnACK(blockOnACK);
       perfParams.setBlockOnPersistent(blockOnPersistent);
+      perfParams.setUseSendAcks(useSendAcks);
 
       return perfParams;
    }
@@ -179,8 +189,6 @@
 
    private ClientSessionFactory factory;
 
-   private ClientSession session;
-
    private long start;
 
    private void init(final boolean transacted, final String queueName) throws Exception
@@ -204,8 +212,6 @@
 
       factory.setBlockOnAcknowledge(perfParams.isBlockOnACK());
       factory.setBlockOnPersistentSend(perfParams.isBlockOnPersistent());
-
-      session = factory.createSession(!transacted, !transacted);
    }
 
    private void displayAverage(final long numberOfMessages, final long start, final long end)
@@ -223,6 +229,7 @@
       try
       {
          PerfBase.log.info("params = " + perfParams);
+
          init(perfParams.isSessionTransacted(), perfParams.getQueueName());
 
          if (perfParams.isDrainQueue())
@@ -238,7 +245,8 @@
                       perfParams.isSessionTransacted(),
                       false,
                       perfParams.getThrottleRate(),
-                      perfParams.getMessageSize());
+                      perfParams.getMessageSize(),
+                      perfParams.isUseSendAcks());
          PerfBase.log.info("warmed up");
          start = System.currentTimeMillis();
          sendMessages(perfParams.getNoOfMessagesToSend(),
@@ -247,7 +255,8 @@
                       perfParams.isSessionTransacted(),
                       true,
                       perfParams.getThrottleRate(),
-                      perfParams.getMessageSize());
+                      perfParams.getMessageSize(),
+                      perfParams.isUseSendAcks());
          long end = System.currentTimeMillis();
          displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
       }
@@ -255,29 +264,18 @@
       {
          e.printStackTrace();
       }
-      finally
-      {
-         if (session != null)
-         {
-            try
-            {
-               session.close();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      }
    }
 
    protected void runListener()
    {
+      ClientSession session = null;
+
       try
       {
-
          init(perfParams.isSessionTransacted(), perfParams.getQueueName());
 
+         session = factory.createSession(!perfParams.isSessionTransacted(), !perfParams.isSessionTransacted());
+
          if (perfParams.isDrainQueue())
          {
             drainQueue();
@@ -290,7 +288,7 @@
          PerfBase.log.info("READY!!!");
 
          CountDownLatch countDownLatch = new CountDownLatch(1);
-         consumer.setMessageHandler(new PerfListener(countDownLatch, perfParams));
+         consumer.setMessageHandler(new PerfListener(session, countDownLatch, perfParams));
          countDownLatch.await();
          long end = System.currentTimeMillis();
          // start was set on the first received message
@@ -319,29 +317,42 @@
    private void drainQueue() throws Exception
    {
       PerfBase.log.info("Draining queue");
-      ClientConsumer consumer = session.createConsumer(perfParams.getQueueName());
 
-      session.start();
+      ClientSession session = null;
 
-      ClientMessage message = null;
-
-      int count = 0;
-      do
+      try
       {
-         message = consumer.receive(3000);
+         session = factory.createSession();
 
-         if (message != null)
+         ClientConsumer consumer = session.createConsumer(perfParams.getQueueName());
+
+         session.start();
+
+         ClientMessage message = null;
+
+         int count = 0;
+         do
          {
-            message.acknowledge();
+            message = consumer.receive(3000);
 
-            count++;
+            if (message != null)
+            {
+               message.acknowledge();
+
+               count++;
+            }
          }
+         while (message != null);
+
+         PerfBase.log.info("Drained " + count + " messages");
       }
-      while (message != null);
-
-      consumer.close();
-
-      PerfBase.log.info("Drained " + count + " messages");
+      finally
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+      }
    }
 
    private void sendMessages(final int numberOfMessages,
@@ -350,53 +361,97 @@
                              final boolean transacted,
                              final boolean display,
                              final int throttleRate,
-                             final int messageSize) throws Exception
+                             final int messageSize,
+                             final boolean useSendAcks) throws Exception
    {
-      ClientProducer producer = session.createProducer(perfParams.getAddress());
+      ClientSession session = null;
 
-      ClientMessage message = session.createClientMessage(durable);
+      try
+      {
+         session = factory.createSession(!transacted, !transacted);
 
-      byte[] payload = PerfBase.randomByteArray(messageSize);
+         CountDownLatch theLatch = null;
 
-      message.getBodyBuffer().writeBytes(payload);
+         if (useSendAcks)
+         {
+            final CountDownLatch latch = new CountDownLatch(numberOfMessages);
 
-      final int modulo = 2000;
+            class MySendAckHandler implements SendAcknowledgementHandler
+            {
+               public void sendAcknowledged(Message message)
+               {
+                  latch.countDown();
+               }
+            }
+            
+            session.setSendAcknowledgementHandler(new MySendAckHandler());
 
-      TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiterImpl(throttleRate, false) : null;
+            theLatch = latch;
+         }
 
-      boolean committed = false;
-      for (int i = 1; i <= numberOfMessages; i++)
-      {
-         producer.send(message);
+         ClientProducer producer = session.createProducer(perfParams.getAddress());
 
-         if (transacted)
+         ClientMessage message = session.createClientMessage(durable);
+
+         byte[] payload = PerfBase.randomByteArray(messageSize);
+
+         message.getBodyBuffer().writeBytes(payload);
+
+         final int modulo = 2000;
+
+         TokenBucketLimiter tbl = throttleRate != -1 ? new TokenBucketLimiterImpl(throttleRate, false) : null;
+
+         boolean committed = false;
+
+         for (int i = 1; i <= numberOfMessages; i++)
          {
-            if (i % txBatchSize == 0)
+            producer.send(message);
+
+            if (transacted)
             {
-               session.commit();
-               committed = true;
+               if (i % txBatchSize == 0)
+               {
+                  session.commit();
+                  committed = true;
+               }
+               else
+               {
+                  committed = false;
+               }
             }
-            else
+            if (display && i % modulo == 0)
             {
-               committed = false;
+               double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+               PerfBase.log.info(String.format("sent %6d messages in %2.2fs", i, duration));
             }
+
+            // log.info("sent message " + i);
+
+            if (tbl != null)
+            {
+               tbl.limit();
+            }
          }
-         if (display && i % modulo == 0)
+
+         if (transacted && !committed)
          {
-            double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
-            PerfBase.log.info(String.format("sent %6d messages in %2.2fs", i, duration));
+            session.commit();
          }
 
-         // log.info("sent message " + i);
+         if (useSendAcks)
+         {
+            // Must close the session first since this flushes the confirmations
+            session.close();
 
-         if (tbl != null)
-         {
-            tbl.limit();
+            theLatch.await();
          }
       }
-      if (transacted && !committed)
+      finally
       {
-         session.commit();
+         if (session != null)
+         {
+            session.close();
+         }
       }
    }
 
@@ -414,8 +469,11 @@
 
       private final AtomicLong count = new AtomicLong(0);
 
-      public PerfListener(final CountDownLatch countDownLatch, final PerfParams perfParams)
+      private final ClientSession session;
+
+      public PerfListener(final ClientSession session, final CountDownLatch countDownLatch, final PerfParams perfParams)
       {
+         this.session = session;
          this.countDownLatch = countDownLatch;
          this.perfParams = perfParams;
          warmingUp = perfParams.getNoOfWarmupMessages() > 0;
@@ -428,13 +486,13 @@
          {
             if (warmingUp)
             {
-               boolean committed = checkCommit();
+               boolean committed = checkCommit(session);
                if (count.incrementAndGet() == perfParams.getNoOfWarmupMessages())
                {
                   PerfBase.log.info("warmed up after receiving " + count.longValue() + " msgs");
                   if (!committed)
                   {
-                     checkCommit();
+                     checkCommit(session);
                   }
                   warmingUp = false;
                }
@@ -452,12 +510,12 @@
             message.acknowledge();
 
             long currentCount = count.incrementAndGet();
-            boolean committed = checkCommit();
+            boolean committed = checkCommit(session);
             if (currentCount == perfParams.getNoOfMessagesToSend())
             {
                if (!committed)
                {
-                  checkCommit();
+                  checkCommit(session);
                }
                countDownLatch.countDown();
             }
@@ -473,7 +531,7 @@
          }
       }
 
-      private boolean checkCommit() throws Exception
+      private boolean checkCommit(final ClientSession session) throws Exception
       {
          if (perfParams.isSessionTransacted())
          {

Modified: trunk/examples/core/perf/src/org/hornetq/core/example/PerfParams.java
===================================================================
--- trunk/examples/core/perf/src/org/hornetq/core/example/PerfParams.java	2009-12-08 10:23:43 UTC (rev 8617)
+++ trunk/examples/core/perf/src/org/hornetq/core/example/PerfParams.java	2009-12-08 10:36:45 UTC (rev 8618)
@@ -54,7 +54,7 @@
 
    private boolean preAck;
 
-   private int confirmationWindow;
+   private int confirmationWindow = -1;
 
    private int producerWindow;
 
@@ -63,6 +63,8 @@
    private boolean blockOnPersistent = true;
 
    private boolean blockOnACK = true;
+   
+   private boolean useSendAcks;
 
    public boolean isBlockOnPersistent()
    {
@@ -284,4 +286,14 @@
    {
       this.consumerWindow = consumerWindow;
    }
+
+   public boolean isUseSendAcks()
+   {
+      return useSendAcks;
+   }
+
+   public void setUseSendAcks(boolean useSendAcks)
+   {
+      this.useSendAcks = useSendAcks;
+   }
 }



More information about the hornetq-commits mailing list