[hornetq-commits] JBoss hornetq SVN: r10931 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 6 06:21:28 EDT 2011


Author: borges
Date: 2011-07-06 06:21:27 -0400 (Wed, 06 Jul 2011)
New Revision: 10931

Modified:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
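clean up: extract the repeated send/receive loops and session-factory close assertions in FailoverTest into private helper methods (sendMessages, receiveMessages, receiveDurableMessages, isDurable, closeSessionFactory) and replace the per-test numMessages locals with a shared NUM_MESSAGES constant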

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-07-06 10:20:45 UTC (rev 10930)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-07-06 10:21:27 UTC (rev 10931)
@@ -57,6 +57,7 @@
 public class FailoverTest extends FailoverTestBase
 {
    private static final Logger log = Logger.getLogger(FailoverTest.class);
+   private static final int NUM_MESSAGES = 100;
 
    private ServerLocator locator;
    private ClientSessionFactoryInternal sf;
@@ -145,9 +146,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
-
-      for (int i = 0; i < numMessages; i++)
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session.createMessage(true);
 
@@ -187,7 +186,7 @@
       crash(session);
 
       int retry = 0;
-      while (received.size() >= numMessages)
+      while (received.size() >= NUM_MESSAGES)
       {
          Thread.sleep(1000);
          retry++;
@@ -199,19 +198,13 @@
       System.out.println("received.size() = " + received.size());
       session.close();
 
-      sf.close();
-
       Assert.assertTrue(retry <= 5);
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testNonTransacted() throws Exception
    {
 
-
       createSessionFactory();
 
       ClientSession session = createSession(sf, true, true);
@@ -220,50 +213,19 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       crash(session);
 
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    private void createSessionFactory() throws Exception
@@ -348,11 +310,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    // https://jira.jboss.org/jira/browse/HORNETQ-285
@@ -374,9 +332,9 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
 
-      for (int i = 0; i < numMessages; i++)
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session.createMessage(true);
 
@@ -391,26 +349,11 @@
 
       session.start();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testTransactedMessagesSentSoRollback() throws Exception
@@ -423,19 +366,8 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       crash(session);
 
       Assert.assertTrue(session.isRollbackOnly());
@@ -461,11 +393,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    /**
@@ -482,19 +410,8 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       crash(session);
 
       Assert.assertTrue(session.isRollbackOnly());
@@ -532,11 +449,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testTransactedMessagesNotSentSoNoRollback() throws Exception
@@ -549,19 +462,8 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.commit();
 
       crash(session);
@@ -576,35 +478,15 @@
 
       session.start();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       Assert.assertNull(consumer.receiveImmediate());
 
       session.commit();
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testTransactedMessagesWithConsumerStartedBeforeFailover() throws Exception
@@ -622,19 +504,8 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       // messages will be delivered to the consumer when the session is committed
       session.commit();
 
@@ -652,35 +523,15 @@
 
       session.start();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       Assert.assertNull(consumer.receiveImmediate());
 
       session.commit();
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testTransactedMessagesConsumedSoRollback() throws Exception
@@ -693,19 +544,8 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session1, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session1.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session1.commit();
 
       ClientSession session2 = createSession(sf, false, false);
@@ -714,19 +554,8 @@
 
       session2.start();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       crash(session2);
 
       Assert.assertTrue(session2.isRollbackOnly());
@@ -746,11 +575,7 @@
 
       session2.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testTransactedMessagesNotConsumedSoNoRollback() throws Exception
@@ -763,9 +588,9 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
 
-      for (int i = 0; i < numMessages; i++)
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session1.createMessage(true);
 
@@ -784,7 +609,7 @@
 
       session2.start();
 
-      for (int i = 0; i < numMessages / 2; i++)
+      for (int i = 0; i < NUM_MESSAGES / 2; i++)
       {
          ClientMessage message = consumer.receive(1000);
 
@@ -807,7 +632,7 @@
 
       consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
 
-      for (int i = numMessages / 2; i < numMessages; i++)
+      for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = consumer.receive(1000);
 
@@ -828,11 +653,7 @@
 
       session2.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testXAMessagesSentSoRollbackOnEnd() throws Exception
@@ -847,21 +668,12 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
 
+
       session.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessages(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       crash(session);
 
       try
@@ -885,11 +697,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testXAMessagesSentSoRollbackOnPrepare() throws Exception
@@ -904,21 +712,12 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
 
+
       session.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessages(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.end(xid, XAResource.TMSUCCESS);
 
       crash(session);
@@ -944,11 +743,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    // This might happen if 1PC optimisation kicks in
@@ -964,21 +759,12 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
 
+
       session.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessages(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.end(xid, XAResource.TMSUCCESS);
 
       crash(session);
@@ -1004,11 +790,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testXAMessagesNotSentSoNoRollbackOnCommit() throws Exception
@@ -1023,21 +805,12 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
 
+
       session.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessages(session, producer);
 
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session.end(xid, XAResource.TMSUCCESS);
 
       session.prepare(xid);
@@ -1054,24 +827,8 @@
 
       session.start(xid2, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       session.end(xid2, XAResource.TMSUCCESS);
 
       session.prepare(xid2);
@@ -1080,11 +837,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testXAMessagesConsumedSoRollbackOnEnd() throws Exception
@@ -1097,19 +850,8 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session1, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session1.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session1.commit();
 
       ClientSession session2 = createSession(sf, true, false, false);
@@ -1122,19 +864,8 @@
 
       session2.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       crash(session2);
 
       try
@@ -1152,11 +883,7 @@
 
       session2.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testXAMessagesConsumedSoRollbackOnPrepare() throws Exception
@@ -1169,19 +896,8 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session1, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session1.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session1.commit();
 
       ClientSession session2 = createSession(sf, true, false, false);
@@ -1194,19 +910,8 @@
 
       session2.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       session2.end(xid, XAResource.TMSUCCESS);
 
       crash(session2);
@@ -1226,11 +931,7 @@
 
       session2.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    // 1PC optimisation
@@ -1243,19 +944,8 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session1, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session1.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       session1.commit();
 
       ClientSession session2 = createSession(sf, true, false, false);
@@ -1268,19 +958,8 @@
 
       session2.start(xid, XAResource.TMNOFLAGS);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       session2.end(xid, XAResource.TMSUCCESS);
 
      // session2.prepare(xid);
@@ -1303,11 +982,7 @@
 
       session2.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testCreateNewFactoryAfterFailover() throws Exception
@@ -1331,11 +1006,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testFailoverMultipleSessionsWithConsumers() throws Exception
@@ -1372,9 +1043,9 @@
 
       ClientProducer producer = sendSession.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
 
-      for (int i = 0; i < numMessages; i++)
+
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = sendSession.createMessage(true);
 
@@ -1399,18 +1070,7 @@
       {
          for (ClientConsumer consumer : consumerList)
          {
-            for (int i = 0; i < numMessages; i++)
-            {
-               ClientMessage message = consumer.receive(1000);
-
-               Assert.assertNotNull(message);
-
-               assertMessageBody(i, message);
-
-               Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-               message.acknowledge();
-            }
+            receiveMessages(consumer);
          }
       }
 
@@ -1421,11 +1081,7 @@
 
       sendSession.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    /*
@@ -1440,24 +1096,13 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
 
       session.start();
 
-      for (int i = 0; i < numMessages; i++)
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = consumer.receive(1000);
 
@@ -1470,31 +1115,22 @@
 
       crash(session);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         // Only the persistent messages will survive
+      receiveDurableMessages(consumer);
 
-         if (i % 2 == 0)
-         {
-            ClientMessage message = consumer.receive(1000);
-
-            Assert.assertNotNull(message);
-
-            assertMessageBody(i, message);
-
-            Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-            message.acknowledge();
-         }
-      }
-
       session.close();
 
-      sf.close();
+      closeSessionFactory();
+   }
 
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+   private void sendMessages(ClientSession session, ClientProducer producer) throws Exception, HornetQException
+   {
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage message = session.createMessage(isDurable(i));
+         setBody(i, message);
+         message.putIntProperty("counter", i);
+         producer.send(message);
+      }
    }
 
    public void testFailThenReceiveMoreMessagesAfterFailover() throws Exception
@@ -1507,24 +1143,13 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
-      for (int i = 0; i < numMessages; i++)
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = consumer.receive(1000);
 
@@ -1539,31 +1164,33 @@
 
       // Should get the same ones after failover since we didn't ack
 
-      for (int i = 0; i < numMessages; i++)
+      receiveDurableMessages(consumer);
+
+      session.close();
+
+      closeSessionFactory();
+   }
+
+   private void receiveDurableMessages(ClientConsumer consumer) throws HornetQException
+   {
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          // Only the persistent messages will survive
 
-         if (i % 2 == 0)
+         if (isDurable(i))
          {
             ClientMessage message = consumer.receive(1000);
-
             Assert.assertNotNull(message);
-
             assertMessageBody(i, message);
-
             Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
             message.acknowledge();
          }
       }
+   }
 
-      session.close();
-
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+   private boolean isDurable(int i)
+   {
+      return i % 2 == 0;
    }
 
    public void testFailThenReceiveMoreMessagesAfterFailover2() throws Exception
@@ -1580,43 +1207,21 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
+      sendMessages(session, producer);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
-
-         setBody(i, message);
-
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       crash(session);
 
       // Send some more
 
-      for (int i = numMessages; i < numMessages * 2; i++)
+      for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
       {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+         ClientMessage message = session.createMessage(isDurable(i));
 
          setBody(i, message);
 
@@ -1627,7 +1232,7 @@
 
       // Should get the same ones after failover since we didn't ack
 
-      for (int i = numMessages; i < numMessages * 2; i++)
+      for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
       {
          ClientMessage message = consumer.receive(1000);
 
@@ -1642,11 +1247,19 @@
 
       session.close();
 
-      sf.close();
+      closeSessionFactory();
+   }
 
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+   private void receiveMessages(ClientConsumer consumer) throws HornetQException
+   {
+      for (int i = 0; i < NUM_MESSAGES; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+         Assert.assertNotNull(message);
+         assertMessageBody(i, message);
+         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+         message.acknowledge();
+      }
    }
 
    public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -1690,45 +1303,19 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      final int numMessages = 100;
-
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
       session.start();
 
       crash(session);
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = session.createMessage(i % 2 == 0);
+      sendMessages(session, producer);
 
-         setBody(i, message);
+      receiveMessages(consumer);
 
-         message.putIntProperty("counter", i);
-
-         producer.send(message);
-      }
-
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void _testForceBlockingReturn() throws Exception
@@ -1787,11 +1374,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testCommitOccurredUnblockedAndResendNoDuplicates() throws Exception
@@ -1807,13 +1390,13 @@
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
-      final int numMessages = 100;
 
+
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
       String txID = "my-tx-id";
 
-      for (int i = 0; i < numMessages; i++)
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session.createMessage(true);
 
@@ -1871,8 +1454,8 @@
 
       Committer committer = new Committer();
 
-      // Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
-      // with transaction rolled back
+      // Commit will occur, but response will never get back, connection is failed, and commit
+      // should be unblocked with transaction rolled back
 
       committer.start();
 
@@ -1896,7 +1479,7 @@
       // We now try and resend the messages since we get a transaction rolled back exception
       // but the commit actually succeeded, duplicate detection should kick in and prevent dups
 
-      for (int i = 0; i < numMessages; i++)
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session2.createMessage(true);
 
@@ -1926,29 +1509,21 @@
 
       session2.start();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       ClientMessage message = consumer.receiveImmediate();
 
       Assert.assertNull(message);
 
       session2.close();
 
-      sf.close();
+      closeSessionFactory();
+   }
 
+   private void closeSessionFactory()
+   {
+      sf.close();
       Assert.assertEquals(0, sf.numSessions());
-
       Assert.assertEquals(0, sf.numConnections());
    }
 
@@ -1964,11 +1539,9 @@
 
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
 
-      final int numMessages = 100;
-
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      for (int i = 0; i < numMessages; i++)
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session.createMessage(true);
 
@@ -2036,7 +1609,7 @@
 
       // We now try and resend the messages since we get a transaction rolled back exception
 
-      for (int i = 0; i < numMessages; i++)
+      for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = session2.createMessage(true);
 
@@ -2053,30 +1626,15 @@
 
       session2.start();
 
-      for (int i = 0; i < numMessages; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
+      receiveMessages(consumer);
 
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       ClientMessage message = consumer.receiveImmediate();
 
       Assert.assertNull(message);
 
       session2.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testBackupServerNotRemoved() throws Exception
@@ -2126,11 +1684,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testLiveAndBackupLiveComesBack() throws Exception
@@ -2182,11 +1736,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testLiveAndBackupLiveComesBackNewFactory() throws Exception
@@ -2254,11 +1804,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    public void testLiveAndBackupBackupComesBackNewFactory() throws Exception
@@ -2326,11 +1872,7 @@
 
       session.close();
 
-      sf.close();
-
-      Assert.assertEquals(0, sf.numSessions());
-
-      Assert.assertEquals(0, sf.numConnections());
+      closeSessionFactory();
    }
 
    // Package protected ---------------------------------------------



More information about the hornetq-commits mailing list