Author: timfox
Date: 2009-12-08 05:36:45 -0500 (Tue, 08 Dec 2009)
New Revision: 8618
Modified:
trunk/examples/core/perf/perf.properties
trunk/examples/core/perf/server0/hornetq-configuration.xml
trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
trunk/examples/core/perf/src/org/hornetq/core/example/PerfParams.java
Log:
added send acks option to perf test
Modified: trunk/examples/core/perf/perf.properties
===================================================================
--- trunk/examples/core/perf/perf.properties 2009-12-08 10:23:43 UTC (rev 8617)
+++ trunk/examples/core/perf/perf.properties 2009-12-08 10:36:45 UTC (rev 8618)
@@ -1,5 +1,5 @@
-num-messages=1000000
-num-warmup-messages=0
+num-messages=100000
+num-warmup-messages=1000
message-size=1024
durable=false
transacted=false
@@ -12,9 +12,10 @@
port=5445
tcp-buffer=2048576
tcp-no-delay=false
-confirmation-window=-1
+confirmation-window=1048576
producer-window=1048576
consumer-window=1048576
pre-ack=false
block-ack=false
block-persistent=false
+use-send-acks=true
Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-08 10:23:43 UTC
(rev 8617)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-08 10:36:45 UTC
(rev 8618)
@@ -15,10 +15,10 @@
<security-enabled>false</security-enabled>
- <persistence-enabled>false</persistence-enabled>
+ <persistence-enabled>true</persistence-enabled>
- <journal-sync-non-transactional>false</journal-sync-non-transactional>
- <journal-sync-transactional>false</journal-sync-transactional>
+ <journal-sync-non-transactional>true</journal-sync-non-transactional>
+ <journal-sync-transactional>true</journal-sync-transactional>
<journal-type>ASYNCIO</journal-type>
<journal-min-files>20</journal-min-files>
<journal-buffer-timeout>20000</journal-buffer-timeout>
@@ -33,11 +33,13 @@
</queue>
</queues>
+<!--
<address-settings>
<address-setting match="perfAddress">
<max-size-bytes>10485760</max-size-bytes>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
+-->
</configuration>
Modified: trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
===================================================================
--- trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java 2009-12-08
10:23:43 UTC (rev 8617)
+++ trunk/examples/core/perf/src/org/hornetq/core/example/PerfBase.java 2009-12-08
10:36:45 UTC (rev 8618)
@@ -28,8 +28,10 @@
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.MessageHandler;
+import org.hornetq.core.client.SendAcknowledgementHandler;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.message.Message;
import org.hornetq.integration.transports.netty.NettyConnectorFactory;
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.utils.TokenBucketLimiter;
@@ -123,6 +125,7 @@
int consumerWindowSize =
Integer.valueOf(props.getProperty("consumer-window"));
boolean blockOnACK = Boolean.valueOf(props.getProperty("block-ack",
"false"));
boolean blockOnPersistent =
Boolean.valueOf(props.getProperty("block-persistent", "false"));
+ boolean useSendAcks = Boolean.valueOf(props.getProperty("use-send-acks",
"false"));
PerfBase.log.info("num-messages: " + noOfMessages);
PerfBase.log.info("num-warmup-messages: " + noOfWarmupMessages);
@@ -144,7 +147,13 @@
PerfBase.log.info("consumer-window: " + consumerWindowSize);
PerfBase.log.info("block-ack:" + blockOnACK);
PerfBase.log.info("block-persistent:" + blockOnPersistent);
+ PerfBase.log.info("use-send-acks:" + useSendAcks);
+ if (useSendAcks && confirmationWindowSize < 1)
+ {
+ throw new IllegalArgumentException("If you use send acks, then need to set
confirmation-window-size to a positive integer");
+ }
+
PerfParams perfParams = new PerfParams();
perfParams.setNoOfMessagesToSend(noOfMessages);
perfParams.setNoOfWarmupMessages(noOfWarmupMessages);
@@ -166,6 +175,7 @@
perfParams.setConsumerWindow(consumerWindowSize);
perfParams.setBlockOnACK(blockOnACK);
perfParams.setBlockOnPersistent(blockOnPersistent);
+ perfParams.setUseSendAcks(useSendAcks);
return perfParams;
}
@@ -179,8 +189,6 @@
private ClientSessionFactory factory;
- private ClientSession session;
-
private long start;
private void init(final boolean transacted, final String queueName) throws Exception
@@ -204,8 +212,6 @@
factory.setBlockOnAcknowledge(perfParams.isBlockOnACK());
factory.setBlockOnPersistentSend(perfParams.isBlockOnPersistent());
-
- session = factory.createSession(!transacted, !transacted);
}
private void displayAverage(final long numberOfMessages, final long start, final long
end)
@@ -223,6 +229,7 @@
try
{
PerfBase.log.info("params = " + perfParams);
+
init(perfParams.isSessionTransacted(), perfParams.getQueueName());
if (perfParams.isDrainQueue())
@@ -238,7 +245,8 @@
perfParams.isSessionTransacted(),
false,
perfParams.getThrottleRate(),
- perfParams.getMessageSize());
+ perfParams.getMessageSize(),
+ perfParams.isUseSendAcks());
PerfBase.log.info("warmed up");
start = System.currentTimeMillis();
sendMessages(perfParams.getNoOfMessagesToSend(),
@@ -247,7 +255,8 @@
perfParams.isSessionTransacted(),
true,
perfParams.getThrottleRate(),
- perfParams.getMessageSize());
+ perfParams.getMessageSize(),
+ perfParams.isUseSendAcks());
long end = System.currentTimeMillis();
displayAverage(perfParams.getNoOfMessagesToSend(), start, end);
}
@@ -255,29 +264,18 @@
{
e.printStackTrace();
}
- finally
- {
- if (session != null)
- {
- try
- {
- session.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
}
protected void runListener()
{
+ ClientSession session = null;
+
try
{
-
init(perfParams.isSessionTransacted(), perfParams.getQueueName());
+ session = factory.createSession(!perfParams.isSessionTransacted(),
!perfParams.isSessionTransacted());
+
if (perfParams.isDrainQueue())
{
drainQueue();
@@ -290,7 +288,7 @@
PerfBase.log.info("READY!!!");
CountDownLatch countDownLatch = new CountDownLatch(1);
- consumer.setMessageHandler(new PerfListener(countDownLatch, perfParams));
+ consumer.setMessageHandler(new PerfListener(session, countDownLatch,
perfParams));
countDownLatch.await();
long end = System.currentTimeMillis();
// start was set on the first received message
@@ -319,29 +317,42 @@
private void drainQueue() throws Exception
{
PerfBase.log.info("Draining queue");
- ClientConsumer consumer = session.createConsumer(perfParams.getQueueName());
- session.start();
+ ClientSession session = null;
- ClientMessage message = null;
-
- int count = 0;
- do
+ try
{
- message = consumer.receive(3000);
+ session = factory.createSession();
- if (message != null)
+ ClientConsumer consumer = session.createConsumer(perfParams.getQueueName());
+
+ session.start();
+
+ ClientMessage message = null;
+
+ int count = 0;
+ do
{
- message.acknowledge();
+ message = consumer.receive(3000);
- count++;
+ if (message != null)
+ {
+ message.acknowledge();
+
+ count++;
+ }
}
+ while (message != null);
+
+ PerfBase.log.info("Drained " + count + " messages");
}
- while (message != null);
-
- consumer.close();
-
- PerfBase.log.info("Drained " + count + " messages");
+ finally
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
}
private void sendMessages(final int numberOfMessages,
@@ -350,53 +361,97 @@
final boolean transacted,
final boolean display,
final int throttleRate,
- final int messageSize) throws Exception
+ final int messageSize,
+ final boolean useSendAcks) throws Exception
{
- ClientProducer producer = session.createProducer(perfParams.getAddress());
+ ClientSession session = null;
- ClientMessage message = session.createClientMessage(durable);
+ try
+ {
+ session = factory.createSession(!transacted, !transacted);
- byte[] payload = PerfBase.randomByteArray(messageSize);
+ CountDownLatch theLatch = null;
- message.getBodyBuffer().writeBytes(payload);
+ if (useSendAcks)
+ {
+ final CountDownLatch latch = new CountDownLatch(numberOfMessages);
- final int modulo = 2000;
+ class MySendAckHandler implements SendAcknowledgementHandler
+ {
+ public void sendAcknowledged(Message message)
+ {
+ latch.countDown();
+ }
+ }
+
+ session.setSendAcknowledgementHandler(new MySendAckHandler());
- TokenBucketLimiter tbl = throttleRate != -1 ? new
TokenBucketLimiterImpl(throttleRate, false) : null;
+ theLatch = latch;
+ }
- boolean committed = false;
- for (int i = 1; i <= numberOfMessages; i++)
- {
- producer.send(message);
+ ClientProducer producer = session.createProducer(perfParams.getAddress());
- if (transacted)
+ ClientMessage message = session.createClientMessage(durable);
+
+ byte[] payload = PerfBase.randomByteArray(messageSize);
+
+ message.getBodyBuffer().writeBytes(payload);
+
+ final int modulo = 2000;
+
+ TokenBucketLimiter tbl = throttleRate != -1 ? new
TokenBucketLimiterImpl(throttleRate, false) : null;
+
+ boolean committed = false;
+
+ for (int i = 1; i <= numberOfMessages; i++)
{
- if (i % txBatchSize == 0)
+ producer.send(message);
+
+ if (transacted)
{
- session.commit();
- committed = true;
+ if (i % txBatchSize == 0)
+ {
+ session.commit();
+ committed = true;
+ }
+ else
+ {
+ committed = false;
+ }
}
- else
+ if (display && i % modulo == 0)
{
- committed = false;
+ double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
+ PerfBase.log.info(String.format("sent %6d messages in %2.2fs",
i, duration));
}
+
+ // log.info("sent message " + i);
+
+ if (tbl != null)
+ {
+ tbl.limit();
+ }
}
- if (display && i % modulo == 0)
+
+ if (transacted && !committed)
{
- double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
- PerfBase.log.info(String.format("sent %6d messages in %2.2fs", i,
duration));
+ session.commit();
}
- // log.info("sent message " + i);
+ if (useSendAcks)
+ {
+ // Must close the session first since this flushes the confirmations
+ session.close();
- if (tbl != null)
- {
- tbl.limit();
+ theLatch.await();
}
}
- if (transacted && !committed)
+ finally
{
- session.commit();
+ if (session != null)
+ {
+ session.close();
+ }
}
}
@@ -414,8 +469,11 @@
private final AtomicLong count = new AtomicLong(0);
- public PerfListener(final CountDownLatch countDownLatch, final PerfParams
perfParams)
+ private final ClientSession session;
+
+ public PerfListener(final ClientSession session, final CountDownLatch
countDownLatch, final PerfParams perfParams)
{
+ this.session = session;
this.countDownLatch = countDownLatch;
this.perfParams = perfParams;
warmingUp = perfParams.getNoOfWarmupMessages() > 0;
@@ -428,13 +486,13 @@
{
if (warmingUp)
{
- boolean committed = checkCommit();
+ boolean committed = checkCommit(session);
if (count.incrementAndGet() == perfParams.getNoOfWarmupMessages())
{
PerfBase.log.info("warmed up after receiving " +
count.longValue() + " msgs");
if (!committed)
{
- checkCommit();
+ checkCommit(session);
}
warmingUp = false;
}
@@ -452,12 +510,12 @@
message.acknowledge();
long currentCount = count.incrementAndGet();
- boolean committed = checkCommit();
+ boolean committed = checkCommit(session);
if (currentCount == perfParams.getNoOfMessagesToSend())
{
if (!committed)
{
- checkCommit();
+ checkCommit(session);
}
countDownLatch.countDown();
}
@@ -473,7 +531,7 @@
}
}
- private boolean checkCommit() throws Exception
+ private boolean checkCommit(final ClientSession session) throws Exception
{
if (perfParams.isSessionTransacted())
{
Modified: trunk/examples/core/perf/src/org/hornetq/core/example/PerfParams.java
===================================================================
--- trunk/examples/core/perf/src/org/hornetq/core/example/PerfParams.java 2009-12-08
10:23:43 UTC (rev 8617)
+++ trunk/examples/core/perf/src/org/hornetq/core/example/PerfParams.java 2009-12-08
10:36:45 UTC (rev 8618)
@@ -54,7 +54,7 @@
private boolean preAck;
- private int confirmationWindow;
+ private int confirmationWindow = -1;
private int producerWindow;
@@ -63,6 +63,8 @@
private boolean blockOnPersistent = true;
private boolean blockOnACK = true;
+
+ private boolean useSendAcks;
public boolean isBlockOnPersistent()
{
@@ -284,4 +286,14 @@
{
this.consumerWindow = consumerWindow;
}
+
+ public boolean isUseSendAcks()
+ {
+ return useSendAcks;
+ }
+
+ public void setUseSendAcks(boolean useSendAcks)
+ {
+ this.useSendAcks = useSendAcks;
+ }
}