[hornetq-commits] JBoss hornetq SVN: r12067 - in trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration: cluster/failover and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Feb 1 11:50:11 EST 2012


Author: borges
Date: 2012-02-01 11:50:09 -0500 (Wed, 01 Feb 2012)
New Revision: 12067

Modified:
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Improve tearDown so as to avoid hanging tests.

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java	2012-02-01 16:49:50 UTC (rev 12066)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/PagingTest.java	2012-02-01 16:50:09 UTC (rev 12067)
@@ -71,7 +71,7 @@
  * A PagingTest
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * 
+ *
  * Created Dec 5, 2008 8:25:58 PM
  *
  *
@@ -79,7 +79,8 @@
 public class PagingTest extends ServiceTestBase
 {
    private ServerLocator locator;
-
+   private HornetQServer server;
+   private ClientSessionFactory sf;
    static final int MESSAGE_SIZE = 1024; // 1k
 
    public PagingTest(final String name)
@@ -126,8 +127,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
-                                          config,
+      server =
+               createServer(true, config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
                                           new HashMap<String, AddressSettings>());
@@ -140,13 +141,13 @@
 
       final int messagesPerTX = numberOfMessages / numberOfTX;
 
-         ServerLocator locator = createInVMNonHALocator();
+      locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+      sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -340,7 +341,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -353,7 +355,7 @@
 
       final int numberOfMessages = 500;
 
-         ServerLocator locator = createInVMNonHALocator();
+      locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
@@ -361,7 +363,7 @@
          locator.setProducerWindowSize(-1);
          locator.setMinLargeMessageSize(1024 * 1024);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -427,7 +429,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -437,15 +440,13 @@
 
       final int numberOfMessages = 1000;
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
+      locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+      sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -544,17 +545,6 @@
             Thread.sleep(100);
          }
          assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
 
    }
 
@@ -570,7 +560,7 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server = createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -580,17 +570,15 @@
 
       final int numberOfMessages = 1000;
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
+        locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+        sf = createSessionFactory(locator);
 
-         ClientSession session = sf.createSession(false, false, false);
+      ClientSession session = sf.createSession(false, false, false);
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
 
@@ -778,18 +766,6 @@
 
          sessionConsumer.close();
 
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testMissingTXEverythingAcked() throws Exception
@@ -800,7 +776,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -816,13 +793,13 @@
 
       try
       {
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -927,7 +904,7 @@
 
       server.stop();
 
-      ServerLocator locator = createInVMNonHALocator();
+      locator = createInVMNonHALocator();
 
       locator.setBlockOnNonDurableSend(true);
       locator.setBlockOnDurableSend(true);
@@ -966,10 +943,6 @@
       assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
 
       sess.close();
-
-      locator.close();
-
-      server.stop();
    }
 
    public void testMissingTXEverythingAcked2() throws Exception
@@ -980,7 +953,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -996,13 +970,13 @@
 
       try
       {
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -1086,10 +1060,8 @@
 
       server.start();
 
-      try
-      {
 
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
@@ -1129,13 +1101,6 @@
          {
             Thread.sleep(100);
          }
-
-         locator.close();
-      }
-      finally
-      {
-         server.stop();
-      }
    }
 
    public void testTwoQueuesOneNoRouting() throws Exception
@@ -1148,7 +1113,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -1158,15 +1124,13 @@
 
       final int numberOfMessages = 1000;
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -1235,22 +1199,6 @@
 
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
-         sf.close();
-
-         locator.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testSendReceivePagingPersistent() throws Exception
@@ -1281,7 +1229,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -1336,6 +1285,7 @@
             this.queue = queue;
          }
 
