[jboss-cvs] JBossAS SVN: r57904 - in trunk/testsuite/src/main/org/jboss/test/jbossmessaging: . perf
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 27 17:19:43 EDT 2006
Author: rachmatowicz at jboss.com
Date: 2006-10-27 17:19:43 -0400 (Fri, 27 Oct 2006)
New Revision: 57904
Added:
trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/
trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSPerfStressTestCase.java
trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSReconnectStressTestCase.java
trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/ReceiveNackClientStressTestCase.java
trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/SendReplyPerfStressTestCase.java
Log:
More refactored generic JMS test cases
Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSPerfStressTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSPerfStressTestCase.java 2006-10-27 18:39:33 UTC (rev 57903)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSPerfStressTestCase.java 2006-10-27 21:19:43 UTC (rev 57904)
@@ -0,0 +1,797 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.jbossmessaging.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.Queue;
+import javax.naming.Context;
+
+import org.apache.log4j.Category;
+import org.jboss.test.jbossmessaging.JMSTestCase;
+/**
+ * JMSPerfStressTestCase.java Some simple tests of JMS provider
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author
+ * @version
+ */
+
+public class JMSPerfStressTestCase extends JMSTestCase
+{
+
+ // Provider specific
+ static String TOPIC_FACTORY = "ConnectionFactory";
+ static String QUEUE_FACTORY = "ConnectionFactory";
+
+ static String TEST_QUEUE = "queue/testQueue";
+ static String TEST_TOPIC = "topic/testTopic";
+
+ // static int PERFORMANCE_TEST_ITERATIONS = 1000;
+ static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10 * 1024];
+
+ static int TRANS_NONE = 0;
+ static int TRANS_INDIVIDUAL = 1;
+ static int TRANS_TOTAL = 2;
+ static String[] TRANS_DESC = {"NOT", "individually", "totally"};
+
+ //JMSProviderAdapter providerAdapter;
+ static Context context;
+ static QueueConnection queueConnection;
+ static TopicConnection topicConnection;
+
+ /**
+ * Constructor for the JMSPerfStressTestCase object
+ *
+ * @param name Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public JMSPerfStressTestCase(String name) throws Exception
+ {
+ super(name);
+ }
+
+
+ /**
+ * #Description of the Method
+ *
+ * @param transacted Description of Parameter
+ * @param persistence Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public void runAsynchQueuePerformance(final int transacted, final int persistence) throws Exception
+ {
+ {
+ queueConnection.start();
+ drainQueue();
+ queueConnection.stop();
+ }
+ final int iterationCount = getIterationCount();
+ final Category log = getLog();
+
+ Thread sendThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+ QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueSender sender = session.createSender(queue);
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ //sender.send(queue, message, persistence, 4, 0);
+ sender.send(message, persistence, 4, 0);
+ //getLog().debug(" Sent #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. ");
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ final QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+ QueueReceiver receiver = session.createReceiver(queue);
+
+ MessageListener listener =
+ new MessageListener()
+ {
+ long startTime = System.currentTimeMillis();
+ int i = 0;
+
+ /**
+ * #Description of the Method
+ *
+ * @param message Description of Parameter
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if( transacted == TRANS_INDIVIDUAL )
+ session.commit();
+ i++;
+ }
+ catch (JMSException e)
+ {
+ getLog().error("Unable to commit", e);
+ synchronized (this)
+ {
+ this.notify();
+ }
+ }
+ if (i >= iterationCount)
+ {
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. ");
+
+ synchronized (this)
+ {
+ this.notify();
+ }
+ }
+ }
+ };
+
+ getLog().debug(" Asynch Queue: This test will send " + getIterationCount() + " "
+ + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of "
+ + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+ + " Session is " + TRANS_DESC[transacted] + " transacted");
+ long startTime = System.currentTimeMillis();
+ sendThread.start();
+ receiver.setMessageListener(listener);
+ synchronized (listener)
+ {
+ queueConnection.start();
+ listener.wait();
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ session.close();
+ sendThread.join();
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. ");
+
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @param transacted Description of Parameter
+ * @param persistence Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public void runAsynchTopicPerformance(final int transacted, final int persistence) throws Exception
+ {
+ {
+ queueConnection.start();
+ drainQueue();
+ }
+
+ final int iterationCount = getIterationCount();
+ final Category log = getLog();
+
+ Thread sendThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JMSPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+
+ TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+
+ TopicPublisher publisher = session.createPublisher(topic);
+
+ waitForSynchMessage();
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ publisher.publish(message, persistence, 4, 0);
+ //publisher.publish(topic, message, persistence, 4, 0);
+ //getLog().debug(" Sent #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. ");
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ final TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+ TopicSubscriber subscriber = session.createSubscriber(topic);
+
+ MessageListener listener =
+ new MessageListener()
+ {
+ long startTime = System.currentTimeMillis();
+ int i = 0;
+
+ /**
+ * #Description of the Method
+ *
+ * @param message Description of Parameter
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if( transacted == TRANS_INDIVIDUAL )
+ session.commit();
+ i++;
+ }
+ catch (JMSException e)
+ {
+ getLog().error("Unable to commit", e);
+ synchronized (this)
+ {
+ this.notify();
+ }
+ }
+ if (i >= iterationCount)
+ {
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. ");
+
+ synchronized (this)
+ {
+ this.notify();
+ }
+ }
+ }
+ };
+
+ getLog().debug(" Asynch Topic: This test will send " + getIterationCount() + " "
+ + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of "
+ + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+ + " Session is " + TRANS_DESC[transacted] + " transacted");
+ long startTime = System.currentTimeMillis();
+ sendThread.start();
+ subscriber.setMessageListener(listener);
+ sendSynchMessage();
+ synchronized (listener)
+ {
+ topicConnection.start();
+ listener.wait();
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ session.close();
+ sendThread.join();
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. ");
+
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @param transacted Description of Parameter
+ * @param persistence Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public void runSynchQueuePerformance(final int transacted, final int persistence) throws Exception
+ {
+ {
+ queueConnection.start();
+ drainQueue();
+ }
+ final int iterationCount = getIterationCount();
+ final Category log = getLog();
+
+ Thread sendThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JMSPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+ QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueSender sender = session.createSender(queue);
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ sender.send( message, persistence, 4, 0);
+ //sender.send(queue, message, persistence, 4, 0);
+ //getLog().debug(" Sent #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ session.close();
+
+ long endTime = System.currentTimeMillis();
+
+ long pTime = endTime - startTime;
+ log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. ");
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ Thread recvThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JMSPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+
+ QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueReceiver receiver = session.createReceiver(queue);
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ receiver.receive();
+ //getLog().debug(" Received #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. ");
+
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ getLog().debug(" Synch Queue: This test will send " + getIterationCount() + " "
+ + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of "
+ + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+ + " Session is " + TRANS_DESC[transacted] + " transacted");
+ long startTime = System.currentTimeMillis();
+ sendThread.start();
+ recvThread.start();
+ sendThread.join();
+ recvThread.join();
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. ");
+
+ }
+
+ /**
+ * #Description of the Method
+ *
+ * @param transacted Description of Parameter
+ * @param persistence Description of Parameter
+ * @exception Exception Description of Exception
+ */
+ public void runSynchTopicPerformance(final int transacted, final int persistence) throws Exception
+ {
+ {
+ queueConnection.start();
+ topicConnection.start();
+ drainQueue();
+ }
+ final int iterationCount = getIterationCount();
+ final Category log = getLog();
+
+ Thread sendThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+
+ TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+
+ TopicPublisher publisher = session.createPublisher(topic);
+
+ waitForSynchMessage();
+
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ publisher.publish(message, persistence, 4, 0);
+ //publisher.publish(topic, message, persistence, 4, 0);
+ //getLog().debug(" Sent #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" sent all messages in " + ((double)pTime / 1000) + " seconds. ");
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ Thread recvThread =
+ new Thread()
+ {
+ /**
+ * Main processing method for the JBossMQPerfStressTestCase object
+ */
+ public void run()
+ {
+ try
+ {
+
+ TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+ TopicSubscriber subscriber = session.createSubscriber(topic);
+
+ sendSynchMessage();
+
+ long startTime = System.currentTimeMillis();
+ for (int i = 0; i < iterationCount; i++)
+ {
+ subscriber.receive();
+ //getLog().debug(" Received #"+i);
+ if (transacted == TRANS_INDIVIDUAL)
+ {
+ session.commit();
+ }
+ }
+
+ if (transacted == TRANS_TOTAL)
+ {
+ session.commit();
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ session.close();
+
+ long pTime = endTime - startTime;
+ log.debug(" received all messages in " + ((double)pTime / 1000) + " seconds. ");
+
+ }
+ catch (Exception e)
+ {
+ log.error("error", e);
+ }
+ }
+ };
+
+ getLog().debug(" Synch Topic: This test will send " + getIterationCount() + " "
+ + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of "
+ + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+ + " Session is " + TRANS_DESC[transacted] + " transacted");
+ long startTime = System.currentTimeMillis();
+ sendThread.start();
+ recvThread.start();
+ sendThread.join();
+ recvThread.join();
+ long endTime = System.currentTimeMillis();
+ long pTime = endTime - startTime;
+ getLog().debug(" All threads finished after: " + ((double)pTime / 1000) + " seconds. ");
+
+ }
+
+ /**
+ * A unit test for JUnit
+ *
+ * @exception Exception Description of Exception
+ */
+ public void testAsynchQueuePerformance() throws Exception
+ {
+
+ getLog().debug("Starting AsynchQueuePerformance test");
+
+ runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+ runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+ runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+ runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+ runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+ runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+ getLog().debug("AsynchQueuePerformance passed");
+ }
+
+ /**
+ * A unit test for JUnit
+ *
+ * @exception Exception Description of Exception
+ */
+ public void testAsynchTopicPerformance() throws Exception
+ {
+
+ getLog().debug("Starting AsynchTopicPerformance test");
+
+ runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+ runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+ runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+ runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+ runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+ runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+ getLog().debug("AsynchTopicPerformance passed");
+ }
+
+ /**
+ * A unit test for JUnit
+ *
+ * @exception Exception Description of Exception
+ */
+ public void testSynchQueuePerformance() throws Exception
+ {
+
+ getLog().debug("Starting SynchQueuePerformance test");
+
+ runSynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+ runSynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+ runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+ runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+ runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+ runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+ getLog().debug("SynchQueuePerformance passed");
+ }
+
+ /**
+ * A unit test for JUnit
+ *
+ * @exception Exception Description of Exception
+ */
+ public void testSynchTopicPerformance() throws Exception
+ {
+
+ getLog().debug("Starting SynchTopicPerformance test");
+
+ runSynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+ runSynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+ runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+ runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+ runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+ runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+ getLog().debug("SynchTopicPerformance passed");
+ }
+
+ /**
+ * The JUnit setup method
+ *
+ * @exception Exception Description of Exception
+ */
+ protected void setUp() throws Exception
+ {
+ if (context == null)
+ {
+
+ context = getInitialContext();
+
+ QueueConnectionFactory queueFactory = (QueueConnectionFactory)context.lookup(QUEUE_FACTORY);
+ queueConnection = queueFactory.createQueueConnection();
+
+ TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
+ topicConnection = topicFactory.createTopicConnection();
+
+ getLog().debug("Connection to JMS provider established.");
+ }
+
+ }
+
+
+ // Emptys out all the messages in a queue
+ private void drainQueue() throws Exception
+ {
+
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueReceiver receiver = session.createReceiver(queue);
+ Message message = receiver.receive(50);
+ int c = 0;
+ while (message != null)
+ {
+ message = receiver.receive(50);
+ c++;
+ }
+
+ if (c != 0)
+ {
+ getLog().debug(" Drained " + c + " messages from the queue");
+ }
+
+ session.close();
+
+ }
+
+ private void waitForSynchMessage() throws Exception
+ {
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueReceiver receiver = session.createReceiver(queue);
+ receiver.receive();
+ session.close();
+ }
+
+ private void sendSynchMessage() throws Exception
+ {
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueSender sender = session.createSender(queue);
+
+ Message message = session.createMessage();
+ sender.send(message);
+
+ session.close();
+ }
+
+ /**
+ * The main entry-point for the JMSPerfStressTestCase class
+ *
+ * @param args The command line arguments
+ */
+ public static void main(String[] args)
+ {
+
+ String newArgs[] = {"org.jboss.test.jbossmessaging.perf.JMSPerfStressTestCase"};
+ junit.swingui.TestRunner.main(newArgs);
+
+ }
+
+ public static junit.framework.Test suite() throws Exception
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
+
+ return getDeploySetup(JMSPerfStressTestCase.class,
+ loader.getResource(resourceName).toString());
+ }
+}
Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSReconnectStressTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSReconnectStressTestCase.java 2006-10-27 18:39:33 UTC (rev 57903)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSReconnectStressTestCase.java 2006-10-27 21:19:43 UTC (rev 57904)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.jbossmessaging.perf;
+
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.naming.InitialContext;
+
+import org.jboss.test.jbossmessaging.JMSTestCase;
+
+/**
+ * Reconnect stress
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author
+ * @version
+ */
+
+public class JMSReconnectStressTestCase extends JMSTestCase
+{
+ static String QUEUE_FACTORY = "ConnectionFactory";
+
+ public JMSReconnectStressTestCase(String name) throws Exception
+ {
+ super(name);
+ }
+
+ public void testReconnectStress() throws Throwable
+ {
+ InitialContext ctx = new InitialContext();
+ QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup(QUEUE_FACTORY);
+
+ ReconnectThread[] threads = new ReconnectThread[getThreadCount()];
+ for (int i = 0; i < threads.length; ++i)
+ threads[i] = new ReconnectThread(qcf, "Reconnect-"+i);
+ for (int i = 0; i < threads.length; ++i)
+ threads[i].start();
+ for (int i = 0; i < threads.length; ++i)
+ threads[i].join();
+ for (int i = 0; i < threads.length; ++i)
+ {
+ if (threads[i].error != null)
+ throw threads[i].error;
+ }
+ }
+
+ public class ReconnectThread extends Thread
+ {
+ public Throwable error;
+ public QueueConnectionFactory qcf;
+
+ public ReconnectThread(QueueConnectionFactory qcf, String name)
+ {
+ super(name);
+ this.qcf = qcf;
+ }
+
+ public void run()
+ {
+ QueueConnection c = null;
+ try
+ {
+ for (int i = 0; i < getIterationCount(); ++i)
+ {
+ log.info(Thread.currentThread() + " connect " + i);
+ c = qcf.createQueueConnection();
+ log.info(Thread.currentThread() + " close " + i);
+ c.close();
+ c = null;
+ }
+ }
+ catch (Throwable t)
+ {
+ if (c != null)
+ {
+ try
+ {
+ c.close();
+ }
+ catch (Throwable ignored)
+ {
+ log.warn("Ignored: ", ignored);
+ }
+ }
+ }
+ }
+ }
+}
Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/ReceiveNackClientStressTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/ReceiveNackClientStressTestCase.java 2006-10-27 18:39:33 UTC (rev 57903)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/ReceiveNackClientStressTestCase.java 2006-10-27 21:19:43 UTC (rev 57904)
@@ -0,0 +1,139 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.jbossmessaging.perf;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+
+import org.jboss.test.jbossmessaging.JMSTestCase;
+import org.jboss.util.NestedRuntimeException;
+
+/**
+ * A stress test for an impatient receiver
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author <a href="mailto:adrian at jboss.com">Adrian Brock</a>
+ * @version $Revision: 38400 $
+ */
+public class ReceiveNackClientStressTestCase extends JMSTestCase implements ExceptionListener
+{
+ protected QueueConnection queueConnection;
+
+ public ReceiveNackClientStressTestCase(String name) throws Exception
+ {
+ super(name);
+ }
+
+ public void onException(JMSException e)
+ {
+ log.error("Error: ", e);
+ try
+ {
+ queueConnection.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ private void drainQueue(String name) throws Exception
+ {
+ InitialContext context = getInitialContext() ;
+
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(name);
+
+ QueueReceiver receiver = session.createReceiver(queue);
+ queueConnection.start();
+ Message message = receiver.receive(50);
+ int c = 0;
+ while (message != null)
+ {
+ message = receiver.receive(50);
+ c++;
+ }
+
+ if (c != 0)
+ getLog().debug(" Drained " + c + " messages from the queue");
+ session.close();
+ queueConnection.stop();
+
+ }
+
+ public void testImpatient() throws Exception
+ {
+ int target = getIterationCount();
+ createQueue("Impatient");
+ drainQueue("Impatient") ;
+ try
+ {
+ InitialContext context = getInitialContext();
+ QueueConnectionFactory queueFactory = (QueueConnectionFactory) context.lookup("ConnectionFactory");
+ Queue queue = (Queue) context.lookup("Impatient");
+ queueConnection = queueFactory.createQueueConnection();
+ try
+ {
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueSender sender = session.createSender(queue);
+ QueueReceiver receiver = session.createReceiver(queue);
+ Serializable payload = new HashMap();
+ Message message = session.createObjectMessage(payload);
+ queueConnection.start();
+ int count = 0;
+ int sendCount = 0;
+ while (count < target)
+ {
+ if (sendCount <= target)
+ {
+ for (int i = 0; i < 10 && ++sendCount <= target; ++i)
+ sender.send(message);
+ }
+ if (receiver.receive(1) != null)
+ ++count;
+ }
+ }
+ finally
+ {
+ queueConnection.close();
+ }
+ }
+ finally
+ {
+ drainQueue("Impatient") ;
+ deleteQueue("Impatient");
+ }
+ }
+}
Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/SendReplyPerfStressTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/SendReplyPerfStressTestCase.java 2006-10-27 18:39:33 UTC (rev 57903)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/SendReplyPerfStressTestCase.java 2006-10-27 21:19:43 UTC (rev 57904)
@@ -0,0 +1,339 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.jbossmessaging.perf;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.Queue;
+import javax.naming.Context;
+
+import org.jboss.test.jbossmessaging.JMSTestCase;
+
+/**
+ * SendReplyPerfStressTestCase.java
+ * Some send/reply performance tests
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author
+ * @version
+ */
+public class SendReplyPerfStressTestCase extends JMSTestCase
+{
+ // Provider specific
+ static String TOPIC_FACTORY = "ConnectionFactory";
+ static String QUEUE_FACTORY = "ConnectionFactory";
+
+ static String TEST_QUEUE = "queue/testQueue";
+ static String TEST_TOPIC = "topic/testTopic";
+
+ static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10];
+
+ //JMSProviderAdapter providerAdapter;
+ static Context context;
+ static QueueConnection queueConnection;
+ static TopicConnection topicConnection;
+
+ public SendReplyPerfStressTestCase(String name) throws Exception
+ {
+ super(name);
+ }
+
+ /**
+ * The main entry-point for the SendReplyPerfStressTestCase class
+ *
+ * @param args The command line arguments
+ */
+ public static void main(String[] args)
+ {
+
+ String newArgs[] = {"org.jboss.test.jbossmessaging.perf.SendReplyPerfStressTestCase"};
+ junit.swingui.TestRunner.main(newArgs);
+ }
+
+ public static class State
+ {
+ public int expected;
+ public int finished = 0;
+ public ArrayList errors = new ArrayList();
+ public State(int expected)
+ {
+ this.expected = expected;
+ }
+ public synchronized void addError(Throwable t)
+ {
+ errors.add(t);
+ }
+ public synchronized void finished()
+ {
+ ++finished;
+ if (finished == expected)
+ notifyAll();
+ }
+ public synchronized void waitForFinish() throws Exception
+ {
+ if (finished == expected)
+ return;
+ wait();
+ }
+ }
+
+ public static class MessageQueueSender
+ implements Runnable
+ {
+ State state;
+ public MessageQueueSender(State state)
+ {
+ this.state = state;
+ }
+
+ public void run()
+ {
+ try
+ {
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue temp = session.createTemporaryQueue();
+ Message message = session.createTextMessage();
+ message.setJMSReplyTo(temp);
+
+ QueueSender sender = session.createSender(queue);
+ sender.send(message);
+
+ QueueReceiver receiver = session.createReceiver(temp);
+ receiver.receive();
+ receiver.close();
+ temp.delete();
+
+ session.close();
+ }
+ catch (Throwable t)
+ {
+ state.addError(t);
+ }
+ finally
+ {
+ state.finished();
+ }
+ }
+ }
+
+ public static class MessageTopicSender
+ implements Runnable
+ {
+ State state;
+ public MessageTopicSender(State state)
+ {
+ this.state = state;
+ }
+
+ public void run()
+ {
+ try
+ {
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+ TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ Message message = session.createTextMessage();
+
+ QueueSession qsession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryQueue temp = qsession.createTemporaryQueue();
+ message.setJMSReplyTo(temp);
+
+ TopicPublisher publisher = session.createPublisher(topic);
+ publisher.publish(message);
+
+ QueueReceiver receiver = qsession.createReceiver(temp);
+ receiver.receive();
+ receiver.close();
+
+ session.close();
+ }
+ catch (Throwable t)
+ {
+ state.addError(t);
+ }
+ finally
+ {
+ state.finished();
+ }
+ }
+ }
+
+ public static class MessageReplier
+ implements MessageListener
+ {
+ State state;
+ public MessageReplier(State state)
+ {
+ this.state = state;
+ }
+ public void onMessage(Message message)
+ {
+ try
+ {
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue replyQueue = session.createQueue(((Queue)message.getJMSReplyTo()).getQueueName());
+ QueueSender sender = session.createSender(replyQueue);
+ sender.send(message);
+ sender.close();
+ session.close();
+ }
+ catch (Throwable t)
+ {
+ state.addError(t);
+ }
+ }
+ }
+
+ public void testSendReplyQueue() throws Exception
+ {
+ drainQueue();
+
+ // Set up the workers
+ State state = new State(getThreadCount());
+ MessageReplier replier = new MessageReplier(state);
+ Thread[] threads = new Thread[getThreadCount()];
+ for (int i = 0; i < threads.length; ++i)
+ threads[i] = new Thread(new MessageQueueSender(state));
+
+ // Register the message listener
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ QueueReceiver receiver = session.createReceiver(queue);
+ receiver.setMessageListener(replier);
+ queueConnection.start();
+
+ // Start the senders
+ for (int i = 0; i < threads.length; ++i)
+ threads[i].start();
+
+ // Wait for it to finish
+ state.waitForFinish();
+
+ // Report the result
+ for (Iterator i = state.errors.iterator(); i.hasNext();)
+ getLog().error("Error", (Throwable) i.next());
+ if (state.errors.size() > 0)
+ throw new RuntimeException("Test failed with " + state.errors.size() + " errors");
+ }
+
+ public void testSendReplyTopic() throws Exception
+ {
+ // Set up the workers
+ State state = new State(getThreadCount());
+ MessageReplier replier = new MessageReplier(state);
+
+ Thread[] threads = new Thread[getThreadCount()];
+ for (int i = 0; i < threads.length; ++i)
+ threads[i] = new Thread(new MessageTopicSender(state));
+
+
+ // Register the message listener
+ Topic topic = (Topic)context.lookup(TEST_TOPIC);
+ TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber subscriber = session.createSubscriber(topic);
+ subscriber.setMessageListener(replier);
+ topicConnection.start();
+ queueConnection.start();
+
+ // Start the senders
+ for (int i = 0; i < threads.length; ++i)
+ threads[i].start();
+
+ // Wait for it to finish
+ state.waitForFinish();
+
+ // Report the result
+ for (Iterator i = state.errors.iterator(); i.hasNext();)
+ getLog().error("Error", (Throwable) i.next());
+ if (state.errors.size() > 0)
+ throw new RuntimeException("Test failed with " + state.errors.size() + " errors");
+ }
+
+ protected void setUp() throws Exception
+ {
+ getLog().info("Starting test: " + getName());
+
+ context = getInitialContext();
+
+ QueueConnectionFactory queueFactory = (QueueConnectionFactory)context.lookup(QUEUE_FACTORY);
+ queueConnection = queueFactory.createQueueConnection();
+
+ TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
+ topicConnection = topicFactory.createTopicConnection();
+
+ getLog().debug("Connection to JMS provider established.");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ getLog().info("Ended test: " + getName());
+ queueConnection.close();
+ topicConnection.close();
+ }
+
+ private void drainQueue() throws Exception
+ {
+ QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+ QueueReceiver receiver = session.createReceiver(queue);
+ queueConnection.start();
+ Message message = receiver.receive(50);
+ int c = 0;
+ while (message != null)
+ {
+ message = receiver.receive(50);
+ c++;
+ }
+
+ if (c != 0)
+ getLog().debug(" Drained " + c + " messages from the queue");
+ session.close();
+ queueConnection.stop();
+
+ }
+
+ public static junit.framework.Test suite() throws Exception
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
+
+ return getDeploySetup(SendReplyPerfStressTestCase.class,
+ loader.getResource(resourceName).toString());
+ }
+
+}
More information about the jboss-cvs-commits
mailing list