[jboss-cvs] JBoss Messaging SVN: r3763 - branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Feb 21 20:38:45 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-02-21 20:38:44 -0500 (Thu, 21 Feb 2008)
New Revision: 3763
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
Log:
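Fixing test: coordinate thread start-up with CountDownLatch instead of sleep/wait-notify, shorten the sleeps and receive timeout, close connections defensively, and clear the queues in setUp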
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java 2008-02-22 01:38:22 UTC (rev 3762)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java 2008-02-22 01:38:44 UTC (rev 3763)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -62,7 +63,6 @@
Object lockReader = new Object();
Object lockWriter = new Object();
- Object semaphore = new Object();
// Static ---------------------------------------------------------------------------------------
@@ -95,14 +95,15 @@
Connection conn = getConnection(new Connection[]{conn1, conn2, conn3}, 1);
conn.start();
+
+ CountDownLatch latch = new CountDownLatch(1);
ReceiveConsumerThread consumerThread = new ReceiveConsumerThread(
- conn.createSession(false,Session.AUTO_ACKNOWLEDGE),queue[1]);
+ conn.createSession(false,Session.AUTO_ACKNOWLEDGE),queue[1], latch);
consumerThread.start();
- // Just give some time to line up the thread on receive
- Thread.sleep(2000);
+ latch.await();
Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
@@ -320,7 +321,8 @@
}
- ArrayList threadList = new ArrayList();
+ ArrayList<Thread> threadList = new ArrayList<Thread>();
+ CountDownLatch latchStart = new CountDownLatch(consumerThread + producerThread + 1);
for (int i = 0; i < producerThread; i++)
{
@@ -334,7 +336,7 @@
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
threadList.add(new LocalThreadProducer(i, session , queue[1],
- transacted, 10 * (i+1), persistent));
+ transacted, 10 * (i+1), persistent, latchStart));
}
for (int i = 0; i < consumerThread; i++)
@@ -348,23 +350,19 @@
{
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
- threadList.add(new LocalThreadConsumer(i, session, queue[1], transacted, 20 * (i+1)));
+ threadList.add(new LocalThreadConsumer(i, session, queue[1], transacted, 20 * (i+1), latchStart));
}
- for (Iterator iter = threadList.iterator(); iter.hasNext();)
+ for (Iterator<Thread> iter = threadList.iterator(); iter.hasNext();)
{
- Thread t = (Thread) iter.next();
+ Thread t = iter.next();
t.start();
}
- Thread.sleep(2000); // time to everybody line up
- synchronized (semaphore)
- {
- started = true;
- semaphore.notifyAll();
- }
+ latchStart.countDown();
+ latchStart.await();
- Thread.sleep(10000); // 10 seconds generating / consuming messages
+ Thread.sleep(2000); // 2 seconds generating / consuming messages
log.info("Killing server 1");
@@ -392,7 +390,7 @@
}
}
- Thread.sleep(15000);
+ Thread.sleep(2000); // 2 seconds after
synchronized (lockWriter)
{
@@ -439,7 +437,7 @@
assertEquals(messageCounterProducer, messageCounterConsumer);
}
*/
-
+
// after kill... failover should kick in and new messages should arrive
assertTrue(messageCounterConsumer > consumedRightAfterKill);
assertTrue(messageCounterProducer > producedRightAfterKill);
@@ -447,11 +445,11 @@
}
finally
{
- conn1.close();
+ try { if (conn1 != null) conn1.close(); } catch (Exception ignored) {}
- conn2.close();
+ try { if (conn2 != null) conn2.close(); } catch (Exception ignored) {}
- conn3.close();
+ try { if (conn3 != null) conn3.close(); } catch (Exception ignored) {}
}
}
@@ -463,6 +461,21 @@
{
nodeCount = 3;
+ if (ServerManagement.getServer(0) != null)
+ {
+ this.removeAllMessages(queue[0].getQueueName(), true, 0);
+ }
+ if (ServerManagement.getServer(1) != null)
+ {
+ this.removeAllMessages(queue[1].getQueueName(), true, 1);
+ }
+ if (ServerManagement.getServer(2) != null)
+ {
+ this.removeAllMessages(queue[2].getQueueName(), true, 2);
+ }
+
+
+
super.setUp();
}
@@ -489,13 +502,15 @@
MessageConsumer consumer;
Session session;
+ CountDownLatch latch;
- public ReceiveConsumerThread(Session session, Destination destination)
+ public ReceiveConsumerThread(Session session, Destination destination, CountDownLatch latch)
throws Exception
{
super("Consumer Thread");
this.session = session;
consumer = session.createConsumer(destination);
+ this.latch = latch;
}
@@ -503,6 +518,7 @@
{
try
{
+ latch.countDown();
message = (TextMessage)consumer.receive();
if (message == null)
{
@@ -525,6 +541,7 @@
private final Logger log = Logger.getLogger(this.getClass());
int id;
+ CountDownLatch latchStart;
MessageConsumer consumer;
Session session;
boolean transacted;
@@ -532,7 +549,7 @@
public LocalThreadConsumer(int id, Session session, Destination destination,
boolean transacted,
- int commitInterval) throws Exception
+ int commitInterval, CountDownLatch latchStart) throws Exception
{
super("LocalThreadConsumer-" + id);
consumer = session.createConsumer(destination);
@@ -540,6 +557,7 @@
this.id = id;
this.transacted = transacted;
this.commitInterval = commitInterval;
+ this.latchStart = latchStart;
}
@@ -547,18 +565,14 @@
{
try
{
- synchronized (semaphore)
- {
- if (!started)
- {
- semaphore.wait();
- }
- }
+
+ latchStart.countDown();
+ latchStart.await();
int counter = 0;
while (true)
{
- Message message = consumer.receive(5000);
+ Message message = consumer.receive(1000);
if (message == null && shouldStop)
{
log.info("Finished execution of thread as shouldStop was true");
@@ -586,7 +600,7 @@
}
}
- if (transacted && !shouldStop)
+ if (transacted)
{
session.commit();
}
@@ -608,10 +622,11 @@
int id;
boolean transacted;
int commitInterval;
+ CountDownLatch latch;
public LocalThreadProducer(int id, Session session, Destination destination,
boolean transacted, int commitInterval,
- boolean persistent) throws Exception
+ boolean persistent, CountDownLatch latch) throws Exception
{
super("LocalThreadProducer-" + id);
this.session = session;
@@ -623,20 +638,16 @@
this.id = id;
this.transacted = transacted;
this.commitInterval = commitInterval;
+ this.latch = latch;
}
public void run()
{
try
{
- synchronized (semaphore)
- {
- if (!started)
- {
- semaphore.wait();
- }
- }
-
+ latch.countDown();
+ latch.await();
+
int counter = 0;
while (!shouldStop)
{
More information about the jboss-cvs-commits mailing list