[jboss-cvs] JBoss Messaging SVN: r2836 - in trunk/tests/src/org/jboss/test/messaging/jms/stress: clustering and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 4 07:47:45 EDT 2007
Author: timfox
Date: 2007-07-04 07:47:45 -0400 (Wed, 04 Jul 2007)
New Revision: 2836
Added:
trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/
trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/ClusteredTopicStressTest.java
Modified:
trunk/tests/src/org/jboss/test/messaging/jms/stress/ManyConnectionsStressTest.java
Log:
Added new stress test and updated another
Modified: trunk/tests/src/org/jboss/test/messaging/jms/stress/ManyConnectionsStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/ManyConnectionsStressTest.java 2007-07-04 11:19:20 UTC (rev 2835)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/ManyConnectionsStressTest.java 2007-07-04 11:47:45 UTC (rev 2836)
@@ -47,7 +47,6 @@
private static final int NUM_MESSAGES = 100;
-
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -55,6 +54,9 @@
private InitialContext ic;
private volatile boolean failed;
+
+ private Set listeners = new HashSet();
+
// Constructors --------------------------------------------------
@@ -87,8 +89,6 @@
super.tearDown();
}
- private Set listeners = new HashSet();
-
public void testManyConnections() throws Exception
{
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
Added: trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/ClusteredTopicStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/ClusteredTopicStressTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/clustering/ClusteredTopicStressTest.java 2007-07-04 11:47:45 UTC (rev 2836)
@@ -0,0 +1,221 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms.stress.clustering;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.logging.Logger;
+import org.jboss.test.messaging.jms.clustering.ClusteringTestBase;
+
+
+/**
+ * Stress test that sends NUM_MESSAGES messages to topic[0] and waits until a
+ * listener on each of the nodeCount connections has seen the last one.
+ *
+ * NOTE(review): cf, topic and nodeCount are not declared in this class;
+ * presumably they come from ClusteringTestBase -- confirm.
+ */
+public class ClusteredTopicStressTest extends ClusteringTestBase
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(ClusteredTopicStressTest.class);
+
+   // Static --------------------------------------------------------
+
+   private static final int NODE_COUNT = 10;
+
+   private static final int NUM_MESSAGES = 100000;
+
+   // Attributes ----------------------------------------------------
+
+   // Listeners still running; also the monitor the test thread waits on.
+   private final Set listeners = new HashSet();
+
+   // Set when a listener fails to read a message; checked by testTopic().
+   private volatile boolean failed;
+
+   // Constructors --------------------------------------------------
+
+   public ClusteredTopicStressTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      // Set before super.setUp(), which presumably uses it to size the cluster -- confirm.
+      this.nodeCount = NODE_COUNT;
+
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   /**
+    * Registers a listener per connection, publishes the messages, then waits
+    * up to 30 seconds for every listener to finish or fail.
+    *
+    * @throws Throwable if setup, sending or an assertion fails
+    */
+   public void testTopic() throws Throwable
+   {
+      Connection[] conns = new Connection[nodeCount];
+
+      Connection connSend = null;
+
+      try
+      {
+         for (int i = 0; i < nodeCount; i++)
+         {
+            conns[i] = cf.createConnection();
+         }
+
+         this.checkConnectionsDifferentServers(conns);
+
+         for (int i = 0; i < nodeCount; i++)
+         {
+            Session sess = conns[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer cons = sess.createConsumer(topic[i]);
+
+            MyListener listener = new MyListener();
+
+            synchronized (listeners)
+            {
+               listeners.add(listener);
+            }
+
+            cons.setMessageListener(listener);
+
+            conns[i].start();
+
+            log.info("Created " + i);
+         }
+
+         connSend = cf.createConnection();
+
+         Session sessSend = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sessSend.createProducer(topic[0]);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sessSend.createTextMessage("message" + i);
+
+            tm.setIntProperty("count", i);
+
+            prod.send(tm);
+         }
+
+         long wait = 30000;
+
+         boolean timedOut;
+
+         synchronized (listeners)
+         {
+            while (!listeners.isEmpty() && wait > 0)
+            {
+               long start = System.currentTimeMillis();
+               try
+               {
+                  listeners.wait(wait);
+               }
+               catch (InterruptedException e)
+               {
+                  // Ignored on purpose: the loop re-checks the set and the remaining time.
+               }
+               wait -= (System.currentTimeMillis() - start);
+            }
+
+            // Decide on the set itself, not on the remaining time: the last listener
+            // may finish just as the time budget reaches zero.
+            timedOut = !listeners.isEmpty();
+         }
+
+         if (timedOut)
+         {
+            fail("Timed out");
+         }
+
+         assertFalse(failed);
+      }
+      finally
+      {
+         for (int i = 0; i < nodeCount; i++)
+         {
+            closeQuietly(conns[i]);
+         }
+
+         // Closed through the same guarded path so a close failure cannot mask the test result.
+         closeQuietly(connSend);
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   /** Closes conn if it is non-null, logging (not rethrowing) any failure. */
+   private void closeQuietly(Connection conn)
+   {
+      try
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+      catch (Throwable t)
+      {
+         log.error("Failed to close connection", t);
+      }
+   }
+
+   /** Removes a listener that saw the last message and wakes the waiting test thread. */
+   private void finished(MyListener listener)
+   {
+      synchronized (listeners)
+      {
+         log.info("consumer " + listener + " has finished");
+
+         listeners.remove(listener);
+
+         listeners.notify();
+      }
+   }
+
+   /** Records a failure, removes the listener and wakes the waiting test thread. */
+   private void failed(MyListener listener)
+   {
+      synchronized (listeners)
+      {
+         log.error("consumer " + listener + " has failed");
+
+         listeners.remove(listener);
+
+         failed = true;
+
+         listeners.notify();
+      }
+   }
+
+   /** Finishes when it sees count == NUM_MESSAGES - 1; fails if the property cannot be read. */
+   private class MyListener implements MessageListener
+   {
+      public void onMessage(Message msg)
+      {
+         try
+         {
+            int count = msg.getIntProperty("count");
+
+            if (count % 100 == 0)
+            {
+               log.info(this + " got message " + msg);
+            }
+
+            if (count == NUM_MESSAGES - 1)
+            {
+               finished(this);
+            }
+         }
+         catch (JMSException e)
+         {
+            log.error("Failed to get int property", e);
+
+            failed(this);
+         }
+      }
+   }
+}
+
+
More information about the jboss-cvs-commits
mailing list