[jboss-cvs] JBoss Messaging SVN: r2248 - trunk/tests/src/org/jboss/test/messaging/jms/clustering.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 9 22:34:37 EST 2007


Author: clebert.suconic at jboss.com
Date: 2007-02-09 22:34:37 -0500 (Fri, 09 Feb 2007)
New Revision: 2248

Modified:
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-733 - Adding a few other tests

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java	2007-02-10 03:23:13 UTC (rev 2247)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java	2007-02-10 03:34:37 UTC (rev 2248)
@@ -52,21 +52,16 @@
 
    // Constants ------------------------------------------------------------------------------------
 
-   int NUMBER_OF_PRODUCER_THREADS=1;
-   int NUMBER_OF_CONSUMER_THREADS=1;
-
    // Attributes -----------------------------------------------------------------------------------
    int messageCounterConsumer = 0;
    int messageCounterProducer = 0;
+   boolean started = false;
+   boolean shouldStop = false;
 
-
    Object lockReader = new Object();
    Object lockWriter = new Object();
    Object semaphore = new Object();
-   boolean started = false;
 
-   boolean shouldStop = false;
-
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
@@ -252,8 +247,47 @@
     * This test will open several Consumers at the same Connection and it will kill the server,
     * expecting failover to happen inside the Valve
     */
-   public void testMultiThreadFailover() throws Exception
+   public void testMultiThreadFailoverSingleThread() throws Exception
    {
+      multiThreadFailover(1, 1, false, true);
+   }
+
+   public void testMultiThreadFailoverSingleThreadTransacted() throws Exception
+   {
+      multiThreadFailover(1, 1, true, true);
+   }
+
+   public void testMultiThreadFailoverSingleThreadNonPersistent() throws Exception
+   {
+      multiThreadFailover(1, 1, false, false);
+   }
+
+   public void testMultiThreadFailoverSeveralThreads() throws Exception
+   {
+      multiThreadFailover(5, 10, false, true);
+   }
+
+   public void testMultiThreadFailoverSeveralThreadsTransacted() throws Exception
+   {
+      multiThreadFailover(5, 10, true, true);
+   }
+
+   public void testMultiThreadFailoverNonPersistent() throws Exception
+   {
+      multiThreadFailover(5, 10, false, false);
+   }
+
+   // This private method is intentionally kept in the public section to make the code easier to
+   // read, as it is the real test executed by the test methods above.
+   private void   multiThreadFailover(int producerThread, int consumerThread, boolean transacted,
+                                    boolean persistent)
+      throws Exception
+   {
+      shouldStop = false;
+      started = false;
+      messageCounterConsumer = 0;
+      messageCounterProducer = 0;
+
       Connection conn1 = cf.createConnection();
       Connection conn2 = cf.createConnection();
       Connection conn3 = cf.createConnection();
@@ -282,16 +316,33 @@
 
          ArrayList threadList = new ArrayList();
 
-         for (int i = 0; i < NUMBER_OF_PRODUCER_THREADS; i++)
+         for (int i = 0; i < producerThread; i++)
          {
-            threadList.add(new LocalThreadProducer(i, conn.createSession(false,
-               Session.AUTO_ACKNOWLEDGE), queue[1]));
+            Session session;
+            if (transacted)
+            {
+               session = conn.createSession(true, Session.SESSION_TRANSACTED);
+            }
+            else
+            {
+               session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            }
+            threadList.add(new LocalThreadProducer(i, session , queue[1],
+                                  transacted, 10 * (i+1), persistent));
          }
 
-         for (int i = 0; i < NUMBER_OF_CONSUMER_THREADS; i++)
+         for (int i = 0; i < consumerThread; i++)
          {
-            threadList.add(new LocalThreadConsumer(i, conn.createSession(false,
-               Session.AUTO_ACKNOWLEDGE), queue[1]));
+            Session session;
+            if (transacted)
+            {
+               session = conn.createSession(true, Session.SESSION_TRANSACTED);
+            }
+            else
+            {
+               session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            }
+            threadList.add(new LocalThreadConsumer(i, session, queue[1], transacted, 20 * (i+1)));
          }
 
          for (Iterator iter = threadList.iterator(); iter.hasNext();)
@@ -300,32 +351,52 @@
             t.start();
          }
 
-         Thread.sleep(1000);
+         Thread.sleep(1000); // give every thread time to line up
          synchronized (semaphore)
          {
             started = true;
             semaphore.notifyAll();
          }
 
-         Thread.sleep(30000);
+         Thread.sleep(10000); // 10 seconds generating / consuming messages
 
          log.info("Killing server 1");
 
