[jboss-cvs] JBoss Messaging SVN: r2248 - trunk/tests/src/org/jboss/test/messaging/jms/clustering.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 9 22:34:37 EST 2007
Author: clebert.suconic at jboss.com
Date: 2007-02-09 22:34:37 -0500 (Fri, 09 Feb 2007)
New Revision: 2248
Modified:
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-733 - Adding a few other tests
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java 2007-02-10 03:23:13 UTC (rev 2247)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultiThreadFailoverTest.java 2007-02-10 03:34:37 UTC (rev 2248)
@@ -52,21 +52,16 @@
// Constants ------------------------------------------------------------------------------------
- int NUMBER_OF_PRODUCER_THREADS=1;
- int NUMBER_OF_CONSUMER_THREADS=1;
-
// Attributes -----------------------------------------------------------------------------------
int messageCounterConsumer = 0;
int messageCounterProducer = 0;
+ boolean started = false;
+ boolean shouldStop = false;
-
Object lockReader = new Object();
Object lockWriter = new Object();
Object semaphore = new Object();
- boolean started = false;
- boolean shouldStop = false;
-
// Static ---------------------------------------------------------------------------------------
// Constructors ---------------------------------------------------------------------------------
@@ -252,8 +247,47 @@
* This test will open several Consumers at the same Connection and it will kill the server,
* expecting failover to happen inside the Valve
*/
- public void testMultiThreadFailover() throws Exception
+ public void testMultiThreadFailoverSingleThread() throws Exception
{
+ multiThreadFailover(1, 1, false, true);
+ }
+
+ public void testMultiThreadFailoverSingleThreadTransacted() throws Exception
+ {
+ multiThreadFailover(1, 1, true, true);
+ }
+
+ public void testMultiThreadFailoverSingleThreadNonPersistent() throws Exception
+ {
+ multiThreadFailover(1, 1, false, false);
+ }
+
+ public void testMultiThreadFailoverSeveralThreads() throws Exception
+ {
+ multiThreadFailover(5, 10, false, true);
+ }
+
+ public void testMultiThreadFailoverSeveralThreadsTransacted() throws Exception
+ {
+ multiThreadFailover(5, 10, true, true);
+ }
+
+ public void testMultiThreadFailoverNonPersistent() throws Exception
+ {
+ multiThreadFailover(5, 10, false, false);
+ }
+
+ // This private method is deliberately kept in the public area to make the code easier to read,
+ // as it is the real test being executed by the test methods here.
+ private void multiThreadFailover(int producerThread, int consumerThread, boolean transacted,
+ boolean persistent)
+ throws Exception
+ {
+ shouldStop = false;
+ started = false;
+ messageCounterConsumer = 0;
+ messageCounterProducer = 0;
+
Connection conn1 = cf.createConnection();
Connection conn2 = cf.createConnection();
Connection conn3 = cf.createConnection();
@@ -282,16 +316,33 @@
ArrayList threadList = new ArrayList();
- for (int i = 0; i < NUMBER_OF_PRODUCER_THREADS; i++)
+ for (int i = 0; i < producerThread; i++)
{
- threadList.add(new LocalThreadProducer(i, conn.createSession(false,
- Session.AUTO_ACKNOWLEDGE), queue[1]));
+ Session session;
+ if (transacted)
+ {
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ threadList.add(new LocalThreadProducer(i, session , queue[1],
+ transacted, 10 * (i+1), persistent));
}
- for (int i = 0; i < NUMBER_OF_CONSUMER_THREADS; i++)
+ for (int i = 0; i < consumerThread; i++)
{
- threadList.add(new LocalThreadConsumer(i, conn.createSession(false,
- Session.AUTO_ACKNOWLEDGE), queue[1]));
+ Session session;
+ if (transacted)
+ {
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+ threadList.add(new LocalThreadConsumer(i, session, queue[1], transacted, 20 * (i+1)));
}
for (Iterator iter = threadList.iterator(); iter.hasNext();)
@@ -300,32 +351,52 @@
t.start();
}
- Thread.sleep(1000);
+ Thread.sleep(1000); // give every thread time to line up
synchronized (semaphore)
{
started = true;
semaphore.notifyAll();
}
- Thread.sleep(30000);
+ Thread.sleep(10000); // 10 seconds generating / consuming messages
log.info("Killing server 1");
- ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed");
+ synchronized (lockWriter)
+ {
+ synchronized (lockReader)
+ {
+ log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
+ messageCounterProducer);
+ }
+ }
- ServerManagement.log(ServerManagement.INFO, "Server 1 will be killed", 2);
- log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
- messageCounterProducer);
ServerManagement.killAndWait(1);
- Thread.sleep(50000);
+ int producedRightAfterKill;
+ int consumedRightAfterKill;
+ synchronized (lockWriter)
+ {
+ synchronized (lockReader)
+ {
+ producedRightAfterKill = messageCounterProducer;
+ consumedRightAfterKill = messageCounterConsumer;
+ }
+ }
- log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
- messageCounterProducer);
+ Thread.sleep(15000);
- shouldStop = true;
+ synchronized (lockWriter)
+ {
+ synchronized (lockReader)
+ {
+ log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
+ messageCounterProducer);
+ shouldStop = true;
+ }
+ }
boolean failed = false;
@@ -351,8 +422,17 @@
log.info("messageCounterConsumer=" + messageCounterConsumer + ", messageCounterProducer=" +
messageCounterProducer);
- assertEquals(messageCounterProducer, messageCounterConsumer);
+ if (persistent)
+ {
+ // it only makes sense to test this on persistent messages
+ assertEquals(messageCounterProducer, messageCounterConsumer);
+ }
+
+ // after the kill, failover should kick in and new messages should arrive
+ assertTrue(messageCounterConsumer > consumedRightAfterKill);
+ assertTrue(messageCounterProducer > producedRightAfterKill);
+
}
finally
{
@@ -444,13 +524,19 @@
int id;
MessageConsumer consumer;
Session session;
+ boolean transacted;
+ int commitInterval;
- public LocalThreadConsumer(int id, Session session, Destination destination) throws Exception
+ public LocalThreadConsumer(int id, Session session, Destination destination,
+ boolean transacted,
+ int commitInterval) throws Exception
{
super("LocalThreadConsumer-" + id);
consumer = session.createConsumer(destination);
this.session = session;
this.id = id;
+ this.transacted = transacted;
+ this.commitInterval = commitInterval;
}
@@ -469,9 +555,10 @@
int counter = 0;
while (true)
{
- Message message = consumer.receive(5000);
+ Message message = consumer.receive(1000);
if (message == null && shouldStop)
{
+ log.info("Finished execution of thread as shouldStop was true");
break;
}
if (message != null)
@@ -479,20 +566,27 @@
synchronized (lockReader)
{
messageCounterConsumer++;
- if (counter ++ % 10 == 0)
+ if (counter ++ % 100 == 0)
{
log.info("Read = " + messageCounterConsumer);
}
}
log.trace("ReceiverID=" + id + " received message " + message);
- if (counter++ % 10 == 0)
+ if (transacted)
{
- //log.info("Commit on id=" + id);
- //session.commit();
+ if (counter % commitInterval == 0)
+ {
+ //log.info("Commit on id=" + id + " counter = " + counter);
+ session.commit();
+ }
}
}
}
- //session.commit();
+
+ if (transacted)
+ {
+ session.commit();
+ }
}
catch (Exception e)
{
@@ -509,14 +603,23 @@
MessageProducer producer;
Session session;
int id;
+ boolean transacted;
+ int commitInterval;
- public LocalThreadProducer(int id, Session session, Destination destination) throws Exception
+ public LocalThreadProducer(int id, Session session, Destination destination,
+ boolean transacted, int commitInterval,
+ boolean persistent) throws Exception
{
super("LocalThreadProducer-" + id);
this.session = session;
producer = session.createProducer(destination);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ if (persistent)
+ {
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ }
this.id = id;
+ this.transacted = transacted;
+ this.commitInterval = commitInterval;
}
public void run()
@@ -540,19 +643,27 @@
synchronized (lockWriter)
{
messageCounterProducer++;
- if (counter ++ % 10 == 0)
+ if (counter ++ % 100 == 0)
{
log.info("Sent = " + messageCounterProducer);
}
}
- if (counter++ % 10 == 0)
+ if (transacted)
{
- //log.info("Committing message");
- //session.commit();
+ if (counter % commitInterval == 0)
+ {
+ //log.info("Commit on id=" + id + " counter = " + counter);
+ session.commit();
+ }
}
}
+ if (transacted)
+ {
+ session.commit();
+ }
+
}
catch (Exception e)
{
More information about the jboss-cvs-commits
mailing list