+         @Override
          public void run()
          {
             try
@@ -1362,13 +1312,13 @@
       try
       {
          {
-            ServerLocator locator = createInVMNonHALocator();
+            locator = createInVMNonHALocator();
 
             locator.setBlockOnNonDurableSend(true);
             locator.setBlockOnDurableSend(true);
             locator.setBlockOnAcknowledge(true);
 
-            ClientSessionFactory sf = createSessionFactory(locator);
+            sf = createSessionFactory(locator);
 
             ClientSession session = sf.createSession(false, false, false);
 
@@ -1441,7 +1391,7 @@
          tcount1.start();
          tcount2.start();
 
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
          final ClientSessionFactory sf2 = createSessionFactory(locator);
 
          final AtomicInteger errors = new AtomicInteger(0);
@@ -1455,6 +1405,7 @@
 
             threads[start - 1] = new Thread()
             {
+               @Override
                public void run()
                {
                   try
@@ -1578,7 +1529,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -1597,16 +1549,14 @@
          bb.put(getSamplebyte(j));
       }
 
-      try
       {
-         {
-            ServerLocator locator = createInVMNonHALocator();
+            locator = createInVMNonHALocator();
 
             locator.setBlockOnNonDurableSend(true);
             locator.setBlockOnDurableSend(true);
             locator.setBlockOnAcknowledge(true);
 
-            ClientSessionFactory sf = createSessionFactory(locator);
+            sf = createSessionFactory(locator);
 
             ClientSession session = sf.createSession(false, false, false);
 
@@ -1652,13 +1602,14 @@
                                new HashMap<String, AddressSettings>());
          server.start();
 
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
          final ClientSessionFactory sf2 = createSessionFactory(locator);
 
          final AtomicInteger errors = new AtomicInteger(0);
 
          Thread t = new Thread()
          {
+            @Override
             public void run()
             {
                try
@@ -1734,20 +1685,8 @@
 
          assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
 
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
          }
-         catch (Throwable ignored)
-         {
-         }
-      }
 
-   }
-
    private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
    {
 
@@ -1758,7 +1697,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -1769,16 +1709,13 @@
       final int numberOfIntegers = 256;
 
       final int numberOfMessages = 1000;
+      locator = createInVMNonHALocator();
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
-
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -1867,22 +1804,6 @@
          consumer.close();
 
          session.close();
-
-         sf.close();
-
-         locator.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    private void assertBodiesEqual(final byte[] body, final HornetQBuffer buffer)
@@ -1900,7 +1821,7 @@
     * - Consume the entire destination (not in page mode any more)
     * - Add stuff to a transaction again
     * - Check order
-    * 
+    *
     */
    public void testDepageDuringTransaction() throws Exception
    {
@@ -1908,7 +1829,8 @@
 
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -1916,14 +1838,12 @@
 
       server.start();
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
@@ -2015,22 +1935,6 @@
          consumer.close();
 
          session.close();
-
-         sf.close();
-
-         locator.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    /**
@@ -2039,9 +1943,9 @@
     * - Consume the entire destination (not in page mode any more)
     * - Add stuff to a transaction again
     * - Check order
-    * 
+    *
     *  Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
-    * 
+    *
     */
    public void testDepageDuringTransaction2() throws Exception
    {
@@ -2050,7 +1954,8 @@
 
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -2058,14 +1963,12 @@
 
       server.start();
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
+      locator = createInVMNonHALocator();
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          byte[] body = new byte[MESSAGE_SIZE];
 
@@ -2175,30 +2078,16 @@
 
          session.close();
 
-         sf.close();
-
-         locator.close();
       }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
 
-   }
-
    public void testDepageDuringTransaction3() throws Exception
    {
       clearData();
 
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -2206,14 +2095,12 @@
 
       server.start();
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
+      locator = createInVMNonHALocator();
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          byte[] body = new byte[MESSAGE_SIZE];
 
@@ -2304,23 +2191,7 @@
 
          consumer.close();
 
-         sessionNonTX.close();
-
-         sf.close();
-
-         locator.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
+      sessionNonTX.close();
    }
 
    public void testDepageDuringTransaction4() throws Exception
@@ -2329,7 +2200,8 @@
 
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -2344,21 +2216,18 @@
 
       final int numberOfMessages = 10000;
 
-      ServerLocator locator = createInVMNonHALocator();
+      locator = createInVMNonHALocator();
 
       locator.setBlockOnNonDurableSend(true);
       locator.setBlockOnDurableSend(true);
       locator.setBlockOnAcknowledge(false);
+      sf = createSessionFactory(locator);
 
-      try
-      {
-
-         final ClientSessionFactory sf = createSessionFactory(locator);
-
          final byte[] body = new byte[MESSAGE_SIZE];
 
          Thread producerThread = new Thread()
          {
+            @Override
             public void run()
             {
                ClientSession sessionProducer = null;
@@ -2439,17 +2308,6 @@
          sf.close();
 
          assertEquals(0, errors.get());
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
    }
 
    public void testOrderingNonTX() throws Exception
@@ -2458,7 +2316,8 @@
 
       Configuration config = createDefaultConfig();
 
-      final HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                                 config,
                                                 PagingTest.PAGE_SIZE,
                                                 PagingTest.PAGE_SIZE * 2,
@@ -2473,12 +2332,10 @@
 
       final int numberOfMessages = 2000;
 
-      try
-      {
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
-         final ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          final CountDownLatch ready = new CountDownLatch(1);
 
@@ -2486,6 +2343,7 @@
 
          Thread producerThread = new Thread()
          {
+            @Override
             public void run()
             {
                ClientSession sessionProducer = null;
@@ -2568,18 +2426,6 @@
          producerThread.join();
 
          assertEquals(0, errors.get());
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testPageOnSchedulingNoRestart() throws Exception
@@ -2600,7 +2446,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -2612,14 +2459,11 @@
 
       final int numberOfBytes = 1024;
 
-      try
-      {
-
-         locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2713,18 +2557,6 @@
          consumer.close();
 
          session.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testRollbackOnSend() throws Exception
@@ -2733,7 +2565,8 @@
 
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -2745,14 +2578,11 @@
 
       final int numberOfMessages = 10;
 
-      try
-      {
-
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
          ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2786,18 +2616,6 @@
          Assert.assertNull(consumer.receiveImmediate());
 
          session.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testCommitOnSend() throws Exception
@@ -2806,7 +2624,8 @@
 
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -2818,14 +2637,11 @@
 
       final int numberOfMessages = 500;
 
-      try
-      {
-
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
          ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -2889,18 +2705,6 @@
          }
 
          session.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testParialConsume() throws Exception
@@ -2909,7 +2713,8 @@
 
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -2919,14 +2724,11 @@
 
       final int numberOfMessages = 1000;
 
-      try
-      {
-
-         locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
          ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3019,19 +2821,7 @@
 
          session.close();
       }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
 
-   }
-
    public void testPageMultipleDestinations() throws Exception
    {
       internalTestPageMultipleDestinations(false);
@@ -3055,20 +2845,17 @@
 
       settings.put(PagingTest.ADDRESS.toString(), set);
 
-      HornetQServer server = createServer(true, config, 1024, 10 * 1024, settings);
+      server = createServer(true, config, 1024, 10 * 1024, settings);
 
       server.start();
 
       final int numberOfMessages = 1000;
 
-      try
-      {
-
-         locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
          ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3169,19 +2956,6 @@
                                       .getPagingManager()
                                       .getPageStore(PagingTest.ADDRESS)
                                       .getAddressSize());
-
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testDropMessagesExpiring() throws Exception
@@ -3197,18 +2971,15 @@
 
       settings.put(PagingTest.ADDRESS.toString(), set);
 
-      HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+      server = createServer(true, config, 1024, 1024 * 1024, settings);
 
       server.start();
 
       final int numberOfMessages = 30000;
 
-      try
-      {
-
          locator.setAckBatchSize(0);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
          ClientSession session = sf.createSession();
 
          session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
@@ -3269,18 +3040,6 @@
          }
 
          session.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    private void internalTestPageMultipleDestinations(final boolean transacted) throws Exception
@@ -3291,22 +3050,19 @@
 
       int NUMBER_OF_MESSAGES = 2;
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
                                           new HashMap<String, AddressSettings>());
 
       server.start();
-
-      try
-      {
-
-         locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
          ClientSession session = sf.createSession(null, null, false, !transacted, true, false, 0);
 
          for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
@@ -3379,26 +3135,14 @@
             Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getMessageCount());
             Assert.assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getDeliveringCount());
          }
-
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testSyncPage() throws Exception
    {
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -3498,7 +3242,8 @@
    {
       Configuration config = createDefaultConfig();
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -3506,9 +3251,7 @@
 
       server.start();
 
-      try
-      {
-         server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+      server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
 
          final CountDownLatch pageUp = new CountDownLatch(0);
          final CountDownLatch pageDone = new CountDownLatch(1);
@@ -3575,21 +3318,6 @@
          assertTrue(pageUp.await(10, TimeUnit.SECONDS));
 
          assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-
-         server.stop();
-
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
    }
 
    public void testPagingOneDestinationOnly() throws Exception
@@ -3609,13 +3337,11 @@
 
       addresses.put(PAGED_ADDRESS.toString(), pagedDestination);
 
-      HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+      server = createServer(true, configuration, -1, -1, addresses);
 
-      try
-      {
-         server.start();
+      server.start();
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, true, false);
 
@@ -3682,15 +3408,6 @@
          Assert.assertNull(consumerPaged.receiveImmediate());
 
          session.close();
-
-      }
-      finally
-      {
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
    }
 
    public void testPagingDifferentSizes() throws Exception
@@ -3718,14 +3435,11 @@
 
       addresses.put(PAGED_ADDRESS_B.toString(), pagedDestinationB);
 
-      HornetQServer server = createServer(true, configuration, -1, -1, addresses);
+      server = createServer(true, configuration, -1, -1, addresses);
+      server.start();
 
-      try
-      {
-         server.start();
+         sf = createSessionFactory(locator);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
-
          ClientSession session = sf.createSession(false, true, false);
 
          session.createQueue(PAGED_ADDRESS_A, PAGED_ADDRESS_A, true);
@@ -3813,15 +3527,6 @@
          consumerB.close();
 
          session.close();
-
-      }
-      finally
-      {
-         if (server.isStarted())
-         {
-            server.stop();
-         }
-      }
    }
 
    public void testPageAndDepageRapidly() throws Exception
@@ -3835,7 +3540,7 @@
       config.setJournalSyncNonTransactional(false);
       config.setJournalFileSize(10 * 1024 * 1024);
 
-      HornetQServer server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
+      server = createServer(true, config, 512 * 1024, 1024 * 1024, new HashMap<String, AddressSettings>());
 
       server.start();
 
@@ -3843,15 +3548,13 @@
 
       final int numberOfMessages = 200;
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         final ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(true, true);
 
@@ -3863,6 +3566,7 @@
 
          Thread consumeThread = new Thread()
          {
+            @Override
             public void run()
             {
                ClientSession sessionConsumer = null;
@@ -3947,24 +3651,8 @@
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
 
          assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
-
-         sf.close();
-
-         locator.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
          }
-         catch (Throwable ignored)
-         {
-         }
-      }
 
-   }
-
    public void testTwoQueuesDifferentFilters() throws Exception
    {
       boolean persistentMessages = true;
@@ -3975,7 +3663,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -3984,11 +3673,8 @@
       server.start();
 
       final int numberOfMessages = 200;
+      locator = createInVMNonHALocator();
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
-
          locator.setClientFailureCheckPeriod(120000);
          locator.setConnectionTTL(5000000);
          locator.setCallTimeout(120000);
@@ -3997,7 +3683,7 @@
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -4071,21 +3757,6 @@
 
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
-         sf.close();
-
-         locator.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
    }
 
    public void testTwoQueues() throws Exception
@@ -4098,7 +3769,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -4107,11 +3779,8 @@
       server.start();
 
       final int numberOfMessages = 1000;
+      locator = createInVMNonHALocator();
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
-
          locator.setClientFailureCheckPeriod(120000);
          locator.setConnectionTTL(5000000);
          locator.setCallTimeout(120000);
@@ -4120,7 +3789,7 @@
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -4191,21 +3860,6 @@
 
          // It's async, so need to wait a bit for it happening
          assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
-
-         sf.close();
-
-         locator.close();
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
    }
 
    public void testTwoQueuesConsumeOneRestart() throws Exception
@@ -4218,7 +3872,8 @@
 
       config.setJournalSyncNonTransactional(false);
 
-      HornetQServer server = createServer(true,
+      server =
+               createServer(true,
                                           config,
                                           PagingTest.PAGE_SIZE,
                                           PagingTest.PAGE_MAX,
@@ -4227,11 +3882,8 @@
       server.start();
 
       final int numberOfMessages = 1000;
+      locator = createInVMNonHALocator();
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
-
          locator.setClientFailureCheckPeriod(120000);
          locator.setConnectionTTL(5000000);
          locator.setCallTimeout(120000);
@@ -4240,7 +3892,7 @@
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -4298,9 +3950,9 @@
          assertNull(consumer.receiveImmediate());
 
          consumer.close();
-         
+
          long timeout = System.currentTimeMillis() + 10000;
-         
+
          PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
 
          // It's async, so need to wait a bit for it happening
@@ -4317,22 +3969,7 @@
 
          server.stop();
          server.start();
-
-         sf.close();
-
-         locator.close();
       }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-   }
 
    public void testDLAOnLargeMessageAndPaging() throws Exception
    {
@@ -4349,12 +3986,10 @@
       dla.setDeadLetterAddress(new SimpleString("DLA"));
       settings.put(ADDRESS.toString(), dla);
 
-      final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
 
       server.start();
 
-      ServerLocator locator = null;
-      ClientSessionFactory sf = null;
       ClientSession session = null;
       try
       {
@@ -4564,15 +4199,6 @@
       finally
       {
          session.close();
-         sf.close();
-         locator.close();
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
       }
    }
 
@@ -4592,21 +4218,19 @@
       dla.setExpiryAddress(new SimpleString("DLA"));
       settings.put(ADDRESS.toString(), dla);
 
-      final HornetQServer server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, settings);
 
       server.start();
 
       final int MESSAGE_SIZE = 20;
 
-      try
-      {
-         ServerLocator locator = createInVMNonHALocator();
+         locator = createInVMNonHALocator();
 
          locator.setBlockOnNonDurableSend(true);
          locator.setBlockOnDurableSend(true);
          locator.setBlockOnAcknowledge(true);
 
-         ClientSessionFactory sf = createSessionFactory(locator);
+         sf = createSessionFactory(locator);
 
          ClientSession session = sf.createSession(false, false, false);
 
@@ -4718,19 +4342,7 @@
 
          assertFalse(pgStoreAddress.isPaging());
 
-         session.close();
-      }
-      finally
-      {
-         locator.close();
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
+      session.close();
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2012-02-01 16:49:50 UTC (rev 12066)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2012-02-01 16:50:09 UTC (rev 12067)
@@ -46,6 +46,7 @@
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.jms.client.HornetQTextMessage;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.CountDownSessionFailureListener;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.TransportConfigurationUtils;
 
@@ -94,7 +95,7 @@
    }
 
 	protected ClientSession createSession(ClientSessionFactory sf,
-			boolean autoCommitSends, 
+			boolean autoCommitSends,
 			boolean autoCommitAcks,
 			int ackBatchSize) throws Exception
 	{
@@ -116,7 +117,7 @@
                                          boolean autoCommitSends,
                                          boolean autoCommitAcks) throws Exception
    {
-      return sf.createSession(xa, autoCommitSends, autoCommitAcks);
+      return addClientSession(sf.createSession(xa, autoCommitSends, autoCommitAcks));
    }
 
    // https://issues.jboss.org/browse/HORNETQ-685
@@ -263,7 +264,7 @@
 
       session.close();
    }
-   
+
    public void testTimeoutOnFailoverConsumeBlocked() throws Exception
    {
       locator.setCallTimeout(5000);
@@ -303,6 +304,7 @@
 
       Thread t = new Thread()
       {
+         @Override
          public void run()
          {
             ClientMessage message = null;
@@ -333,7 +335,7 @@
                   {
                      endLatch.countDown();
                   }
-                  
+
                   if (message.getBooleanProperty("end"))
                   {
                      break;
@@ -346,7 +348,7 @@
             }
 
          }
-         
+
          private ClientMessage getMessage()
          {
             while (true)
@@ -482,7 +484,7 @@
       locator.setAckBatchSize(0);
       locator.setReconnectAttempts(-1);
 
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+      createClientSessionFactory();
 
       ClientSession session = createSession(sf, true, true);
 
@@ -490,8 +492,6 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
-
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session.createMessage(true);
@@ -544,68 +544,35 @@
       System.out.println("received.size() = " + received.size());
       session.close();
 
-      sf.close();
-
       Assert.assertTrue(retry <= 5);
+   }
 
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+   private void createClientSessionFactory() throws Exception
+   {
+      sf = (ClientSessionFactoryInternal)createSessionFactory(locator);
    }
 
    public void testNonTransacted() throws Exception
    {
-      ClientSessionFactoryInternal sf;
 
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, true, true);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       crash(session);
 
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       session.close();
 
       sf.close();
@@ -615,14 +582,19 @@
       Assert.assertEquals(0, sf.numConnections());
    }
 
-   public void testConsumeTransacted() throws Exception
+   private void createSessionFactory() throws Exception
    {
       locator.setBlockOnNonDurableSend(true);
       locator.setBlockOnDurableSend(true);
       locator.setReconnectAttempts(-1);
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
+   }
 
+   public void testConsumeTransacted() throws Exception
+   {
+      createSessionFactory();
+
       ClientSession session = createSession(sf, false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -631,17 +603,8 @@
 
       final int numMessages = 10;
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(true);
+      sendMessages(session, producer, numMessages);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.commit();
 
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -651,6 +614,7 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = consumer.receive(1000);
+         assertNotNull("Just crashed? " + (i == 6) + " " + i, message);
 
          message.acknowledge();
 
@@ -683,7 +647,7 @@
       {
          ClientMessage message = consumer.receive(1000);
 
-         assertNotNull(message);
+         assertNotNull("Expecting message #" + i, message);
 
          message.acknowledge();
       }
@@ -691,12 +655,6 @@
       session.commit();
 
       session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    // https://jira.jboss.org/jira/browse/HORNETQ-285
@@ -707,7 +665,7 @@
       locator.setFailoverOnInitialConnection(true);
       locator.setReconnectAttempts(-1);
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
       // Crash live server
       crash();
@@ -718,72 +676,30 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(true);
+      sendMessages(session, producer, NUM_MESSAGES);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testTransactedMessagesSentSoRollback() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       crash(session);
 
       Assert.assertTrue(session.isRollbackOnly());
@@ -805,15 +721,9 @@
 
       ClientMessage message = consumer.receiveImmediate();
 
-      Assert.assertNull(message);
+      Assert.assertNull("message should be null! Was: " + message, message);
 
       session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    /**
@@ -822,31 +732,16 @@
     */
    public void testTransactedMessagesSentSoRollbackAndContinueWork() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       crash(session);
 
       Assert.assertTrue(session.isRollbackOnly());
@@ -879,45 +774,24 @@
 
       message = consumer.receiveImmediate();
 
-      Assert.assertNotNull(message);
+      Assert.assertNotNull("expecting a message", message);
       Assert.assertEquals(counter, message.getIntProperty("counter").intValue());
 
       session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testTransactedMessagesNotSentSoNoRollback() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.commit();
 
       crash(session);
@@ -932,45 +806,19 @@
 
       session.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       Assert.assertNull(consumer.receiveImmediate());
 
       session.commit();
 
       session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -982,19 +830,8 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       // messages will be delivered to the consumer when the session is committed
       session.commit();
 
@@ -1012,64 +849,25 @@
 
       session.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       Assert.assertNull(consumer.receiveImmediate());
 
       session.commit();
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testTransactedMessagesConsumedSoRollback() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session1 = createSession(sf, false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session1, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session1.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session1.commit();
 
       ClientSession session2 = createSession(sf, false, false);
@@ -1078,19 +876,8 @@
 
       session2.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       crash(session2);
 
       Assert.assertTrue(session2.isRollbackOnly());
@@ -1105,34 +892,20 @@
       {
          Assert.assertEquals(HornetQException.TRANSACTION_ROLLED_BACK, e.getCode());
       }
-
-      session1.close();
-
-      session2.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session1 = createSession(sf, false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      
 
+
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session1.createMessage(true);
@@ -1179,7 +952,7 @@
       {
          ClientMessage message = consumer.receive(1000);
 
-         Assert.assertNotNull(message);
+         Assert.assertNotNull("expecting message " + i, message);
 
          assertMessageBody(i, message);
 
@@ -1191,26 +964,12 @@
       session2.commit();
 
       Assert.assertNull(consumer.receiveImmediate());
-
-      session1.close();
-
-      session2.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testXAMessagesSentSoRollbackOnEnd() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, true, false, false);
 
       Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1219,21 +978,12 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
 
+
       session.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessagesSomeDurable(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       crash(session);
 
       try
@@ -1254,47 +1004,26 @@
       ClientMessage message = consumer.receiveImmediate();
 
       Assert.assertNull(message);
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      final ClientSession session = createSession(sf, true, false, false);
 
-      ClientSession session = createSession(sf, true, false, false);
-
       Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
-      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+      final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
 
+
       session.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessagesSomeDurable(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.end(xid, XAResource.TMSUCCESS);
 
       crash(session);
@@ -1308,6 +1037,7 @@
       catch (XAException e)
       {
          Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
+       // XXXX  session.rollback();
       }
 
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -1318,24 +1048,15 @@
 
       Assert.assertNull(message);
 
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      producer.close();
+      consumer.close();
    }
 
    // This might happen if 1PC optimisation kicks in
    public void testXAMessagesSentSoRollbackOnCommit() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, true, false, false);
 
       Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1344,21 +1065,12 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
 
+
       session.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessagesSomeDurable(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.end(xid, XAResource.TMSUCCESS);
 
       crash(session);
@@ -1381,24 +1093,12 @@
       ClientMessage message = consumer.receiveImmediate();
 
       Assert.assertNull(message);
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, true, false, false);
 
       Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
@@ -1407,21 +1107,12 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
 
+
       session.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessagesSomeDurable(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.end(xid, XAResource.TMSUCCESS);
 
       session.prepare(xid);
@@ -1438,66 +1129,27 @@
 
       session.start(xid2, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       session.end(xid2, XAResource.TMSUCCESS);
 
       session.prepare(xid2);
 
       session.commit(xid2, false);
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session1 = createSession(sf, false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session1, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session1.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session1.commit();
 
       ClientSession session2 = createSession(sf, true, false, false);
@@ -1510,19 +1162,8 @@
 
       session2.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       crash(session2);
 
       try
@@ -1535,45 +1176,20 @@
       {
          Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
       }
-
-      session1.close();
-
-      session2.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session1 = createSession(sf, false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session1, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session1.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session1.commit();
 
       ClientSession session2 = createSession(sf, true, false, false);
@@ -1586,19 +1202,8 @@
 
       session2.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       session2.end(xid, XAResource.TMSUCCESS);
 
       crash(session2);
@@ -1613,45 +1218,20 @@
       {
          Assert.assertEquals(XAException.XA_RBOTHER, e.errorCode);
       }
-
-      session1.close();
-
-      session2.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    // 1PC optimisation
    public void testXAMessagesConsumedSoRollbackOnCommit() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
-
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      createSessionFactory();
       ClientSession session1 = createSession(sf, false, false);
 
       session1.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session1, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session1.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session1.commit();
 
       ClientSession session2 = createSession(sf, true, false, false);
@@ -1664,19 +1244,8 @@
 
       session2.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       session2.end(xid, XAResource.TMSUCCESS);
 
       // session2.prepare(xid);
@@ -1698,12 +1267,6 @@
       session1.close();
 
       session2.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testCreateNewFactoryAfterFailover() throws Exception
@@ -1711,7 +1274,7 @@
       locator.setBlockOnNonDurableSend(true);
       locator.setBlockOnDurableSend(true);
       locator.setFailoverOnInitialConnection(true);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
       ClientSession session = sendAndConsume(sf, true);
 
@@ -1721,27 +1284,15 @@
 
       Thread.sleep(5000);
 
-      sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+      createClientSessionFactory();
 
       session = sendAndConsume(sf, true);
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testFailoverMultipleSessionsWithConsumers() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       final int numSessions = 5;
 
       final int numConsumersPerSession = 5;
@@ -1772,19 +1323,9 @@
 
       ClientProducer producer = sendSession.createProducer(FailoverTestBase.ADDRESS);
 
-      
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = sendSession.createMessage(true);
+      sendMessages(sendSession, producer, NUM_MESSAGES);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       Set<ClientSession> sessionSet = sessionConsumerMap.keySet();
       ClientSession[] sessions = new ClientSession[sessionSet.size()];
       sessionSet.toArray(sessions);
@@ -1799,18 +1340,7 @@
       {
          for (ClientConsumer consumer : consumerList)
          {
-            for (int i = 0; i < NUM_MESSAGES; i++)
-            {
-               ClientMessage message = consumer.receive(1000);
-
-               Assert.assertNotNull(message);
-
-               assertMessageBody(i, message);
-
-               Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-               message.acknowledge();
-            }
+            receiveMessages(consumer);
          }
       }
 
@@ -1818,14 +1348,6 @@
       {
          session.close();
       }
-
-      sendSession.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    /*
@@ -1833,105 +1355,56 @@
     */
    public void testFailWithBrowser() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
-
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      createSessionFactory();
       ClientSession session = createSession(sf, true, true);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
 
       session.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer, 0, NUM_MESSAGES, false);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-      }
-
       crash(session);
 
+      receiveDurableMessages(consumer);
+   }
+
+   private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer) throws Exception
+   {
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
-         // Only the persistent messages will survive
-
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
+         // some are durable, some are not!
+         boolean durable = isDurable(i);
+         ClientMessage message = session.createMessage(durable);
+         setBody(i, message);
+         message.putIntProperty("counter", i);
+         producer.send(message);
       }
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
+      createSessionFactory();
 
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-
       ClientSession session = createSession(sf, true, true);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
+      // Receive MSGs but don't ack!
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = consumer.receive(1000);
@@ -1947,31 +1420,29 @@
 
       // Should get the same ones after failover since we didn't ack
 
+      receiveDurableMessages(consumer);
+   }
+
+   private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
+   {
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          // Only the persistent messages will survive
 
-         if (i % 2 == 0)
+         if (isDurable(i))
          {
             ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
+            Assert.assertNotNull("expecting durable msg " + i, message);
             assertMessageBody(i, message);
-
             Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
             message.acknowledge();
          }
       }
+   }
 
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+   private boolean isDurable(int i)
+   {
+      return i % 2 == 0;
    }
 
    public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
@@ -1980,7 +1451,7 @@
       locator.setBlockOnDurableSend(true);
       locator.setBlockOnAcknowledge(true);
       locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
       ClientSession session = createSession(sf, true, true, 0);
 
@@ -1988,43 +1459,21 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
+      sendMessagesSomeDurable(session, producer);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       crash(session);
 
       // Send some more
 
       for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
       {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+         ClientMessage message = session.createMessage(isDurable(i));
 
          setBody(i, message);
 
@@ -2034,27 +1483,12 @@
       }
 
       // Should get the same ones after failover since we didn't ack
+      receiveMessages(consumer, NUM_MESSAGES, NUM_MESSAGES * 2, true);
+   }
 
-      for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+   private void receiveMessages(ClientConsumer consumer) throws HornetQException
+   {
+      receiveMessages(consumer, 0, NUM_MESSAGES, true);
    }
 
    public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -2083,7 +1517,7 @@
       locator.setBlockOnDurableSend(true);
       locator.setBlockOnAcknowledge(true);
       locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
       ClientSession session = createSession(sf, true, true, 0);
 
@@ -2098,45 +1532,15 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
       crash(session);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessagesSomeDurable(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      receiveMessages(consumer);
    }
 
    public void _testForceBlockingReturn() throws Exception
@@ -2145,7 +1549,7 @@
       locator.setBlockOnDurableSend(true);
       locator.setBlockOnAcknowledge(true);
       locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+      createClientSessionFactory();
 
       // Add an interceptor to delay the send method so we can get time to cause failover before it returns
 
@@ -2194,12 +1598,6 @@
       Assert.assertEquals(sender.e.getCode(), HornetQException.UNBLOCKED);
 
       session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -2209,14 +1607,15 @@
       locator.setReconnectAttempts(-1);
 
       locator.setBlockOnAcknowledge(true);
-      final ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
+
       final ClientSession session = createSession(sf, false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
-      
 
+
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
       String txID = "my-tx-id";
@@ -2267,7 +1666,7 @@
                   }
                   catch (HornetQException e2)
                   {
-
+                     throw new RuntimeException(e2);
                   }
 
                }
@@ -2279,8 +1678,8 @@
 
       Committer committer = new Committer();
 
-      // Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
-      // with transaction rolled back
+      // Commit will occur, but response will never get back, connection is failed, and commit
+      // should be unblocked with transaction rolled back
 
       committer.start();
 
@@ -2293,7 +1692,7 @@
 
       committer.join();
 
-      Assert.assertFalse(committer.failed);
+      Assert.assertFalse("second attempt succeed?", committer.failed);
 
       session.close();
 
@@ -2324,40 +1723,22 @@
       try
       {
          session2.commit();
+         fail("expecting DUPLICATE_ID_REJECTED exception");
       }
       catch (HornetQException e)
       {
-         assertEquals(HornetQException.DUPLICATE_ID_REJECTED, e.getCode());
+         assertEquals(e.getMessage(), HornetQException.DUPLICATE_ID_REJECTED, e.getCode());
       }
 
       ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
 
       session2.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       ClientMessage message = consumer.receiveImmediate();
 
       Assert.assertNull(message);
-
-      session2.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testCommitDidNotOccurUnblockedAndResend() throws Exception
@@ -2366,27 +1747,17 @@
       locator.setBlockOnDurableSend(true);
       locator.setBlockOnAcknowledge(true);
       locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
       final ClientSession session = createSession(sf, false, false);
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
-      
 
+
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+      sendMessages(session, producer,NUM_MESSAGES);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session.createMessage(true);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       class Committer extends Thread
       {
          @Override
@@ -2443,75 +1814,36 @@
       producer = session2.createProducer(FailoverTestBase.ADDRESS);
 
       // We now try and resend the messages since we get a transaction rolled back exception
+      sendMessages(session2, producer,NUM_MESSAGES);
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = session2.createMessage(true);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session2.commit();
 
       ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
 
       session2.start();
 
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       ClientMessage message = consumer.receiveImmediate();
 
-      Assert.assertNull(message);
-
-      session2.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      Assert.assertNull("expecting null message", message);
    }
 
    public void testBackupServerNotRemoved() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setFailoverOnInitialConnection(true);
-      locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      class MyListener implements SessionFailureListener
+      // HORNETQ-720 Disabling test for replicating backups.
+      if (!backupServer.getServer().getConfiguration().isSharedStore())
       {
-         public void connectionFailed(final HornetQException me, boolean failedOver)
-         {
-            latch.countDown();
-         }
-
-         public void beforeReconnect(HornetQException exception)
-         {
-            System.out.println("MyListener.beforeReconnect");
-         }
+         waitForComponent(backupServer, 1);
+         return;
       }
+      locator.setFailoverOnInitialConnection(true);
+      createSessionFactory();
 
+      CountDownSessionFailureListener listener = new CountDownSessionFailureListener();
       ClientSession session = sendAndConsume(sf, true);
 
-      session.addFailureListener(new MyListener());
+      session.addFailureListener(listener);
 
       backupServer.stop();
 
@@ -2522,7 +1854,7 @@
 
       backupServer.start();
 
-      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      assertTrue("session failure listener", listener.getLatch().await(5, TimeUnit.SECONDS));
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
@@ -2531,23 +1863,12 @@
       setBody(0, message);
 
       producer.send(message);
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testLiveAndBackupLiveComesBack() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
       locator.setFailoverOnInitialConnection(true);
-      locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      createSessionFactory();
       final CountDownLatch latch = new CountDownLatch(1);
 
       class MyListener implements SessionFailureListener
@@ -2587,23 +1908,13 @@
       setBody(0, message);
 
       producer.send(message);
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
    {
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
       locator.setFailoverOnInitialConnection(true);
-      locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      createSessionFactory();
+
       final CountDownLatch latch = new CountDownLatch(1);
 
       class MyListener implements SessionFailureListener
@@ -2646,7 +1957,7 @@
 
       sf.close();
 
-      sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+      createClientSessionFactory();
 
       session = createSession(sf);
 
@@ -2659,14 +1970,6 @@
       assertNotNull(cm);
 
       Assert.assertEquals("message0", cm.getBodyBuffer().readString());
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -2675,25 +1978,12 @@
       locator.setBlockOnDurableSend(true);
       locator.setFailoverOnInitialConnection(true);
       locator.setReconnectAttempts(-1);
-      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 2);
-      final CountDownLatch latch = new CountDownLatch(1);
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
+      CountDownSessionFailureListener listener = new CountDownSessionFailureListener();
 
-      class MyListener implements SessionFailureListener
-      {
-         public void connectionFailed(final HornetQException me, boolean failedOver)
-         {
-            latch.countDown();
-         }
-
-         public void beforeReconnect(HornetQException exception)
-         {
-            System.out.println("MyListener.beforeReconnect");
-         }
-      }
-
       ClientSession session = sendAndConsume(sf, true);
 
-      session.addFailureListener(new MyListener());
+      session.addFailureListener(listener);
 
       backupServer.stop();
 
@@ -2702,9 +1992,16 @@
       // To reload security or other settings that are read during startup
       beforeRestart(backupServer);
 
+      if (!backupServer.getServer().getConfiguration().isSharedStore())
+      {
+    	  // XXX
+         // this test would not make sense in the remote replication use case, without the following
+         backupServer.getServer().getConfiguration().setBackup(false);
+      }
+
       backupServer.start();
 
-      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      assertTrue("session failure listener", listener.getLatch().await(5, TimeUnit.SECONDS));
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
@@ -2718,7 +2015,7 @@
 
       sf.close();
 
-      sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+      createClientSessionFactory();
 
       session = createSession(sf);
 
@@ -2731,14 +2028,6 @@
       assertNotNull(cm);
 
       Assert.assertEquals("message0", cm.getBodyBuffer().readString());
-
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
    }
 
    // Package protected ---------------------------------------------
@@ -2757,25 +2046,6 @@
       return TransportConfigurationUtils.getInVMConnector(live);
    }
 
-   /**
-    * @param i
-    * @param message
-    */
-   protected void assertMessageBody(final int i, final ClientMessage message)
-   {
-      Assert.assertEquals("message" + i, message.getBodyBuffer().readString());
-   }
-
-   /**
-    * @param i
-    * @param message
-    * @throws Exception 
-    */
-   protected void setBody(final int i, final ClientMessage message) throws Exception
-   {
-      message.getBodyBuffer().writeString("message" + i);
-   }
-
    protected void beforeRestart(TestableServer liveServer)
    {
    }



More information about the hornetq-commits mailing list