-         ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed");
+         synchronized (lockWriter)
+         {
+            synchronized (lockReader)
+            {
+               log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
+                  messageCounterProducer);
+            }
+         }
 
-         ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed", 2);
 
-         log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
-            messageCounterProducer);
 
          ServerManagement.killAndWait(1);
 
-         Thread.sleep(50000);
+         int producedRightAfterKill;
+         int consumedRightAfterKill;
+         synchronized (lockWriter)
+         {
+            synchronized (lockReader)
+            {
+               producedRightAfterKill = messageCounterProducer;
+               consumedRightAfterKill = messageCounterConsumer;
+            }
+         }
 
-         log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
-            messageCounterProducer);
+         Thread.sleep(15000);
 
-         shouldStop = true;
+         synchronized (lockWriter)
+         {
+            synchronized (lockReader)
+            {
+               log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
+                  messageCounterProducer);
+               shouldStop = true;
+            }
+         }
 
          boolean failed = false;
 
@@ -351,8 +422,17 @@
          log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
             messageCounterProducer);
 
-         assertEquals(messageCounterProducer, messageCounterConsumer);
 
+         if (persistent)
+         {
+            // it only makes sense to test this on persistent messages
+            assertEquals(messageCounterProducer, messageCounterConsumer);
+         }
+
+         // after the kill, failover should kick in and new messages should arrive
+         assertTrue(messageCounterConsumer > consumedRightAfterKill);
+         assertTrue(messageCounterProducer > producedRightAfterKill);
+
       }
       finally
       {
@@ -444,13 +524,19 @@
       int id;
       MessageConsumer consumer;
       Session session;
+      boolean transacted;
+      int commitInterval;
 
-      public LocalThreadConsumer(int id, Session session, Destination destination) throws Exception
+      public LocalThreadConsumer(int id, Session session, Destination destination,
+                                 boolean transacted,
+                                 int commitInterval) throws Exception
       {
          super("LocalThreadConsumer-" + id);
          consumer = session.createConsumer(destination);
          this.session = session;
          this.id = id;
+         this.transacted = transacted;
+         this.commitInterval = commitInterval;
       }
 
 
@@ -469,9 +555,10 @@
             int counter = 0;
             while (true)
             {
-               Message message = consumer.receive(5000);
+               Message message = consumer.receive(1000);
                if (message == null && shouldStop)
                {
+                  log.info("Finished execution of thread as shouldStop was true");
                   break;
                }
                if (message != null)
@@ -479,20 +566,27 @@
                   synchronized (lockReader)
                   {
                      messageCounterConsumer++;
-                     if (counter ++ % 10 == 0)
+                     if (counter ++ % 100 == 0)
                      {
                         log.info("Read = " + messageCounterConsumer);
                      }
                   }
                   log.trace("ReceiverID=" + id + " received message " + message);
-                  if (counter++ % 10 == 0)
+                  if (transacted)
                   {
-                     //log.info("Commit on id=" + id);
-                     //session.commit();
+                     if (counter % commitInterval == 0)
+                     {
+                        //log.info("Commit on id=" + id + " counter = " + counter);
+                        session.commit();
+                     }
                   }
                }
             }
-            //session.commit();
+
+            if (transacted)
+            {
+               session.commit();
+            }
          }
          catch (Exception e)
          {
@@ -509,14 +603,23 @@
       MessageProducer producer;
       Session session;
       int id;
+      boolean transacted;
+      int commitInterval;
 
-      public LocalThreadProducer(int id, Session session, Destination destination) throws Exception
+      public LocalThreadProducer(int id, Session session, Destination destination,
+                                 boolean transacted, int commitInterval,
+                                 boolean persistent) throws Exception
       {
          super("LocalThreadProducer-" + id);
          this.session = session;
          producer = session.createProducer(destination);
-         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         if (persistent)
+         {
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         }
          this.id = id;
+         this.transacted = transacted;
+         this.commitInterval = commitInterval;
       }
 
       public void run()
@@ -540,19 +643,27 @@
                synchronized (lockWriter)
                {
                   messageCounterProducer++;
-                  if (counter ++ % 10 == 0)
+                  if (counter ++ % 100 == 0)
                   {
                      log.info("Sent = " + messageCounterProducer);
                   }
                }
 
-               if (counter++ % 10 == 0)
+               if (transacted)
                {
-                  //log.info("Committing message");
-                  //session.commit();
+                  if (counter % commitInterval == 0)
+                  {
+                     //log.info("Commit on id=" + id + " counter = " + counter);
+                     session.commit();
+                  }
                }
             }
 
+            if (transacted)
+            {
+               session.commit();
+            }
+
          }
          catch (Exception e)
          {




More information about the jboss-cvs-commits mailing list