[jboss-cvs] JBoss Messaging SVN: r3763 - branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Feb 21 20:38:45 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-02-21 20:38:44 -0500 (Thu, 21 Feb 2008)
New Revision: 3763

Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
Log:
Fixing test

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java	2008-02-22 01:38:22 UTC (rev 3762)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java	2008-02-22 01:38:44 UTC (rev 3763)
@@ -24,6 +24,7 @@
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -62,7 +63,6 @@
 
    Object lockReader = new Object();
    Object lockWriter = new Object();
-   Object semaphore = new Object();
 
    // Static ---------------------------------------------------------------------------------------
 
@@ -95,14 +95,15 @@
       Connection conn = getConnection(new Connection[]{conn1, conn2, conn3}, 1);
 
       conn.start();
+      
+      CountDownLatch latch = new CountDownLatch(1);
 
       ReceiveConsumerThread consumerThread = new ReceiveConsumerThread(
-         conn.createSession(false,Session.AUTO_ACKNOWLEDGE),queue[1]);
+         conn.createSession(false,Session.AUTO_ACKNOWLEDGE),queue[1], latch);
 
       consumerThread.start();
 
-      // Just give some time to line up the thread on receive
-      Thread.sleep(2000);
+      latch.await();
 
       Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
 
@@ -320,7 +321,8 @@
 
          }
 
-         ArrayList threadList = new ArrayList();
+         ArrayList<Thread> threadList = new ArrayList<Thread>();
+         CountDownLatch latchStart  = new CountDownLatch(consumerThread + producerThread + 1);
 
          for (int i = 0; i < producerThread; i++)
          {
@@ -334,7 +336,7 @@
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             }
             threadList.add(new LocalThreadProducer(i, session , queue[1],
-                                  transacted, 10 * (i+1), persistent));
+                                  transacted, 10 * (i+1), persistent, latchStart));
          }
 
          for (int i = 0; i < consumerThread; i++)
@@ -348,23 +350,19 @@
             {
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
             }
-            threadList.add(new LocalThreadConsumer(i, session, queue[1], transacted, 20 * (i+1)));
+            threadList.add(new LocalThreadConsumer(i, session, queue[1], transacted, 20 * (i+1), latchStart));
          }
 
-         for (Iterator iter = threadList.iterator(); iter.hasNext();)
+         for (Iterator<Thread> iter = threadList.iterator(); iter.hasNext();)
          {
-            Thread t = (Thread) iter.next();
+            Thread t = iter.next();
             t.start();
          }
 
-         Thread.sleep(2000); // time to everybody line up
-         synchronized (semaphore)
-         {
-            started = true;
-            semaphore.notifyAll();
-         }
+         latchStart.countDown();
+         latchStart.await();
 
-         Thread.sleep(10000); // 10 seconds generating / consuming messages
+         Thread.sleep(2000); // 2 seconds generating / consuming messages
 
          log.info("Killing server 1");
 
@@ -392,7 +390,7 @@
             }
          }
 
-         Thread.sleep(15000);
+         Thread.sleep(2000); // 2 seconds after
 
          synchronized (lockWriter)
          {
@@ -439,7 +437,7 @@
             assertEquals(messageCounterProducer, messageCounterConsumer);
          }
          */
-
+         
          // after kill... failover should kick and new messages arrive
          assertTrue(messageCounterConsumer > consumedRightAfterKill);
          assertTrue(messageCounterProducer > producedRightAfterKill);
@@ -447,11 +445,11 @@
       }
       finally
       {
-         conn1.close();
+         try { if (conn1 != null) conn1.close(); } catch (Exception ignored) {}
 
-         conn2.close();
+         try { if (conn2 != null) conn2.close(); } catch (Exception ignored) {}
 
-         conn3.close();
+         try { if (conn3 != null) conn3.close(); } catch (Exception ignored) {}
       }
    }
 
@@ -463,6 +461,21 @@
    {
       nodeCount = 3;
 
+      if (ServerManagement.getServer(0) != null)
+      {      
+         this.removeAllMessages(queue[0].getQueueName(), true, 0);
+      }
+      if (ServerManagement.getServer(1) != null)
+      {
+         this.removeAllMessages(queue[1].getQueueName(), true, 1);
+      }
+      if (ServerManagement.getServer(2) != null)
+      {
+         this.removeAllMessages(queue[2].getQueueName(), true, 2);
+      }
+      
+      
+
       super.setUp();
    }
 
@@ -489,13 +502,15 @@
 
       MessageConsumer consumer;
       Session session;
+      CountDownLatch latch;
 
-      public ReceiveConsumerThread(Session session, Destination destination)
+      public ReceiveConsumerThread(Session session, Destination destination, CountDownLatch latch)
           throws Exception
       {
          super("Consumer Thread");
          this.session = session;
          consumer = session.createConsumer(destination);
+         this.latch = latch;
       }
 
 
@@ -503,6 +518,7 @@
       {
          try
          {
+            latch.countDown();
             message = (TextMessage)consumer.receive();
             if (message == null)
             {
@@ -525,6 +541,7 @@
       private final Logger log = Logger.getLogger(this.getClass());
 
       int id;
+      CountDownLatch latchStart;
       MessageConsumer consumer;
       Session session;
       boolean transacted;
@@ -532,7 +549,7 @@
 
       public LocalThreadConsumer(int id, Session session, Destination destination,
                                  boolean transacted,
-                                 int commitInterval) throws Exception
+                                 int commitInterval, CountDownLatch latchStart) throws Exception
       {
          super("LocalThreadConsumer-" + id);
          consumer = session.createConsumer(destination);
@@ -540,6 +557,7 @@
          this.id = id;
          this.transacted = transacted;
          this.commitInterval = commitInterval;
+         this.latchStart = latchStart;
       }
 
 
@@ -547,18 +565,14 @@
       {
          try
          {
-            synchronized (semaphore)
-            {
-               if (!started)
-               {
-                  semaphore.wait();
-               }
-            }
+            
+            latchStart.countDown();
+            latchStart.await();
 
             int counter = 0;
             while (true)
             {
-               Message message = consumer.receive(5000);
+               Message message = consumer.receive(1000);
                if (message == null && shouldStop)
                {
                   log.info("Finished execution of thread as shouldStop was true");
@@ -586,7 +600,7 @@
                }
             }
 
-            if (transacted && !shouldStop)
+            if (transacted)
             {
                session.commit();
             }
@@ -608,10 +622,11 @@
       int id;
       boolean transacted;
       int commitInterval;
+      CountDownLatch latch;
 
       public LocalThreadProducer(int id, Session session, Destination destination,
                                  boolean transacted, int commitInterval,
-                                 boolean persistent) throws Exception
+                                 boolean persistent, CountDownLatch latch) throws Exception
       {
          super("LocalThreadProducer-" + id);
          this.session = session;
@@ -623,20 +638,16 @@
          this.id = id;
          this.transacted = transacted;
          this.commitInterval = commitInterval;
+         this.latch = latch;
       }
 
       public void run()
       {
          try
          {
-            synchronized (semaphore)
-            {
-               if (!started)
-               {
-                  semaphore.wait();
-               }
-            }
-
+            latch.countDown();
+            latch.await();
+            
             int counter = 0;
             while (!shouldStop)
             {




More information about the jboss-cvs-commits mailing list