[hornetq-commits] JBoss hornetq SVN: r11053 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 27 13:24:19 EDT 2011


Author: borges
Date: 2011-07-27 13:24:19 -0400 (Wed, 27 Jul 2011)
New Revision: 11053

Added:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Modified:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Test for replicated backup sync

Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-07-27 17:24:19 UTC (rev 11053)
@@ -0,0 +1,136 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+/**
+ * Tests bringing up a backup server, configured through {@code createReplicatedConfigs()}, after
+ * the live server has already been started.
+ * <p>
+ * {@link #setUp()} leaves the backup stopped; each test starts it explicitly.
+ */
+public class BackupJournalSyncTest extends FailoverTestBase
+{
+
+   /** Number of messages sent by {@link #testMessageSync()}. */
+   private static final int N_MSGS = 100;
+
+   private ServerLocatorInternal locator;
+   private ClientSessionFactoryInternal sessionFactory;
+   private ClientSession session;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      // Cleared before super.setUp() so that FailoverTestBase does not start the backup server.
+      startBackupServer = false;
+      super.setUp();
+      locator = getServerLocator();
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setReconnectAttempts(-1);
+      sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+   }
+
+   /**
+    * Starts the backup and checks that it reports the same node ID as the live server.
+    */
+   public void testNodeID() throws Exception
+   {
+      backupServer.start();
+      waitForComponent(backupServer, 5);
+      assertTrue("must be running", backupServer.isStarted());
+      assertEquals("backup and live should have the same nodeID", liveServer.getServer().getNodeID(),
+                   backupServer.getServer().getNodeID());
+   }
+
+   /**
+    * Sends {@link #N_MSGS} messages and consumes the first half while the backup is still stopped,
+    * then starts the backup, calls {@code crash(session)} and consumes the second half.
+    */
+   public void testMessageSync() throws Exception
+   {
+      session = sessionFactory.createSession(true, true);
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      sendMessages(session, producer, N_MSGS);
+      session.start();
+
+      receiveMsgs(0, N_MSGS / 2);
+      assertFalse("backup should not be started yet", backupServer.isStarted());
+
+      // BLOCK ON journals
+      // SYNC, (UNSET reclaim, lock, use next file, get file list, sync) iterate
+      backupServer.start();
+
+      waitForBackup(sessionFactory, 5);
+      crash(session);
+
+      // consume N/2 from 'new' live (the old backup)
+      receiveMsgs(N_MSGS / 2, N_MSGS);
+   }
+
+   /**
+    * Receives and acknowledges messages through a fresh consumer on {@link #session}.
+    * @param start start index forwarded to {@code receiveMessagesAndAck}
+    * @param end end index forwarded to {@code receiveMessagesAndAck}
+    * @throws HornetQException if receiving, closing the consumer or committing fails
+    */
+   private void receiveMsgs(int start, int end) throws HornetQException
+   {
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      receiveMessagesAndAck(consumer, start, end);
+      // Close the consumer so it is no longer attached to the queue when a later call creates
+      // another consumer on the same session.
+      consumer.close();
+      session.commit();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         if (session != null)
+         {
+            session.close();
+         }
+         if (sessionFactory != null)
+         {
+            sessionFactory.close();
+         }
+         closeServerLocator(locator);
+      }
+      finally
+      {
+         // Always run the base class clean-up, even if closing a client resource failed.
+         super.tearDown();
+      }
+   }
+
+   @Override
+   protected void createConfigs() throws Exception
+   {
+      createReplicatedConfigs();
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+   {
+      return TransportConfigurationUtils.getInVMAcceptor(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+   {
+      return TransportConfigurationUtils.getInVMConnector(live);
+   }
+
+}

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-07-27 17:23:37 UTC (rev 11052)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-07-27 17:24:19 UTC (rev 11053)
@@ -1120,6 +1120,7 @@
 
       session.start();
 
+      // Receive MSGs but don't ack!
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          ClientMessage message = consumer.receive(1000);

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-07-27 17:23:37 UTC (rev 11052)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-07-27 17:24:19 UTC (rev 11053)
@@ -71,6 +71,8 @@
 
    protected NodeManager nodeManager;
 
+   protected boolean startBackupServer = true;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -102,7 +104,7 @@
 
       liveServer.start();
 
-      if (backupServer != null)
+      if (backupServer != null && startBackupServer)
       {
          backupServer.start();
       }
@@ -170,7 +172,6 @@
       backupServer = createBackupServer();
       backupServer.getServer().setIdentity("idBackup");
 
-
       liveConfig.getAcceptorConfigurations().clear();
       liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
 
@@ -215,8 +216,8 @@
       }
    }
 
-   protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
-         throws Exception
+   protected ClientSessionFactoryInternal createSessionFactoryAndWaitForTopology(ServerLocator locator,
+                                                                                 int topologyMembers) throws Exception
    {
       ClientSessionFactoryInternal sf;
       CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);



More information about the hornetq-commits mailing list