Author: borges
Date: 2011-07-06 06:21:27 -0400 (Wed, 06 Jul 2011)
New Revision: 10931
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
clean up
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-06 10:20:45 UTC (rev 10930)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-06 10:21:27 UTC (rev 10931)
@@ -57,6 +57,7 @@
public class FailoverTest extends FailoverTestBase
{
private static final Logger log = Logger.getLogger(FailoverTest.class);
+ private static final int NUM_MESSAGES = 100;
private ServerLocator locator;
private ClientSessionFactoryInternal sf;
@@ -145,9 +146,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
-
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -187,7 +186,7 @@
crash(session);
int retry = 0;
- while (received.size() >= numMessages)
+ while (received.size() >= NUM_MESSAGES)
{
Thread.sleep(1000);
retry++;
@@ -199,19 +198,13 @@
System.out.println("received.size() = " + received.size());
session.close();
- sf.close();
-
Assert.assertTrue(retry <= 5);
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testNonTransacted() throws Exception
{
-
createSessionFactory();
ClientSession session = createSession(sf, true, true);
@@ -220,50 +213,19 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
private void createSessionFactory() throws Exception
@@ -348,11 +310,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
// https://jira.jboss.org/jira/browse/HORNETQ-285
@@ -374,9 +332,9 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -391,26 +349,11 @@
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesSentSoRollback() throws Exception
@@ -423,19 +366,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -461,11 +393,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
/**
@@ -482,19 +410,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -532,11 +449,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
@@ -549,19 +462,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.commit();
crash(session);
@@ -576,35 +478,15 @@
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
@@ -622,19 +504,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
// messages will be delivered to the consumer when the session is committed
session.commit();
@@ -652,35 +523,15 @@
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesConsumedSoRollback() throws Exception
@@ -693,19 +544,8 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session1, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, false, false);
@@ -714,19 +554,8 @@
session2.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session2);
Assert.assertTrue(session2.isRollbackOnly());
@@ -746,11 +575,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
@@ -763,9 +588,9 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session1.createMessage(true);
@@ -784,7 +609,7 @@
session2.start();
- for (int i = 0; i < numMessages / 2; i++)
+ for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -807,7 +632,7 @@
consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
- for (int i = numMessages / 2; i < numMessages; i++)
+ for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -828,11 +653,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
@@ -847,21 +668,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
try
@@ -885,11 +697,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
@@ -904,21 +712,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
crash(session);
@@ -944,11 +743,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
// This might happen if 1PC optimisation kicks in
@@ -964,21 +759,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
crash(session);
@@ -1004,11 +790,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
@@ -1023,21 +805,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
@@ -1054,24 +827,8 @@
session.start(xid2, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.end(xid2, XAResource.TMSUCCESS);
session.prepare(xid2);
@@ -1080,11 +837,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
@@ -1097,19 +850,8 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session1, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1122,19 +864,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session2);
try
@@ -1152,11 +883,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
@@ -1169,19 +896,8 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session1, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1194,19 +910,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session2.end(xid, XAResource.TMSUCCESS);
crash(session2);
@@ -1226,11 +931,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
// 1PC optimisation
@@ -1243,19 +944,8 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session1, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1268,19 +958,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session2.end(xid, XAResource.TMSUCCESS);
// session2.prepare(xid);
@@ -1303,11 +982,7 @@
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testCreateNewFactoryAfterFailover() throws Exception
@@ -1331,11 +1006,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testFailoverMultipleSessionsWithConsumers() throws Exception
@@ -1372,9 +1043,9 @@
ClientProducer producer = sendSession.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
- for (int i = 0; i < numMessages; i++)
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = sendSession.createMessage(true);
@@ -1399,18 +1070,7 @@
{
for (ClientConsumer consumer : consumerList)
{
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
+ receiveMessages(consumer);
}
}
@@ -1421,11 +1081,7 @@
sendSession.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
/*
@@ -1440,24 +1096,13 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
session.start();
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -1470,31 +1115,22 @@
crash(session);
- for (int i = 0; i < numMessages; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.close();
- sf.close();
+ closeSessionFactory();
+ }
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private void sendMessages(ClientSession session, ClientProducer producer) throws Exception, HornetQException
+ {
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage message = session.createMessage(isDurable(i));
+ setBody(i, message);
+ message.putIntProperty("counter", i);
+ producer.send(message);
+ }
}
public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
@@ -1507,24 +1143,13 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -1539,31 +1164,33 @@
// Should get the same ones after failover since we didn't ack
- for (int i = 0; i < numMessages; i++)
+ receiveDurableMessages(consumer);
+
+ session.close();
+
+ closeSessionFactory();
+ }
+
+ private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
+ {
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
// Only the persistent messages will survive
- if (i % 2 == 0)
+ if (isDurable(i))
{
ClientMessage message = consumer.receive(1000);
-
Assert.assertNotNull(message);
-
assertMessageBody(i, message);
-
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
message.acknowledge();
}
}
+ }
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private boolean isDurable(int i)
+ {
+ return i % 2 == 0;
}
public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
@@ -1580,43 +1207,21 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
+ sendMessages(session, producer);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session);
// Send some more
- for (int i = numMessages; i < numMessages * 2; i++)
+ for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
{
- ClientMessage message = session.createMessage(i % 2 == 0);
+ ClientMessage message = session.createMessage(isDurable(i));
setBody(i, message);
@@ -1627,7 +1232,7 @@
// Should get the same ones after failover since we didn't ack
- for (int i = numMessages; i < numMessages * 2; i++)
+ for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -1642,11 +1247,19 @@
session.close();
- sf.close();
+ closeSessionFactory();
+ }
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private void receiveMessages(ClientConsumer consumer) throws HornetQException
+ {
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertMessageBody(i, message);
+ Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+ message.acknowledge();
+ }
}
public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -1690,45 +1303,19 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- final int numMessages = 100;
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
crash(session);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessages(session, producer);
- setBody(i, message);
+ receiveMessages(consumer);
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void _testForceBlockingReturn() throws Exception
@@ -1787,11 +1374,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -1807,13 +1390,13 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final int numMessages = 100;
+
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
String txID = "my-tx-id";
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -1871,8 +1454,8 @@
Committer committer = new Committer();
- // Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
- // with transaction rolled back
+ // Commit will occur, but response will never get back, connection is failed, and commit
+ // should be unblocked with transaction rolled back
committer.start();
@@ -1896,7 +1479,7 @@
// We now try and resend the messages since we get a transaction rolled back exception
// but the commit actually succeeded, duplicate detection should kick in and prevent dups
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session2.createMessage(true);
@@ -1926,29 +1509,21 @@
session2.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
session2.close();
- sf.close();
+ closeSessionFactory();
+ }
+ private void closeSessionFactory()
+ {
+ sf.close();
Assert.assertEquals(0, sf.numSessions());
-
Assert.assertEquals(0, sf.numConnections());
}
@@ -1964,11 +1539,9 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
- final int numMessages = 100;
-
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -2036,7 +1609,7 @@
// We now try and resend the messages since we get a transaction rolled back exception
- for (int i = 0; i < numMessages; i++)
+ for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session2.createMessage(true);
@@ -2053,30 +1626,15 @@
session2.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
session2.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testBackupServerNotRemoved() throws Exception
@@ -2126,11 +1684,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testLiveAndBackupLiveComesBack() throws Exception
@@ -2182,11 +1736,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
@@ -2254,11 +1804,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -2326,11 +1872,7 @@
session.close();
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ closeSessionFactory();
}
// Package protected ---------------------------------------------