[jboss-cvs] JBossAS SVN: r57904 - in trunk/testsuite/src/main/org/jboss/test/jbossmessaging: . perf

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Oct 27 17:19:43 EDT 2006


Author: rachmatowicz at jboss.com
Date: 2006-10-27 17:19:43 -0400 (Fri, 27 Oct 2006)
New Revision: 57904

Added:
   trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/
   trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSPerfStressTestCase.java
   trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSReconnectStressTestCase.java
   trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/ReceiveNackClientStressTestCase.java
   trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/SendReplyPerfStressTestCase.java
Log:
More refactored generic JMS test cases

Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSPerfStressTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSPerfStressTestCase.java	2006-10-27 18:39:33 UTC (rev 57903)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSPerfStressTestCase.java	2006-10-27 21:19:43 UTC (rev 57904)
@@ -0,0 +1,797 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.jbossmessaging.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.Queue;
+import javax.naming.Context;
+
+import org.apache.log4j.Category;
+import org.jboss.test.jbossmessaging.JMSTestCase;
+/**
+ * JMSPerfStressTestCase.java Some simple tests of JMS provider
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author
+ * @version
+ */
+
+public class JMSPerfStressTestCase extends JMSTestCase
+{
+
+   // Provider specific
+   static String TOPIC_FACTORY = "ConnectionFactory";
+   static String QUEUE_FACTORY = "ConnectionFactory";
+
+   static String TEST_QUEUE = "queue/testQueue";
+   static String TEST_TOPIC = "topic/testTopic";
+
+   //   static int PERFORMANCE_TEST_ITERATIONS = 1000;
+   static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10 * 1024];
+
+   static int TRANS_NONE = 0;
+   static int TRANS_INDIVIDUAL = 1;
+   static int TRANS_TOTAL = 2;
+   static String[] TRANS_DESC = {"NOT", "individually", "totally"};
+
+   //JMSProviderAdapter providerAdapter;
+   static Context context;
+   static QueueConnection queueConnection;
+   static TopicConnection topicConnection;
+
+   /**
+    * Constructor for the JMSPerfStressTestCase object
+    *
+    * @param name           Description of Parameter
+    * @exception Exception  Description of Exception
+    */
+   public JMSPerfStressTestCase(String name) throws Exception
+   {
+      super(name);
+   }
+
+
+   /**
+    * #Description of the Method
+    *
+    * @param transacted     Description of Parameter
+    * @param persistence    Description of Parameter
+    * @exception Exception  Description of Exception
+    */
+   public void runAsynchQueuePerformance(final int transacted, final int persistence) throws Exception
+   {
+      {
+         queueConnection.start();
+         drainQueue();
+         queueConnection.stop();
+      }
+      final int iterationCount = getIterationCount();
+      final Category log = getLog();
+
+      Thread sendThread =
+         new Thread()
+         {
+            /**
+             * Main processing method for the JBossMQPerfStressTestCase object
+             */
+            public void run()
+            {
+               try
+               {
+                  QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+                  Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+                  QueueSender sender = session.createSender(queue);
+
+                  BytesMessage message = session.createBytesMessage();
+                  message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+                  long startTime = System.currentTimeMillis();
+                  for (int i = 0; i < iterationCount; i++)
+                  {
+                     //sender.send(queue, message, persistence, 4, 0);
+                     sender.send(message, persistence, 4, 0);
+                     //getLog().debug("  Sent #"+i);
+                     if (transacted == TRANS_INDIVIDUAL)
+                     {
+                        session.commit();
+                     }
+                  }
+
+                  if (transacted == TRANS_TOTAL)
+                  {
+                     session.commit();
+                  }
+
+                  long endTime = System.currentTimeMillis();
+
+                  session.close();
+
+                  long pTime = endTime - startTime;
+                  log.debug("  sent all messages in " + ((double)pTime / 1000) + " seconds. ");
+               }
+               catch (Exception e)
+               {
+                  log.error("error", e);
+               }
+            }
+         };
+
+      final QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+      QueueReceiver receiver = session.createReceiver(queue);
+
+      MessageListener listener =
+         new MessageListener()
+         {
+            long startTime = System.currentTimeMillis();
+            int i = 0;
+
+            /**
+             * #Description of the Method
+             *
+             * @param message  Description of Parameter
+             */
+            public void onMessage(Message message)
+            {
+               try
+               {
+				if( transacted == TRANS_INDIVIDUAL )
+					session.commit();
+                  i++;
+               }
+               catch (JMSException e)
+               {
+                  getLog().error("Unable to commit", e);
+                  synchronized (this)
+                  {
+                     this.notify();
+                  }
+               }
+               if (i >= iterationCount)
+               {
+                  long endTime = System.currentTimeMillis();
+                  long pTime = endTime - startTime;
+                  log.debug("  received all messages in " + ((double)pTime / 1000) + " seconds. ");
+
+                  synchronized (this)
+                  {
+                     this.notify();
+                  }
+               }
+            }
+         };
+
+      getLog().debug("  Asynch Queue: This test will send " + getIterationCount() + " "
+             + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of "
+             + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+             + " Session is " + TRANS_DESC[transacted] + " transacted");
+      long startTime = System.currentTimeMillis();
+      sendThread.start();
+      receiver.setMessageListener(listener);
+      synchronized (listener)
+      {
+         queueConnection.start();
+         listener.wait();
+      }
+
+      if (transacted == TRANS_TOTAL)
+      {
+         session.commit();
+      }
+
+      session.close();
+      sendThread.join();
+      long endTime = System.currentTimeMillis();
+      long pTime = endTime - startTime;
+      getLog().debug("  All threads finished after: " + ((double)pTime / 1000) + " seconds. ");
+
+   }
+
+   /**
+    * #Description of the Method
+    *
+    * @param transacted     Description of Parameter
+    * @param persistence    Description of Parameter
+    * @exception Exception  Description of Exception
+    */
+   public void runAsynchTopicPerformance(final int transacted, final int persistence) throws Exception
+   {
+      {
+         queueConnection.start();
+         drainQueue();
+      }
+
+      final int iterationCount = getIterationCount();
+      final Category log = getLog();
+
+      Thread sendThread =
+         new Thread()
+         {
+            /**
+             * Main processing method for the JMSPerfStressTestCase object
+             */
+            public void run()
+            {
+               try
+               {
+
+                  TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+                  Topic topic = (Topic)context.lookup(TEST_TOPIC);
+
+                  TopicPublisher publisher = session.createPublisher(topic);
+
+                  waitForSynchMessage();
+
+                  BytesMessage message = session.createBytesMessage();
+                  message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+                  long startTime = System.currentTimeMillis();
+                  for (int i = 0; i < iterationCount; i++)
+                  {
+                     publisher.publish(message, persistence, 4, 0);
+                     //publisher.publish(topic, message, persistence, 4, 0);
+                     //getLog().debug("  Sent #"+i);
+                     if (transacted == TRANS_INDIVIDUAL)
+                     {
+                        session.commit();
+                     }
+                  }
+
+                  if (transacted == TRANS_TOTAL)
+                  {
+                     session.commit();
+                  }
+
+                  long endTime = System.currentTimeMillis();
+                  session.close();
+
+                  long pTime = endTime - startTime;
+                  log.debug("  sent all messages in " + ((double)pTime / 1000) + " seconds. ");
+               }
+               catch (Exception e)
+               {
+                  log.error("error", e);
+               }
+            }
+         };
+
+      final TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+      Topic topic = (Topic)context.lookup(TEST_TOPIC);
+      TopicSubscriber subscriber = session.createSubscriber(topic);
+
+      MessageListener listener =
+         new MessageListener()
+         {
+            long startTime = System.currentTimeMillis();
+            int i = 0;
+
+            /**
+             * #Description of the Method
+             *
+             * @param message  Description of Parameter
+             */
+            public void onMessage(Message message)
+            {
+               try
+               {
+				if( transacted == TRANS_INDIVIDUAL )
+					session.commit();
+                  i++;
+               }
+               catch (JMSException e)
+               {
+                  getLog().error("Unable to commit", e);
+                  synchronized (this)
+                  {
+                     this.notify();
+                  }
+               }
+               if (i >= iterationCount)
+               {
+                  long endTime = System.currentTimeMillis();
+                  long pTime = endTime - startTime;
+                  log.debug("  received all messages in " + ((double)pTime / 1000) + " seconds. ");
+
+                  synchronized (this)
+                  {
+                     this.notify();
+                  }
+               }
+            }
+         };
+
+      getLog().debug("  Asynch Topic: This test will send " + getIterationCount() + " "
+             + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of "
+             + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+             + " Session is " + TRANS_DESC[transacted] + " transacted");
+      long startTime = System.currentTimeMillis();
+      sendThread.start();
+      subscriber.setMessageListener(listener);
+      sendSynchMessage();
+      synchronized (listener)
+      {
+         topicConnection.start();
+         listener.wait();
+      }
+
+      if (transacted == TRANS_TOTAL)
+      {
+         session.commit();
+      }
+
+      session.close();
+      sendThread.join();
+      long endTime = System.currentTimeMillis();
+      long pTime = endTime - startTime;
+      getLog().debug("  All threads finished after: " + ((double)pTime / 1000) + " seconds. ");
+
+   }
+
+   /**
+    * #Description of the Method
+    *
+    * @param transacted     Description of Parameter
+    * @param persistence    Description of Parameter
+    * @exception Exception  Description of Exception
+    */
+   public void runSynchQueuePerformance(final int transacted, final int persistence) throws Exception
+   {
+      {
+         queueConnection.start();
+         drainQueue();
+      }
+      final int iterationCount = getIterationCount();
+      final Category log = getLog();
+
+      Thread sendThread =
+         new Thread()
+         {
+            /**
+             * Main processing method for the JMSPerfStressTestCase object
+             */
+            public void run()
+            {
+               try
+               {
+                  QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+                  Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+                  QueueSender sender = session.createSender(queue);
+
+                  BytesMessage message = session.createBytesMessage();
+                  message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+                  long startTime = System.currentTimeMillis();
+                  for (int i = 0; i < iterationCount; i++)
+                  {
+                     sender.send( message, persistence, 4, 0);
+                     //sender.send(queue, message, persistence, 4, 0);
+                     //getLog().debug("  Sent #"+i);
+                     if (transacted == TRANS_INDIVIDUAL)
+                     {
+                        session.commit();
+                     }
+                  }
+
+                  if (transacted == TRANS_TOTAL)
+                  {
+                     session.commit();
+                  }
+
+                  session.close();
+
+                  long endTime = System.currentTimeMillis();
+
+                  long pTime = endTime - startTime;
+                  log.debug("  sent all messages in " + ((double)pTime / 1000) + " seconds. ");
+               }
+               catch (Exception e)
+               {
+                  log.error("error", e);
+               }
+            }
+         };
+
+      Thread recvThread =
+         new Thread()
+         {
+            /**
+             * Main processing method for the JMSPerfStressTestCase object
+             */
+            public void run()
+            {
+               try
+               {
+
+                  QueueSession session = queueConnection.createQueueSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+                  Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+                  QueueReceiver receiver = session.createReceiver(queue);
+                  long startTime = System.currentTimeMillis();
+                  for (int i = 0; i < iterationCount; i++)
+                  {
+                     receiver.receive();
+                     //getLog().debug("  Received #"+i);
+                     if (transacted == TRANS_INDIVIDUAL)
+                     {
+                        session.commit();
+                     }
+                  }
+
+                  if (transacted == TRANS_TOTAL)
+                  {
+                     session.commit();
+                  }
+
+                  long endTime = System.currentTimeMillis();
+
+                  session.close();
+
+                  long pTime = endTime - startTime;
+                  log.debug("  received all messages in " + ((double)pTime / 1000) + " seconds. ");
+
+               }
+               catch (Exception e)
+               {
+                  log.error("error", e);
+               }
+            }
+         };
+
+      getLog().debug("  Synch Queue: This test will send " + getIterationCount() + " "
+             + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of "
+             + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+             + " Session is " + TRANS_DESC[transacted] + " transacted");
+      long startTime = System.currentTimeMillis();
+      sendThread.start();
+      recvThread.start();
+      sendThread.join();
+      recvThread.join();
+      long endTime = System.currentTimeMillis();
+      long pTime = endTime - startTime;
+      getLog().debug("  All threads finished after: " + ((double)pTime / 1000) + " seconds. ");
+
+   }
+
+   /**
+    * #Description of the Method
+    *
+    * @param transacted     Description of Parameter
+    * @param persistence    Description of Parameter
+    * @exception Exception  Description of Exception
+    */
+   public void runSynchTopicPerformance(final int transacted, final int persistence) throws Exception
+   {
+      {
+         queueConnection.start();
+         topicConnection.start();
+         drainQueue();
+      }
+      final int iterationCount = getIterationCount();
+      final Category log = getLog();
+
+      Thread sendThread =
+         new Thread()
+         {
+            /**
+             * Main processing method for the JBossMQPerfStressTestCase object
+             */
+            public void run()
+            {
+               try
+               {
+
+                  TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+                  Topic topic = (Topic)context.lookup(TEST_TOPIC);
+
+                  TopicPublisher publisher = session.createPublisher(topic);
+
+                  waitForSynchMessage();
+
+                  BytesMessage message = session.createBytesMessage();
+                  message.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
+
+                  long startTime = System.currentTimeMillis();
+                  for (int i = 0; i < iterationCount; i++)
+                  {
+                     publisher.publish(message, persistence, 4, 0);
+                     //publisher.publish(topic, message, persistence, 4, 0);
+                     //getLog().debug("  Sent #"+i);
+                     if (transacted == TRANS_INDIVIDUAL)
+                     {
+                        session.commit();
+                     }
+                  }
+
+                  if (transacted == TRANS_TOTAL)
+                  {
+                     session.commit();
+                  }
+
+                  long endTime = System.currentTimeMillis();
+
+                  session.close();
+
+                  long pTime = endTime - startTime;
+                  log.debug("  sent all messages in " + ((double)pTime / 1000) + " seconds. ");
+               }
+               catch (Exception e)
+               {
+                  log.error("error", e);
+               }
+            }
+         };
+
+      Thread recvThread =
+         new Thread()
+         {
+            /**
+             * Main processing method for the JBossMQPerfStressTestCase object
+             */
+            public void run()
+            {
+               try
+               {
+
+                  TopicSession session = topicConnection.createTopicSession(transacted != TRANS_NONE, Session.AUTO_ACKNOWLEDGE);
+                  Topic topic = (Topic)context.lookup(TEST_TOPIC);
+                  TopicSubscriber subscriber = session.createSubscriber(topic);
+
+                  sendSynchMessage();
+
+                  long startTime = System.currentTimeMillis();
+                  for (int i = 0; i < iterationCount; i++)
+                  {
+                     subscriber.receive();
+                     //getLog().debug("  Received #"+i);
+                     if (transacted == TRANS_INDIVIDUAL)
+                     {
+                        session.commit();
+                     }
+                  }
+
+                  if (transacted == TRANS_TOTAL)
+                  {
+                     session.commit();
+                  }
+
+                  long endTime = System.currentTimeMillis();
+
+                  session.close();
+
+                  long pTime = endTime - startTime;
+                  log.debug("  received all messages in " + ((double)pTime / 1000) + " seconds. ");
+
+               }
+               catch (Exception e)
+               {
+                  log.error("error", e);
+               }
+            }
+         };
+
+      getLog().debug("  Synch Topic: This test will send " + getIterationCount() + " "
+             + (persistence == DeliveryMode.PERSISTENT ? "persistent" : "non-persistent") + " messages. Each with a payload of "
+             + ((double)PERFORMANCE_TEST_DATA_PAYLOAD.length / 1024) + "Kb"
+             + " Session is " + TRANS_DESC[transacted] + " transacted");
+      long startTime = System.currentTimeMillis();
+      sendThread.start();
+      recvThread.start();
+      sendThread.join();
+      recvThread.join();
+      long endTime = System.currentTimeMillis();
+      long pTime = endTime - startTime;
+      getLog().debug("  All threads finished after: " + ((double)pTime / 1000) + " seconds. ");
+
+   }
+
+   /**
+    * A unit test for JUnit
+    *
+    * @exception Exception  Description of Exception
+    */
+   public void testAsynchQueuePerformance() throws Exception
+   {
+
+      getLog().debug("Starting AsynchQueuePerformance test");
+
+      runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+      runAsynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+      runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+      runAsynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+      runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+      runAsynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+      getLog().debug("AsynchQueuePerformance passed");
+   }
+
+   /**
+    * A unit test for JUnit
+    *
+    * @exception Exception  Description of Exception
+    */
+   public void testAsynchTopicPerformance() throws Exception
+   {
+
+      getLog().debug("Starting AsynchTopicPerformance test");
+
+      runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+      runAsynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+      runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+      runAsynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+      runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+      runAsynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+      getLog().debug("AsynchTopicPerformance passed");
+   }
+
+   /**
+    * A unit test for JUnit
+    *
+    * @exception Exception  Description of Exception
+    */
+   public void testSynchQueuePerformance() throws Exception
+   {
+
+      getLog().debug("Starting SynchQueuePerformance test");
+
+      runSynchQueuePerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+      runSynchQueuePerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+      runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+      runSynchQueuePerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+      runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+      runSynchQueuePerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+      getLog().debug("SynchQueuePerformance passed");
+   }
+
+   /**
+    * A unit test for JUnit
+    *
+    * @exception Exception  Description of Exception
+    */
+   public void testSynchTopicPerformance() throws Exception
+   {
+
+      getLog().debug("Starting SynchTopicPerformance test");
+
+      runSynchTopicPerformance(TRANS_NONE, DeliveryMode.NON_PERSISTENT);
+      runSynchTopicPerformance(TRANS_NONE, DeliveryMode.PERSISTENT);
+      runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.NON_PERSISTENT);
+      runSynchTopicPerformance(TRANS_INDIVIDUAL, DeliveryMode.PERSISTENT);
+      runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.NON_PERSISTENT);
+      runSynchTopicPerformance(TRANS_TOTAL, DeliveryMode.PERSISTENT);
+
+      getLog().debug("SynchTopicPerformance passed");
+   }
+
+   /**
+    * The JUnit setup method
+    *
+    * @exception Exception  Description of Exception
+    */
+   protected void setUp() throws Exception
+   {
+      if (context == null)
+      {
+
+         context = getInitialContext();
+
+         QueueConnectionFactory queueFactory = (QueueConnectionFactory)context.lookup(QUEUE_FACTORY);
+         queueConnection = queueFactory.createQueueConnection();
+
+         TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
+         topicConnection = topicFactory.createTopicConnection();
+
+         getLog().debug("Connection to JMS provider established.");
+      }
+
+   }
+
+
+   // Emptys out all the messages in a queue
+   private void drainQueue() throws Exception
+   {
+
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueReceiver receiver = session.createReceiver(queue);
+      Message message = receiver.receive(50);
+      int c = 0;
+      while (message != null)
+      {
+         message = receiver.receive(50);
+         c++;
+      }
+
+      if (c != 0)
+      {
+         getLog().debug("  Drained " + c + " messages from the queue");
+      }
+
+      session.close();
+
+   }
+
+   private void waitForSynchMessage() throws Exception
+   {
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueReceiver receiver = session.createReceiver(queue);
+      receiver.receive();
+      session.close();
+   }
+
+   private void sendSynchMessage() throws Exception
+   {
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueSender sender = session.createSender(queue);
+
+      Message message = session.createMessage();
+      sender.send(message);
+
+      session.close();
+   }
+
+   /**
+    * The main entry-point for the JMSPerfStressTestCase class
+    *
+    * @param args  The command line arguments
+    */
+   public static void main(String[] args)
+   {
+
+      String newArgs[] = {"org.jboss.test.jbossmessaging.perf.JMSPerfStressTestCase"};
+      junit.swingui.TestRunner.main(newArgs);
+
+   }
+
+   public static junit.framework.Test suite() throws Exception
+   {
+       ClassLoader loader = Thread.currentThread().getContextClassLoader();
+       String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
+
+       return getDeploySetup(JMSPerfStressTestCase.class,
+               loader.getResource(resourceName).toString());
+   }
+}

Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSReconnectStressTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSReconnectStressTestCase.java	2006-10-27 18:39:33 UTC (rev 57903)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/JMSReconnectStressTestCase.java	2006-10-27 21:19:43 UTC (rev 57904)
@@ -0,0 +1,107 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.jbossmessaging.perf;
+
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.naming.InitialContext;
+
+import org.jboss.test.jbossmessaging.JMSTestCase;
+
+/**
+ * Reconnect stress
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author
+ * @version
+ */
+
+public class JMSReconnectStressTestCase extends JMSTestCase
+{
+   static String QUEUE_FACTORY = "ConnectionFactory";
+
+   public JMSReconnectStressTestCase(String name) throws Exception
+   {
+      super(name);
+   }
+
+   public void testReconnectStress() throws Throwable
+   {
+      InitialContext ctx = new InitialContext();
+      QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup(QUEUE_FACTORY);
+      
+      ReconnectThread[] threads = new ReconnectThread[getThreadCount()];
+      for (int i = 0; i < threads.length; ++i)
+         threads[i] = new ReconnectThread(qcf, "Reconnect-"+i);
+      for (int i = 0; i < threads.length; ++i)
+         threads[i].start();
+      for (int i = 0; i < threads.length; ++i)
+         threads[i].join();
+      for (int i = 0; i < threads.length; ++i)
+      {
+         if (threads[i].error != null)
+            throw threads[i].error;
+      }
+   }
+   
+   public class ReconnectThread extends Thread
+   {
+      public Throwable error;
+      public QueueConnectionFactory qcf;
+      
+      public ReconnectThread(QueueConnectionFactory qcf, String name)
+      {
+         super(name);
+         this.qcf = qcf;
+      }
+      
+      public void run()
+      {
+         QueueConnection c = null;
+         try
+         {
+            for (int i = 0; i < getIterationCount(); ++i)
+            {
+               log.info(Thread.currentThread() + " connect " + i);
+               c = qcf.createQueueConnection();
+               log.info(Thread.currentThread() + " close " + i);
+               c.close();
+               c = null;
+            }
+         }
+         catch (Throwable t)
+         {
+            if (c != null)
+            {
+               try
+               {
+                  c.close();
+               }
+               catch (Throwable ignored)
+               {
+                  log.warn("Ignored: ", ignored);
+               }
+            }
+         }
+      }
+   }
+}

Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/ReceiveNackClientStressTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/ReceiveNackClientStressTestCase.java	2006-10-27 18:39:33 UTC (rev 57903)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/ReceiveNackClientStressTestCase.java	2006-10-27 21:19:43 UTC (rev 57904)
@@ -0,0 +1,139 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.jbossmessaging.perf;
+
+import java.io.Serializable;
+import java.util.HashMap;
+
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+
+import org.jboss.test.jbossmessaging.JMSTestCase;
+import org.jboss.util.NestedRuntimeException;
+
+/**
+ * A stress test for an impatient receiver
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author    <a href="mailto:adrian at jboss.com">Adrian Brock</a>
+ * @version   $Revision: 38400 $
+ */
+public class ReceiveNackClientStressTestCase extends JMSTestCase implements ExceptionListener
+{
+   protected QueueConnection queueConnection;
+   
+   public ReceiveNackClientStressTestCase(String name) throws Exception
+   {
+      super(name);
+   }
+
+   public void onException(JMSException e)
+   {
+      log.error("Error: ", e);
+      try
+      {
+         queueConnection.close();
+      }
+      catch (Exception ignored)
+      {
+      }
+   }
+   
+    private void drainQueue(String name) throws Exception
+    {
+	InitialContext context = getInitialContext() ;
+
+	QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+	Queue queue = (Queue)context.lookup(name);
+
+	QueueReceiver receiver = session.createReceiver(queue);
+	queueConnection.start();
+	Message message = receiver.receive(50);
+	int c = 0;
+	while (message != null)
+	    {
+		message = receiver.receive(50);
+		c++;
+	    }
+
+	if (c != 0)
+	    getLog().debug("  Drained " + c + " messages from the queue");
+	session.close();
+	queueConnection.stop();
+
+    }
+
+   public void testImpatient() throws Exception
+   {
+      int target = getIterationCount();
+      createQueue("Impatient");
+      drainQueue("Impatient") ;
+      try
+      {
+         InitialContext context = getInitialContext();
+         QueueConnectionFactory queueFactory = (QueueConnectionFactory) context.lookup("ConnectionFactory");
+         Queue queue = (Queue) context.lookup("Impatient");
+         queueConnection = queueFactory.createQueueConnection();
+         try
+         {
+            QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            QueueSender sender = session.createSender(queue);
+            QueueReceiver receiver = session.createReceiver(queue);
+            Serializable payload = new HashMap();
+            Message message = session.createObjectMessage(payload);
+            queueConnection.start();
+            int count = 0;
+            int sendCount = 0;
+            while (count < target)
+            {
+               if (sendCount <= target)
+               {
+                  for (int i = 0; i < 10 && ++sendCount <= target; ++i)
+                     sender.send(message);
+               }
+               if (receiver.receive(1) != null)
+                  ++count;
+            }
+         }
+         finally
+         {
+            queueConnection.close();
+         }
+      }
+      finally
+      {
+	 drainQueue("Impatient") ;
+         deleteQueue("Impatient");
+      }
+   }
+}

