Author: borges
Date: 2012-02-01 11:50:09 -0500 (Wed, 01 Feb 2012)
New Revision: 12067
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Improve tearDown so as to avoid hanging tests.
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2012-02-01 16:49:50 UTC (rev 12066)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java 2012-02-01 16:50:09 UTC (rev 12067)
@@ -71,7 +71,7 @@
* A PagingTest
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
+ *
* Created Dec 5, 2008 8:25:58 PM
*
*
@@ -79,7 +79,8 @@
public class PagingTest extends ServiceTestBase
{
private ServerLocator locator;
-
+ private HornetQServer server;
+ private ClientSessionFactory sf;
static final int MESSAGE_SIZE = 1024; // 1k
public PagingTest(final String name)
@@ -126,8 +127,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
- config,
+ server =
+ createServer(true, config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
@@ -140,13 +141,13 @@
final int messagesPerTX = numberOfMessages / numberOfTX;
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -340,7 +341,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -353,7 +355,7 @@
final int numberOfMessages = 500;
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -361,7 +363,7 @@
locator.setProducerWindowSize(-1);
locator.setMinLargeMessageSize(1024 * 1024);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -427,7 +429,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -437,15 +440,13 @@
final int numberOfMessages = 1000;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -544,17 +545,6 @@
Thread.sleep(100);
}
assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
}
@@ -570,7 +560,7 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -580,17 +570,15 @@
final int numberOfMessages = 1000;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
- ClientSession session = sf.createSession(false, false, false);
+ ClientSession session = sf.createSession(false, false, false);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -778,18 +766,6 @@
sessionConsumer.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testMissingTXEverythingAcked() throws Exception
@@ -800,7 +776,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -816,13 +793,13 @@
try
{
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -927,7 +904,7 @@
server.stop();
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -966,10 +943,6 @@
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
sess.close();
-
- locator.close();
-
- server.stop();
}
public void testMissingTXEverythingAcked2() throws Exception
@@ -980,7 +953,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -996,13 +970,13 @@
try
{
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -1086,10 +1060,8 @@
server.start();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -1129,13 +1101,6 @@
{
Thread.sleep(100);
}
-
- locator.close();
- }
- finally
- {
- server.stop();
- }
}
public void testTwoQueuesOneNoRouting() throws Exception
@@ -1148,7 +1113,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1158,15 +1124,13 @@
final int numberOfMessages = 1000;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -1235,22 +1199,6 @@
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testSendReceivePagingPersistent() throws Exception
@@ -1281,7 +1229,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1336,6 +1285,7 @@
this.queue = queue;
}
+ @Override
public void run()
{
try
@@ -1362,13 +1312,13 @@
try
{
{
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -1441,7 +1391,7 @@
tcount1.start();
tcount2.start();
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
final ClientSessionFactory sf2 = createSessionFactory(locator);
final AtomicInteger errors = new AtomicInteger(0);
@@ -1455,6 +1405,7 @@
threads[start - 1] = new Thread()
{
+ @Override
public void run()
{
try
@@ -1578,7 +1529,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1597,16 +1549,14 @@
bb.put(getSamplebyte(j));
}
- try
{
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -1652,13 +1602,14 @@
new HashMap<String, AddressSettings>());
server.start();
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
final ClientSessionFactory sf2 = createSessionFactory(locator);
final AtomicInteger errors = new AtomicInteger(0);
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -1734,20 +1685,8 @@
assertEquals(0,
server.getPostOffice().getPagingManager().getTransactions().size());
- }
- finally
- {
- try
- {
- server.stop();
}
- catch (Throwable ignored)
- {
- }
- }
- }
-
private void internaltestSendReceivePaging(final boolean persistentMessages) throws
Exception
{
@@ -1758,7 +1697,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1769,16 +1709,13 @@
final int numberOfIntegers = 256;
final int numberOfMessages = 1000;
+ locator = createInVMNonHALocator();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
-
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
@@ -1867,22 +1804,6 @@
consumer.close();
session.close();
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
private void assertBodiesEqual(final byte[] body, final HornetQBuffer buffer)
@@ -1900,7 +1821,7 @@
* - Consume the entire destination (not in page mode any more)
* - Add stuff to a transaction again
* - Check order
- *
+ *
*/
public void testDepageDuringTransaction() throws Exception
{
@@ -1908,7 +1829,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -1916,14 +1838,12 @@
server.start();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
@@ -2015,22 +1935,6 @@
consumer.close();
session.close();
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
/**
@@ -2039,9 +1943,9 @@
* - Consume the entire destination (not in page mode any more)
* - Add stuff to a transaction again
* - Check order
- *
+ *
* Test under discussion at :
http://community.jboss.org/thread/154061?tstart=0
- *
+ *
*/
public void testDepageDuringTransaction2() throws Exception
{
@@ -2050,7 +1954,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2058,14 +1963,12 @@
server.start();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
byte[] body = new byte[MESSAGE_SIZE];
@@ -2175,30 +2078,16 @@
session.close();
- sf.close();
-
- locator.close();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
public void testDepageDuringTransaction3() throws Exception
{
clearData();
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2206,14 +2095,12 @@
server.start();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
byte[] body = new byte[MESSAGE_SIZE];
@@ -2304,23 +2191,7 @@
consumer.close();
- sessionNonTX.close();
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
+ sessionNonTX.close();
}
public void testDepageDuringTransaction4() throws Exception
@@ -2329,7 +2200,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2344,21 +2216,18 @@
final int numberOfMessages = 10000;
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(false);
+ sf = createSessionFactory(locator);
- try
- {
-
- final ClientSessionFactory sf = createSessionFactory(locator);
-
final byte[] body = new byte[MESSAGE_SIZE];
Thread producerThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionProducer = null;
@@ -2439,17 +2308,6 @@
sf.close();
assertEquals(0, errors.get());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
}
public void testOrderingNonTX() throws Exception
@@ -2458,7 +2316,8 @@
Configuration config = createDefaultConfig();
- final HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_SIZE * 2,
@@ -2473,12 +2332,10 @@
final int numberOfMessages = 2000;
- try
- {
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
final CountDownLatch ready = new CountDownLatch(1);
@@ -2486,6 +2343,7 @@
Thread producerThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionProducer = null;
@@ -2568,18 +2426,6 @@
producerThread.join();
assertEquals(0, errors.get());
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testPageOnSchedulingNoRestart() throws Exception
@@ -2600,7 +2446,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2612,14 +2459,11 @@
final int numberOfBytes = 1024;
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2713,18 +2557,6 @@
consumer.close();
session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testRollbackOnSend() throws Exception
@@ -2733,7 +2565,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2745,14 +2578,11 @@
final int numberOfMessages = 10;
- try
- {
-
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, false, true, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2786,18 +2616,6 @@
Assert.assertNull(consumer.receiveImmediate());
session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testCommitOnSend() throws Exception
@@ -2806,7 +2624,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2818,14 +2637,11 @@
final int numberOfMessages = 500;
- try
- {
-
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, false, false, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2889,18 +2705,6 @@
}
session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testParialConsume() throws Exception
@@ -2909,7 +2713,8 @@
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -2919,14 +2724,11 @@
final int numberOfMessages = 1000;
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, false, false, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3019,19 +2821,7 @@
session.close();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
public void testPageMultipleDestinations() throws Exception
{
internalTestPageMultipleDestinations(false);
@@ -3055,20 +2845,17 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 10 * 1024, settings);
+ server = createServer(true, config, 1024, 10 * 1024, settings);
server.start();
final int numberOfMessages = 1000;
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3169,19 +2956,6 @@
.getPagingManager()
.getPageStore(PagingTest.ADDRESS)
.getAddressSize());
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testDropMessagesExpiring() throws Exception
@@ -3197,18 +2971,15 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+ server = createServer(true, config, 1024, 1024 * 1024, settings);
server.start();
final int numberOfMessages = 30000;
- try
- {
-
locator.setAckBatchSize(0);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession();
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3269,18 +3040,6 @@
}
session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
private void internalTestPageMultipleDestinations(final boolean transacted) throws
Exception
@@ -3291,22 +3050,19 @@
int NUMBER_OF_MESSAGES = 2;
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
-
- try
- {
-
- locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(null, null, false, !transacted, true,
false, 0);
for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
@@ -3379,26 +3135,14 @@
Assert.assertEquals("Queue someQueue" + i + " was supposed to
be empty", 0, queue.getMessageCount());
Assert.assertEquals("Queue someQueue" + i + " was supposed to
be empty", 0, queue.getDeliveringCount());
}
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testSyncPage() throws Exception
{
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3498,7 +3242,8 @@
{
Configuration config = createDefaultConfig();
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3506,9 +3251,7 @@
server.start();
- try
- {
- server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+ server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);
@@ -3575,21 +3318,6 @@
assertTrue(pageUp.await(10, TimeUnit.SECONDS));
assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-
- server.stop();
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
}
public void testPagingOneDestinationOnly() throws Exception
@@ -3609,13 +3337,11 @@
addresses.put(PAGED_ADDRESS.toString(), pagedDestination);
- HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+ server = createServer(true, configuration, -1, -1, addresses);
- try
- {
- server.start();
+ server.start();
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, false);
@@ -3682,15 +3408,6 @@
Assert.assertNull(consumerPaged.receiveImmediate());
session.close();
-
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testPagingDifferentSizes() throws Exception
@@ -3718,14 +3435,11 @@
addresses.put(PAGED_ADDRESS_B.toString(), pagedDestinationB);
- HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+ server = createServer(true, configuration, -1, -1, addresses);
+ server.start();
- try
- {
- server.start();
+ sf = createSessionFactory(locator);
- ClientSessionFactory sf = createSessionFactory(locator);
-
ClientSession session = sf.createSession(false, true, false);
session.createQueue(PAGED_ADDRESS_A, PAGED_ADDRESS_A, true);
@@ -3813,15 +3527,6 @@
consumerB.close();
session.close();
-
- }
- finally
- {
- if (server.isStarted())
- {
- server.stop();
- }
- }
}
public void testPageAndDepageRapidly() throws Exception
@@ -3835,7 +3540,7 @@
config.setJournalSyncNonTransactional(false);
config.setJournalFileSize(10 * 1024 * 1024);
- HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new
HashMap<String, AddressSettings>());
+ server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String,
AddressSettings>());
server.start();
@@ -3843,15 +3548,13 @@
final int numberOfMessages = 200;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(true, true);
@@ -3863,6 +3566,7 @@
Thread consumeThread = new Thread()
{
+ @Override
public void run()
{
ClientSession sessionConsumer = null;
@@ -3947,24 +3651,8 @@
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
assertEquals(1,
server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
}
- catch (Throwable ignored)
- {
- }
- }
- }
-
public void testTwoQueuesDifferentFilters() throws Exception
{
boolean persistentMessages = true;
@@ -3975,7 +3663,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -3984,11 +3673,8 @@
server.start();
final int numberOfMessages = 200;
+ locator = createInVMNonHALocator();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
-
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -3997,7 +3683,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -4071,21 +3757,6 @@
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
}
public void testTwoQueues() throws Exception
@@ -4098,7 +3769,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -4107,11 +3779,8 @@
server.start();
final int numberOfMessages = 1000;
+ locator = createInVMNonHALocator();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
-
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -4120,7 +3789,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -4191,21 +3860,6 @@
// It's async, so need to wait a bit for it happening
assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
- sf.close();
-
- locator.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
}
public void testTwoQueuesConsumeOneRestart() throws Exception
@@ -4218,7 +3872,8 @@
config.setJournalSyncNonTransactional(false);
- HornetQServer server = createServer(true,
+ server =
+ createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
@@ -4227,11 +3882,8 @@
server.start();
final int numberOfMessages = 1000;
+ locator = createInVMNonHALocator();
- try
- {
- ServerLocator locator = createInVMNonHALocator();
-
locator.setClientFailureCheckPeriod(120000);
locator.setConnectionTTL(5000000);
locator.setCallTimeout(120000);
@@ -4240,7 +3892,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -4298,9 +3950,9 @@
assertNull(consumer.receiveImmediate());
consumer.close();
-
+
long timeout = System.currentTimeMillis() + 10000;
-
+
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
// It's async, so need to wait a bit for it happening
@@ -4317,22 +3969,7 @@
server.stop();
server.start();
-
- sf.close();
-
- locator.close();
}
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
public void testDLAOnLargeMessageAndPaging() throws Exception
{
@@ -4349,12 +3986,10 @@
dla.setDeadLetterAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
- final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX, settings);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX,
settings);
server.start();
- ServerLocator locator = null;
- ClientSessionFactory sf = null;
ClientSession session = null;
try
{
@@ -4564,15 +4199,6 @@
finally
{
session.close();
- sf.close();
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
}
}
@@ -4592,21 +4218,19 @@
dla.setExpiryAddress(new SimpleString("DLA"));
settings.put(ADDRESS.toString(), dla);
- final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX, settings);
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX,
settings);
server.start();
final int MESSAGE_SIZE = 20;
- try
- {
- ServerLocator locator = createInVMNonHALocator();
+ locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
- ClientSessionFactory sf = createSessionFactory(locator);
+ sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, false, false);
@@ -4718,19 +4342,7 @@
assertFalse(pgStoreAddress.isPaging());
- session.close();
- }
- finally
- {
- locator.close();
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
+ session.close();
}
// Package protected ---------------------------------------------
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2012-02-01 16:49:50 UTC (rev 12066)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2012-02-01 16:50:09 UTC (rev 12067)
@@ -46,6 +46,7 @@
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.CountDownSessionFailureListener;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -94,7 +95,7 @@
}
protected ClientSession createSession(ClientSessionFactory sf,
- boolean autoCommitSends,
+ boolean autoCommitSends,
boolean autoCommitAcks,
int ackBatchSize) throws Exception
{
@@ -116,7 +117,7 @@
boolean autoCommitSends,
boolean autoCommitAcks) throws Exception
{
- return sf.createSession(xa, autoCommitSends, autoCommitAcks);
+ return addClientSession(sf.createSession(xa, autoCommitSends, autoCommitAcks));
}
//
https://issues.jboss.org/browse/HORNETQ-685
@@ -263,7 +264,7 @@
session.close();
}
-
+
public void testTimeoutOnFailoverConsumeBlocked() throws Exception
{
locator.setCallTimeout(5000);
@@ -303,6 +304,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
ClientMessage message = null;
@@ -333,7 +335,7 @@
{
endLatch.countDown();
}
-
+
if (message.getBooleanProperty("end"))
{
break;
@@ -346,7 +348,7 @@
}
}
-
+
private ClientMessage getMessage()
{
while (true)
@@ -482,7 +484,7 @@
locator.setAckBatchSize(0);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
ClientSession session = createSession(sf, true, true);
@@ -490,8 +492,6 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session.createMessage(true);
@@ -544,68 +544,35 @@
System.out.println("received.size() = " + received.size());
session.close();
- sf.close();
-
Assert.assertTrue(retry <= 5);
+ }
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private void createClientSessionFactory() throws Exception
+ {
+ sf = (ClientSessionFactoryInternal)createSessionFactory(locator);
}
public void testNonTransacted() throws Exception
{
- ClientSessionFactoryInternal sf;
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i,
message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.close();
sf.close();
@@ -615,14 +582,19 @@
Assert.assertEquals(0, sf.numConnections());
}
- public void testConsumeTransacted() throws Exception
+ private void createSessionFactory() throws Exception
{
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ }
+ public void testConsumeTransacted() throws Exception
+ {
+ createSessionFactory();
+
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
@@ -631,17 +603,8 @@
final int numMessages = 10;
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createMessage(true);
+ sendMessages(session, producer, numMessages);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.commit();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -651,6 +614,7 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = consumer.receive(1000);
+ assertNotNull("Just crashed? " + (i == 6) + " " + i,
message);
message.acknowledge();
@@ -683,7 +647,7 @@
{
ClientMessage message = consumer.receive(1000);
- assertNotNull(message);
+ assertNotNull("Expecting message #" + i, message);
message.acknowledge();
}
@@ -691,12 +655,6 @@
session.commit();
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
// https://jira.jboss.org/jira/browse/HORNETQ-285
@@ -707,7 +665,7 @@
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
// Crash live server
crash();
@@ -718,72 +676,30 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(true);
+ sendMessages(session, producer, NUM_MESSAGES);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesSentSoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -805,15 +721,9 @@
ClientMessage message = consumer.receiveImmediate();
- Assert.assertNull(message);
+ Assert.assertNull("message should be null! Was: " + message, message);
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
/**
@@ -822,31 +732,16 @@
*/
public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
Assert.assertTrue(session.isRollbackOnly());
@@ -879,45 +774,24 @@
message = consumer.receiveImmediate();
- Assert.assertNotNull(message);
+ Assert.assertNotNull("expecting a message", message);
Assert.assertEquals(counter,
message.getIntProperty("counter").intValue());
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesNotSentSoNoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.commit();
crash(session);
@@ -932,45 +806,19 @@
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i,
message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
Assert.assertNull(consumer.receiveImmediate());
session.commit();
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws
Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
@@ -982,19 +830,8 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
// messages will be delivered to the consumer when the session is committed
session.commit();
@@ -1012,64 +849,25 @@
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i,
message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
Assert.assertNull(consumer.receiveImmediate());
session.commit();
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesConsumedSoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session1, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, false, false);
@@ -1078,19 +876,8 @@
session2.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session2);
Assert.assertTrue(session2.isRollbackOnly());
@@ -1105,34 +892,20 @@
{
Assert.assertEquals(HornetQException.TRANSACTION_ROLLED_BACK, e.getCode());
}
-
- session1.close();
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = session1.createMessage(true);
@@ -1179,7 +952,7 @@
{
ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull("expecting message " + i, message);
assertMessageBody(i, message);
@@ -1191,26 +964,12 @@
session2.commit();
Assert.assertNull(consumer.receiveImmediate());
-
- session1.close();
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesSentSoRollbackOnEnd() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512,
"auhsduashd".getBytes());
@@ -1219,21 +978,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
crash(session);
try
@@ -1254,47 +1004,26 @@
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ final ClientSession session = createSession(sf, true, false, false);
- ClientSession session = createSession(sf, true, false, false);
-
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512,
"auhsduashd".getBytes());
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
- ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
crash(session);
@@ -1308,6 +1037,7 @@
catch (XAException e)
{
Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
+ // XXXX session.rollback();
}
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -1318,24 +1048,15 @@
Assert.assertNull(message);
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ producer.close();
+ consumer.close();
}
// This might happen if 1PC optimisation kicks in
public void testXAMessagesSentSoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512,
"auhsduashd".getBytes());
@@ -1344,21 +1065,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
crash(session);
@@ -1381,24 +1093,12 @@
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session = createSession(sf, true, false, false);
Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512,
"auhsduashd".getBytes());
@@ -1407,21 +1107,12 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+
session.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
@@ -1438,66 +1129,27 @@
session.start(xid2, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- // Only the persistent messages will survive
+ receiveDurableMessages(consumer);
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i,
message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
- }
-
session.end(xid2, XAResource.TMSUCCESS);
session.prepare(xid2);
session.commit(xid2, false);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session1, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1510,19 +1162,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session2);
try
@@ -1535,45 +1176,20 @@
{
Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
}
-
- session1.close();
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session1, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1586,19 +1202,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session2.end(xid, XAResource.TMSUCCESS);
crash(session2);
@@ -1613,45 +1218,20 @@
{
Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
}
-
- session1.close();
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
// 1PC optimisation
public void testXAMessagesConsumedSoRollbackOnCommit() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
-
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ createSessionFactory();
ClientSession session1 = createSession(sf, false, false);
session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session1, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session1.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session1.commit();
ClientSession session2 = createSession(sf, true, false, false);
@@ -1664,19 +1244,8 @@
session2.start(xid, XAResource.TMNOFLAGS);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session2.end(xid, XAResource.TMSUCCESS);
// session2.prepare(xid);
@@ -1698,12 +1267,6 @@
session1.close();
session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testCreateNewFactoryAfterFailover() throws Exception
@@ -1711,7 +1274,7 @@
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = sendAndConsume(sf, true);
@@ -1721,27 +1284,15 @@
Thread.sleep(5000);
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = sendAndConsume(sf, true);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testFailoverMultipleSessionsWithConsumers() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
final int numSessions = 5;
final int numConsumersPerSession = 5;
@@ -1772,19 +1323,9 @@
ClientProducer producer = sendSession.createProducer(FailoverTestBase.ADDRESS);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = sendSession.createMessage(true);
+ sendMessages(sendSession, producer, NUM_MESSAGES);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
Set<ClientSession> sessionSet = sessionConsumerMap.keySet();
ClientSession[] sessions = new ClientSession[sessionSet.size()];
sessionSet.toArray(sessions);
@@ -1799,18 +1340,7 @@
{
for (ClientConsumer consumer : consumerList)
{
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i,
message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
+ receiveMessages(consumer);
}
}
@@ -1818,14 +1348,6 @@
{
session.close();
}
-
- sendSession.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
/*
@@ -1833,105 +1355,56 @@
*/
public void testFailWithBrowser() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
-
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ createSessionFactory();
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer, 0, NUM_MESSAGES, false);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- }
-
crash(session);
+ receiveDurableMessages(consumer);
+ }
+
+ private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer)
throws Exception
+ {
for (int i = 0; i < NUM_MESSAGES; i++)
{
- // Only the persistent messages will survive
-
- if (i % 2 == 0)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i,
message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
+ // some are durable, some are not!
+ boolean durable = isDurable(i);
+ ClientMessage message = session.createMessage(durable);
+ setBody(i, message);
+ message.putIntProperty("counter", i);
+ producer.send(message);
}
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
+ createSessionFactory();
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
-
ClientSession session = createSession(sf, true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
+ // Receive MSGs but don't ack!
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
@@ -1947,31 +1420,29 @@
// Should get the same ones after failover since we didn't ack
+ receiveDurableMessages(consumer);
+ }
+
+ private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
+ {
for (int i = 0; i < NUM_MESSAGES; i++)
{
// Only the persistent messages will survive
- if (i % 2 == 0)
+ if (isDurable(i))
{
ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
+ Assert.assertNotNull("expecting durable msg " + i, message);
assertMessageBody(i, message);
-
Assert.assertEquals(i,
message.getIntProperty("counter").intValue());
-
message.acknowledge();
}
}
+ }
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private boolean isDurable(int i)
+ {
+ return i % 2 == 0;
}
public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
@@ -1980,7 +1451,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = createSession(sf, true, true, 0);
@@ -1988,43 +1459,21 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
+ sendMessagesSomeDurable(session, producer);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
crash(session);
// Send some more
for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
{
- ClientMessage message = session.createMessage(i % 2 == 0);
+ ClientMessage message = session.createMessage(isDurable(i));
setBody(i, message);
@@ -2034,27 +1483,12 @@
}
// Should get the same ones after failover since we didn't ack
+ receiveMessages(consumer, NUM_MESSAGES, NUM_MESSAGES * 2, true);
+ }
- for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ private void receiveMessages(ClientConsumer consumer) throws HornetQException
+ {
+ receiveMessages(consumer, 0, NUM_MESSAGES, true);
}
public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -2083,7 +1517,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
ClientSession session = createSession(sf, true, true, 0);
@@ -2098,45 +1532,15 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
crash(session);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(i % 2 == 0);
+ sendMessagesSomeDurable(session, producer);
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ receiveMessages(consumer);
}
public void _testForceBlockingReturn() throws Exception
@@ -2145,7 +1549,7 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf =
(ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
// Add an interceptor to delay the send method so we can get time to cause failover before it returns
@@ -2194,12 +1598,6 @@
Assert.assertEquals(sender.e.getCode(), HornetQException.UNBLOCKED);
session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -2209,14 +1607,15 @@
locator.setReconnectAttempts(-1);
locator.setBlockOnAcknowledge(true);
- final ClientSessionFactoryInternal sf =
createSessionFactoryAndWaitForTopology(locator, 2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
final ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
-
+
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
String txID = "my-tx-id";
@@ -2267,7 +1666,7 @@
}
catch (HornetQException e2)
{
-
+ throw new RuntimeException(e2);
}
}
@@ -2279,8 +1678,8 @@
Committer committer = new Committer();
- // Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
- // with transaction rolled back
+ // Commit will occur, but response will never get back, connection is failed, and commit
+ // should be unblocked with transaction rolled back
committer.start();
@@ -2293,7 +1692,7 @@
committer.join();
- Assert.assertFalse(committer.failed);
+ Assert.assertFalse("second attempt succeed?", committer.failed);
session.close();
@@ -2324,40 +1723,22 @@
try
{
session2.commit();
+ fail("expecting DUPLICATE_ID_REJECTED exception");
}
catch (HornetQException e)
{
- assertEquals(HornetQException.DUPLICATE_ID_REJECTED, e.getCode());
+ assertEquals(e.getMessage(), HornetQException.DUPLICATE_ID_REJECTED,
e.getCode());
}
ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
session2.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
ClientMessage message = consumer.receiveImmediate();
Assert.assertNull(message);
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testCommitDidNotOccurUnblockedAndResend() throws Exception
@@ -2366,27 +1747,17 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
final ClientSession session = createSession(sf, false, false);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
-
+
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+ sendMessages(session, producer,NUM_MESSAGES);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session.createMessage(true);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
class Committer extends Thread
{
@Override
@@ -2443,75 +1814,36 @@
producer = session2.createProducer(FailoverTestBase.ADDRESS);
// We now try and resend the messages since we get a transaction rolled back exception
+ sendMessages(session2, producer,NUM_MESSAGES);
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = session2.createMessage(true);
-
- setBody(i, message);
-
- message.putIntProperty("counter", i);
-
- producer.send(message);
- }
-
session2.commit();
ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
session2.start();
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
+ receiveMessages(consumer);
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
ClientMessage message = consumer.receiveImmediate();
- Assert.assertNull(message);
-
- session2.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
+ Assert.assertNull("expecting null message", message);
}
public void testBackupServerNotRemoved() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener implements SessionFailureListener
+ // HORNETQ-720 Disabling test for replicating backups.
+ if (!backupServer.getServer().getConfiguration().isSharedStore())
{
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(HornetQException exception)
- {
- System.out.println("MyListener.beforeReconnect");
- }
+ waitForComponent(backupServer, 1);
+ return;
}
+ locator.setFailoverOnInitialConnection(true);
+ createSessionFactory();
+ CountDownSessionFailureListener listener = new CountDownSessionFailureListener();
ClientSession session = sendAndConsume(sf, true);
- session.addFailureListener(new MyListener());
+ session.addFailureListener(listener);
backupServer.stop();
@@ -2522,7 +1854,7 @@
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue("session failure listener", listener.getLatch().await(5,
TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -2531,23 +1863,12 @@
setBody(0, message);
producer.send(message);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testLiveAndBackupLiveComesBack() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ createSessionFactory();
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -2587,23 +1908,13 @@
setBody(0, message);
producer.send(message);
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
{
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
- locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
+ createSessionFactory();
+
final CountDownLatch latch = new CountDownLatch(1);
class MyListener implements SessionFailureListener
@@ -2646,7 +1957,7 @@
sf.close();
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = createSession(sf);
@@ -2659,14 +1970,6 @@
assertNotNull(cm);
Assert.assertEquals("message0", cm.getBodyBuffer().readString());
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -2675,25 +1978,12 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
- ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
- final CountDownLatch latch = new CountDownLatch(1);
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ CountDownSessionFailureListener listener = new CountDownSessionFailureListener();
- class MyListener implements SessionFailureListener
- {
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(HornetQException exception)
- {
- System.out.println("MyListener.beforeReconnect");
- }
- }
-
ClientSession session = sendAndConsume(sf, true);
- session.addFailureListener(new MyListener());
+ session.addFailureListener(listener);
backupServer.stop();
@@ -2702,9 +1992,16 @@
// To reload security or other settings that are read during startup
beforeRestart(backupServer);
+ if (!backupServer.getServer().getConfiguration().isSharedStore())
+ {
+ // XXX
// this test would not make sense in the remote replication use case, without the following
+ backupServer.getServer().getConfiguration().setBackup(false);
+ }
+
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue("session failure listener", listener.getLatch().await(5,
TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -2718,7 +2015,7 @@
sf.close();
- sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+ createClientSessionFactory();
session = createSession(sf);
@@ -2731,14 +2028,6 @@
assertNotNull(cm);
Assert.assertEquals("message0", cm.getBodyBuffer().readString());
-
- session.close();
-
- sf.close();
-
- Assert.assertEquals(0, sf.numSessions());
-
- Assert.assertEquals(0, sf.numConnections());
}
// Package protected ---------------------------------------------
@@ -2757,25 +2046,6 @@
return TransportConfigurationUtils.getInVMConnector(live);
}
- /**
- * @param i
- * @param message
- */
- protected void assertMessageBody(final int i, final ClientMessage message)
- {
- Assert.assertEquals("message" + i,
message.getBodyBuffer().readString());
- }
-
- /**
- * @param i
- * @param message
- * @throws Exception
- */
- protected void setBody(final int i, final ClientMessage message) throws Exception
- {
- message.getBodyBuffer().writeString("message" + i);
- }
-
protected void beforeRestart(TestableServer liveServer)
{
}