[jboss-cvs] JBoss Messaging SVN: r3391 - branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 30 18:32:48 EST 2007
Author: clebert.suconic at jboss.com
Date: 2007-11-30 18:32:47 -0500 (Fri, 30 Nov 2007)
New Revision: 3391
Modified:
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
Log:
Improvements on test
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java 2007-11-30 17:55:48 UTC (rev 3390)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java 2007-11-30 23:32:47 UTC (rev 3391)
@@ -22,10 +22,11 @@
package org.jboss.test.messaging.jms.stress;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
-
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@@ -34,16 +35,17 @@
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
import javax.naming.Context;
import javax.naming.InitialContext;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import org.jboss.logging.Logger;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
/**
* In order for this test to run, you will need to edit /etc/security/limits.conf and change your max sockets to something bigger than 1024
*
@@ -73,12 +75,15 @@
// Static ---------------------------------------------------------------------------------------
- protected static long PRODUCER_ALIVE_FOR=60000; // one minute
- protected static long CONSUMER_ALIVE_FOR=60000; // one minutes
- protected static long TEST_ALIVE_FOR=5 * 60 * 1000; // 5 minutes
- protected static int NUMBER_OF_PRODUCERS=100; // this should be set to 300 later
- protected static int NUMBER_OF_CONSUMERS=100; // this should be set to 300 later
+ protected static long PRODUCER_ALIVE_FOR=60000; // originalValue=60000
+ protected static long CONSUMER_ALIVE_FOR=60000; // originalValue=60000
+ protected static long TEST_ALIVE_FOR=5 * 60 * 1000; // originalValue = 5 * 60 * 1000
+ protected static int NUMBER_OF_PRODUCERS=50;
+ protected static int NUMBER_OF_CONSUMERS=50;
+ protected static ConcurrentHashMap<String, TextMessage> hashMessages = new ConcurrentHashMap<String, TextMessage>();
+
+
// a producer should have a long wait between each message sent?
protected static boolean LONG_WAIT_ON_PRODUCERS=false;
@@ -102,9 +107,6 @@
public void testQueue() throws Exception
{
- Context ctx = createContext();
-
-
HashSet threads = new HashSet();
// A chhanel of communication between workers and the test method
@@ -130,8 +132,8 @@
long timeToFinish = System.currentTimeMillis() + TEST_ALIVE_FOR;
- int numberOfProducers = NUMBER_OF_PRODUCERS;
- int numberOfConsumers = NUMBER_OF_CONSUMERS;
+ int numberOfProducers = NUMBER_OF_PRODUCERS +1;
+ int numberOfConsumers = NUMBER_OF_CONSUMERS +1;
while (threads.size()>0)
{
@@ -181,10 +183,22 @@
log.info("Produced:" + producedMessages.get() + " and Consumed:" + readMessages.get() + " messages");
- clearMessages();
+ clearMessages(true);
log.info("Produced:" + producedMessages.get() + " and Consumed:" + readMessages.get() + " messages");
+ if (hashMessages.size() != 0)
+ {
+ log.info("There are " + hashMessages.size() + " non processed messages");
+ // Sanity-check the test's own bookkeeping: every produced message was either consumed or is still tracked in hashMessages.
+ assertEquals(producedMessages.get(), readMessages.get() + hashMessages.size());
+ }
+
+ for (TextMessage msgNonProcessed: hashMessages.values())
+ {
+ log.warn("Message non processed, txt=\"" + msgNonProcessed.getText() + "\", msgToString=[" + msgNonProcessed + "]");
+ }
+
assertEquals(producedMessages.get(), readMessages.get());
}
@@ -193,7 +207,7 @@
// Protected ------------------------------------------------------------------------------------
- protected void clearMessages() throws Exception
+ protected void clearMessages(boolean validate) throws Exception
{
Context ctx = createContext();
ConnectionFactory cf = (ConnectionFactory) ctx.lookup("/ClusteredConnectionFactory");
@@ -204,10 +218,18 @@
conn.start();
- while (consumer.receive(1000)!=null)
+
+ TextMessage msg = null;
+
+ while ((msg = (TextMessage)consumer.receive(1000)) != null)
{
- readMessages.increment();
log.info("Received JMS message on clearMessages");
+
+ if (validate)
+ {
+ readMessages.increment();
+ processReceiveOnMessage(msg);
+ }
}
conn.close();
@@ -229,11 +251,31 @@
ServerManagement.deployQueue("testQueue");
}
- clearMessages();
+ clearMessages(false);
+ hashMessages.clear();
producedMessages = new SynchronizedInt(0);
readMessages = new SynchronizedInt(0);
}
+
+ protected void processReceiveOnMessage(ArrayList<TextMessage> messageList)
+ throws JMSException
+ {
+ for (TextMessage tmpMsg: messageList)
+ {
+ processReceiveOnMessage(tmpMsg);
+ }
+ }
+
+ protected void processReceiveOnMessage(TextMessage tmpMsg)
+ throws JMSException
+ {
+ if (hashMessages.remove(tmpMsg.getText()) == null)
+ {
+ throw new IllegalStateException("Message txt=\"" + tmpMsg.getText() + "\"msg.toString()=[" + tmpMsg.toString() + "] wasn't found on hashMap. Receive duplicated!");
+ }
+ }
+
// Private --------------------------------------------------------------------------------------
// Inner classes --------------------------------------------------------------------------------
@@ -349,9 +391,15 @@
int messageSent=0;
while(System.currentTimeMillis() < timeToFinish)
{
- prod.send(sess.createTextMessage("Message sent at " + System.currentTimeMillis()));
+ messageSent++;
+
+ TextMessage msg = sess.createTextMessage("msg prod#" + getWorkerId() + " seq#" + messageSent);
+
+ // Register the message in hashMessages before sending it; otherwise a consumer could receive it before the map entry exists and fail the lookup
+ hashMessages.put(msg.getText(), msg);
+ prod.send(msg);
producedMessages.increment();
- messageSent++;
+
if (messageSent%50==0)
{
if (info) log.info("Sent " + messageSent + " Messages");
@@ -368,7 +416,6 @@
sleep(100);
}
}
- sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
}
finally
{
@@ -381,6 +428,14 @@
log.error(e, e);
setFailed(true, e);
}
+ finally
+ {
+ if (!isFailed())
+ {
+ sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
+ }
+ }
+
}
}
@@ -415,13 +470,17 @@
long timeToFinish = System.currentTimeMillis() + CONSUMER_ALIVE_FOR;
+ ArrayList<TextMessage> msgReceived = new ArrayList<TextMessage>();
+
try
{
while(System.currentTimeMillis() < timeToFinish)
{
- Message msg = consumer.receive(1000);
+ TextMessage msg = (TextMessage)consumer.receive(1000);
if (msg != null)
{
+ msgReceived.add(msg);
+
msgs ++;
if (msgs>=50)
{
@@ -431,12 +490,15 @@
if (info) log.info("Commit transaction");
sess.commit();
readMessages.add(msgs);
+
+ processReceiveOnMessage(msgReceived);
}
else
{
if (info) log.info("Rollback transaction");
sess.rollback();
}
+ msgReceived.clear();
msgs=0;
}
}
@@ -446,13 +508,16 @@
}
}
+ processReceiveOnMessage(msgReceived);
+ msgReceived.clear();
+
readMessages.add(msgs);
sess.commit();
- sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
}
finally
{
+ consumer.close();
conn.close();
}
@@ -462,7 +527,15 @@
log.error(e);
setFailed(true, e);
}
+ finally
+ {
+ if (!isFailed())
+ {
+ sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
+ }
+ }
}
+
}
// Objects used on the communication between Workers and the test
More information about the jboss-cvs-commits
mailing list