Added: trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/SendReplyPerfStressTestCase.java
===================================================================
--- trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/SendReplyPerfStressTestCase.java	2006-10-27 18:39:33 UTC (rev 57903)
+++ trunk/testsuite/src/main/org/jboss/test/jbossmessaging/perf/SendReplyPerfStressTestCase.java	2006-10-27 21:19:43 UTC (rev 57904)
@@ -0,0 +1,339 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.jbossmessaging.perf;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.jms.Queue;
+import javax.naming.Context;
+
+import org.jboss.test.jbossmessaging.JMSTestCase;
+
+/**
+ * SendReplyPerfStressTestCase.java
+ * Some send/reply performance tests
+ *
+ * @author <a href="mailto:richard.achmatowicz at jboss.com">Richard Achmatowicz</a>
+ * @author
+ * @version
+ */
+public class SendReplyPerfStressTestCase extends JMSTestCase
+{
+   // Provider specific
+   static String TOPIC_FACTORY = "ConnectionFactory";
+   static String QUEUE_FACTORY = "ConnectionFactory";
+
+   static String TEST_QUEUE = "queue/testQueue";
+   static String TEST_TOPIC = "topic/testTopic";
+
+   static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10];
+
+   //JMSProviderAdapter providerAdapter;
+   static Context context;
+   static QueueConnection queueConnection;
+   static TopicConnection topicConnection;
+
+   public SendReplyPerfStressTestCase(String name) throws Exception
+   {
+      super(name);
+   }
+
+   /**
+    * The main entry-point for the SendReplyPerfStressTestCase class
+    *
+    * @param args  The command line arguments
+    */
+   public static void main(String[] args)
+   {
+
+      String newArgs[] = {"org.jboss.test.jbossmessaging.perf.SendReplyPerfStressTestCase"};
+      junit.swingui.TestRunner.main(newArgs);
+   }
+
+   public static class State
+   {
+      public int expected;
+      public int finished = 0;
+      public ArrayList errors = new ArrayList();
+      public State(int expected)
+      {
+         this.expected = expected;
+      }
+      public synchronized void addError(Throwable t)
+      {
+         errors.add(t);
+      }
+      public synchronized void finished()
+      {
+         ++finished;
+         if (finished == expected)
+            notifyAll();
+      }
+      public synchronized void waitForFinish() throws Exception
+      {
+         if (finished == expected)
+            return;
+         wait();
+      }
+   }
+
+   public static class MessageQueueSender
+      implements Runnable
+   {
+      State state;
+      public MessageQueueSender(State state)
+      {
+         this.state = state;
+      }
+
+      public void run()
+      {
+         try
+         {
+            Queue queue = (Queue)context.lookup(TEST_QUEUE);
+            QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            TemporaryQueue temp = session.createTemporaryQueue();
+            Message message = session.createTextMessage();
+            message.setJMSReplyTo(temp);
+
+            QueueSender sender = session.createSender(queue);
+            sender.send(message);
+
+            QueueReceiver receiver = session.createReceiver(temp);
+            receiver.receive();
+            receiver.close();
+            temp.delete();
+            
+            session.close();
+         }
+         catch (Throwable t)
+         {
+            state.addError(t);
+         }
+         finally
+         {
+            state.finished();
+         }
+      }
+   }
+
+   public static class MessageTopicSender
+      implements Runnable
+   {
+      State state;
+      public MessageTopicSender(State state)
+      {
+         this.state = state;
+      }
+
+      public void run()
+      {
+         try
+         {
+            Topic topic = (Topic)context.lookup(TEST_TOPIC);
+            TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+            Message message = session.createTextMessage();
+
+            QueueSession qsession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            TemporaryQueue temp = qsession.createTemporaryQueue();
+            message.setJMSReplyTo(temp);
+
+            TopicPublisher publisher = session.createPublisher(topic);
+            publisher.publish(message);
+
+            QueueReceiver receiver = qsession.createReceiver(temp);
+            receiver.receive();
+            receiver.close();
+            
+            session.close();
+         }
+         catch (Throwable t)
+         {
+            state.addError(t);
+         }
+         finally
+         {
+            state.finished();
+         }
+      }
+   }
+
+   public static class MessageReplier
+      implements MessageListener
+   {
+      State state;
+      public MessageReplier(State state)
+      {
+         this.state = state;
+      }
+      public void onMessage(Message message)
+      {
+         try
+         {
+            QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue replyQueue = session.createQueue(((Queue)message.getJMSReplyTo()).getQueueName());
+            QueueSender sender = session.createSender(replyQueue);
+            sender.send(message);
+            sender.close();
+            session.close();
+         }
+         catch (Throwable t)
+         {
+            state.addError(t);
+         }
+      }
+   }
+
+   public void testSendReplyQueue() throws Exception
+   {
+      drainQueue();
+
+      // Set up the workers
+      State state = new State(getThreadCount());
+      MessageReplier replier = new MessageReplier(state);
+      Thread[] threads = new Thread[getThreadCount()];
+      for (int i = 0; i < threads.length; ++i)
+          threads[i] = new Thread(new MessageQueueSender(state));
+
+      // Register the message listener
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      QueueReceiver receiver = session.createReceiver(queue);
+      receiver.setMessageListener(replier);
+      queueConnection.start();
+
+      // Start the senders
+      for (int i = 0; i < threads.length; ++i)
+          threads[i].start();
+
+      // Wait for it to finish
+      state.waitForFinish();
+
+      // Report the result
+      for (Iterator i = state.errors.iterator(); i.hasNext();)
+         getLog().error("Error", (Throwable) i.next());
+      if (state.errors.size() > 0)
+         throw new RuntimeException("Test failed with " + state.errors.size() + " errors");
+   }
+
+   public void testSendReplyTopic() throws Exception
+   {
+      // Set up the workers
+      State state = new State(getThreadCount());
+      MessageReplier replier = new MessageReplier(state);
+
+      Thread[] threads = new Thread[getThreadCount()];
+      for (int i = 0; i < threads.length; ++i)
+          threads[i] = new Thread(new MessageTopicSender(state));
+
+
+      // Register the message listener
+      Topic topic = (Topic)context.lookup(TEST_TOPIC);
+      TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+      TopicSubscriber subscriber = session.createSubscriber(topic);
+      subscriber.setMessageListener(replier);
+      topicConnection.start();
+      queueConnection.start();
+
+      // Start the senders
+      for (int i = 0; i < threads.length; ++i)
+          threads[i].start();
+
+      // Wait for it to finish
+      state.waitForFinish();
+
+      // Report the result
+      for (Iterator i = state.errors.iterator(); i.hasNext();)
+         getLog().error("Error", (Throwable) i.next());
+      if (state.errors.size() > 0)
+         throw new RuntimeException("Test failed with " + state.errors.size() + " errors");
+   }
+
+   protected void setUp() throws Exception
+   {
+      getLog().info("Starting test: " + getName());
+
+      context = getInitialContext();
+
+      QueueConnectionFactory queueFactory = (QueueConnectionFactory)context.lookup(QUEUE_FACTORY);
+      queueConnection = queueFactory.createQueueConnection();
+
+      TopicConnectionFactory topicFactory = (TopicConnectionFactory)context.lookup(TOPIC_FACTORY);
+      topicConnection = topicFactory.createTopicConnection();
+
+      getLog().debug("Connection to JMS provider established.");
+   }
+
+   protected void tearDown() throws Exception
+   {
+      getLog().info("Ended test: " + getName());
+      queueConnection.close();
+      topicConnection.close();
+   }   
+
+   private void drainQueue() throws Exception
+   {
+      QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = (Queue)context.lookup(TEST_QUEUE);
+
+      QueueReceiver receiver = session.createReceiver(queue);
+      queueConnection.start();
+      Message message = receiver.receive(50);
+      int c = 0;
+      while (message != null)
+      {
+         message = receiver.receive(50);
+         c++;
+      }
+
+      if (c != 0)
+         getLog().debug("  Drained " + c + " messages from the queue");
+      session.close();
+      queueConnection.stop();
+
+   }
+
+   public static junit.framework.Test suite() throws Exception
+   {
+       ClassLoader loader = Thread.currentThread().getContextClassLoader();
+       String resourceName = getJMSResourceRelativePathname("test-destinations-service.xml") ;
+
+       return getDeploySetup(SendReplyPerfStressTestCase.class,
+               loader.getResource(resourceName).toString());
+   }
+
+}




More information about the jboss-cvs-commits mailing list