[jboss-cvs] JBoss Messaging SVN: r3391 - branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 30 18:32:48 EST 2007


Author: clebert.suconic at jboss.com
Date: 2007-11-30 18:32:47 -0500 (Fri, 30 Nov 2007)
New Revision: 3391

Modified:
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
Log:
Improvements on test

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java	2007-11-30 17:55:48 UTC (rev 3390)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/stress/SeveralClientsStressTest.java	2007-11-30 23:32:47 UTC (rev 3391)
@@ -22,10 +22,11 @@
 
 package org.jboss.test.messaging.jms.stress;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
-
+import java.util.concurrent.ConcurrentHashMap;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
@@ -34,16 +35,17 @@
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.JMSException;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
 import org.jboss.logging.Logger;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
-
 /**
  * In order for this test to run, you will need to edit /etc/security/limits.conf and change your max sockets to something bigger than 1024
  *
@@ -73,12 +75,15 @@
 
    // Static ---------------------------------------------------------------------------------------
 
-   protected static long PRODUCER_ALIVE_FOR=60000; // one minute
-   protected static long CONSUMER_ALIVE_FOR=60000; // one minutes
-   protected static long TEST_ALIVE_FOR=5 * 60 * 1000; // 5 minutes
-   protected static int NUMBER_OF_PRODUCERS=100; // this should be set to 300 later
-   protected static int NUMBER_OF_CONSUMERS=100; // this should be set to 300 later
+   protected static long PRODUCER_ALIVE_FOR=60000; // originalValue=60000
+   protected static long CONSUMER_ALIVE_FOR=60000; // originalValue=60000
+   protected static long TEST_ALIVE_FOR=5 * 60 * 1000; // originalValue = 5 * 60 * 1000
+   protected static int NUMBER_OF_PRODUCERS=50;
+   protected static int NUMBER_OF_CONSUMERS=50;
 
+   protected static ConcurrentHashMap<String, TextMessage> hashMessages = new ConcurrentHashMap<String, TextMessage>(); 
+
+
    // a producer should have a long wait between each message sent?
    protected static boolean LONG_WAIT_ON_PRODUCERS=false;
 
@@ -102,9 +107,6 @@
 
    public void testQueue() throws Exception
    {
-      Context ctx = createContext();
-
-
       HashSet threads = new HashSet();
 
       // A chhanel of communication between workers and the test method
@@ -130,8 +132,8 @@
 
       long timeToFinish = System.currentTimeMillis() + TEST_ALIVE_FOR;
 
-      int numberOfProducers = NUMBER_OF_PRODUCERS;
-      int numberOfConsumers = NUMBER_OF_CONSUMERS;
+      int numberOfProducers = NUMBER_OF_PRODUCERS +1;
+      int numberOfConsumers = NUMBER_OF_CONSUMERS +1;
 
       while (threads.size()>0)
       {
@@ -181,10 +183,22 @@
 
       log.info("Produced:" + producedMessages.get() + " and Consumed:" + readMessages.get() + " messages");
 
-      clearMessages();
+      clearMessages(true);
 
       log.info("Produced:" + producedMessages.get() + " and Consumed:" + readMessages.get() + " messages");
 
+      if (hashMessages.size() != 0)
+      {
+         log.info("There are " + hashMessages.size() + " non processed messages");
+         // This is to validate if the testcase is valid.
+         assertEquals(producedMessages.get(), readMessages.get() + hashMessages.size());
+      }
+
+      for (TextMessage msgNonProcessed: hashMessages.values())
+      {
+         log.warn("Message non processed, txt=\"" + msgNonProcessed.getText() + "\", msgToString=[" + msgNonProcessed + "]");
+      }
+
       assertEquals(producedMessages.get(), readMessages.get());
    }
 
@@ -193,7 +207,7 @@
 
    // Protected ------------------------------------------------------------------------------------
 
-   protected void clearMessages() throws Exception
+   protected void clearMessages(boolean validate) throws Exception
    {
       Context ctx = createContext();
       ConnectionFactory cf = (ConnectionFactory) ctx.lookup("/ClusteredConnectionFactory");
@@ -204,10 +218,18 @@
 
       conn.start();
 
-      while (consumer.receive(1000)!=null)
+
+      TextMessage msg = null;
+
+      while ((msg = (TextMessage)consumer.receive(1000)) != null)
       {
-         readMessages.increment();
          log.info("Received JMS message on clearMessages");
+
+         if (validate)
+         {
+            readMessages.increment();
+            processReceiveOnMessage(msg);
+         }
       }
 
       conn.close();
@@ -229,11 +251,31 @@
          ServerManagement.deployQueue("testQueue");
       }
 
-      clearMessages();
+      clearMessages(false);
+      hashMessages.clear();
       producedMessages = new SynchronizedInt(0);
       readMessages = new SynchronizedInt(0);
    }
 
+
+   protected void processReceiveOnMessage(ArrayList<TextMessage> messageList)
+           throws JMSException
+   {
+      for (TextMessage tmpMsg: messageList)
+      {
+         processReceiveOnMessage(tmpMsg);
+      }
+   }
+
+   protected void processReceiveOnMessage(TextMessage tmpMsg)
+           throws JMSException
+   {
+      if (hashMessages.remove(tmpMsg.getText()) == null)
+      {
+         throw new IllegalStateException("Message txt=\"" + tmpMsg.getText() + "\"msg.toString()=[" + tmpMsg.toString() + "]  wasn't found on hashMap. Receive duplicated!");
+      }
+   }
+
    // Private --------------------------------------------------------------------------------------
 
    // Inner classes --------------------------------------------------------------------------------
@@ -349,9 +391,15 @@
                int messageSent=0;
                while(System.currentTimeMillis() < timeToFinish)
                {
-                  prod.send(sess.createTextMessage("Message sent at " + System.currentTimeMillis()));
+                  messageSent++;
+
+                  TextMessage msg = sess.createTextMessage("msg prod#" + getWorkerId() + " seq#" + messageSent);
+
+                  // It has to be set on hashSets before sending, as sending can be faster than the hashmap
+                  hashMessages.put(msg.getText(), msg);
+                  prod.send(msg);
                   producedMessages.increment();
-                  messageSent++;
+
                   if (messageSent%50==0)
                   {
                      if (info) log.info("Sent " + messageSent + " Messages");
@@ -368,7 +416,6 @@
                      sleep(100);
                   }
                }
-               sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
             }
             finally
             {
@@ -381,6 +428,14 @@
             log.error(e, e);
             setFailed(true, e);
          }
+         finally
+         {
+            if (!isFailed())
+            {
+               sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
+            }
+         }
+
       }
    }
 
@@ -415,13 +470,17 @@
 
             long timeToFinish = System.currentTimeMillis() + CONSUMER_ALIVE_FOR;
 
+            ArrayList<TextMessage> msgReceived = new ArrayList<TextMessage>();
+
             try
             {
                while(System.currentTimeMillis() < timeToFinish)
                {
-                  Message msg = consumer.receive(1000);
+                  TextMessage msg = (TextMessage)consumer.receive(1000);
                   if (msg != null)
                   {
+                     msgReceived.add(msg);
+
                      msgs ++;
                      if (msgs>=50)
                      {
@@ -431,12 +490,15 @@
                            if (info) log.info("Commit transaction");
                            sess.commit();
                            readMessages.add(msgs);
+
+                           processReceiveOnMessage(msgReceived);
                         }
                         else
                         {
                            if (info) log.info("Rollback transaction");
                            sess.rollback();
                         }
+                        msgReceived.clear();
                         msgs=0;
                      }
                   }
@@ -446,13 +508,16 @@
                   }
                }
 
+               processReceiveOnMessage(msgReceived);
+               msgReceived.clear();
+
                readMessages.add(msgs);
                sess.commit();
 
-               sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
             }
             finally
             {
+               consumer.close();
                conn.close();
             }
 
@@ -462,7 +527,15 @@
             log.error(e);
             setFailed(true, e);
          }
+         finally
+         {
+            if (!isFailed())
+            {
+               sendInternalMessage(new SeveralClientsStressTest.WorkedFinishedMessages(this));
+            }
+         }
       }
+
    }
 
    // Objects used on the communication between  Workers and the test




More information about the jboss-cvs-commits mailing list