Author: borges
Date: 2011-07-27 13:24:19 -0400 (Wed, 27 Jul 2011)
New Revision: 11053
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Test for replicated backup sync
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
(rev 0)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-07-27
17:24:19 UTC (rev 11053)
@@ -0,0 +1,102 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+public class BackupJournalSyncTest extends FailoverTestBase
+{
+
+ private ServerLocatorInternal locator;
+ private ClientSessionFactoryInternal sessionFactory;
+ private ClientSession session;
+ private static final int N_MSGS = 100;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startBackupServer = false;
+ super.setUp();
+ locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+ sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+ }
+
+ public void testNodeID() throws Exception
+ {
+ backupServer.start();
+ waitForComponent(backupServer, 5);
+ assertTrue("must be running", backupServer.isStarted());
+ assertEquals("backup and live should have the same nodeID",
liveServer.getServer().getNodeID(),
+ backupServer.getServer().getNodeID());
+ }
+
+ public void testMessageSync() throws Exception
+ {
+ session = sessionFactory.createSession(true, true);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ sendMessages(session, producer, N_MSGS);
+ session.start();
+
+ receiveMsgs(0, N_MSGS / 2);
+ assertFalse("backup is not started!", backupServer.isStarted());
+
+ // BLOCK ON journals
+ // SYNC, (UNSET reclaim, lock, use next file, get file list, sync) iterate
+ backupServer.start();
+
+ waitForBackup(sessionFactory, 5);
+ crash(session);
+
+
+ // consume N/2 from 'new' live (the old backup)
+ receiveMsgs(N_MSGS / 2, N_MSGS);
+ }
+
+ private void receiveMsgs(int start, int end) throws HornetQException
+ {
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ receiveMessagesAndAck(consumer, start, end);
+ session.commit();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (sessionFactory != null)
+ sessionFactory.close();
+ if (session != null)
+ session.close();
+ closeServerLocator(locator);
+
+ super.tearDown();
+ }
+
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ createReplicatedConfigs();
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMAcceptor(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMConnector(live);
+ }
+
+}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-27
17:23:37 UTC (rev 11052)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-27
17:24:19 UTC (rev 11053)
@@ -1120,6 +1120,7 @@
session.start();
+ // Receive MSGs but don't ack!
for (int i = 0; i < NUM_MESSAGES; i++)
{
ClientMessage message = consumer.receive(1000);
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-27
17:23:37 UTC (rev 11052)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-27
17:24:19 UTC (rev 11053)
@@ -71,6 +71,8 @@
protected NodeManager nodeManager;
+ protected boolean startBackupServer = true;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -102,7 +104,7 @@
liveServer.start();
- if (backupServer != null)
+ if (backupServer != null && startBackupServer)
{
backupServer.start();
}
@@ -170,7 +172,6 @@
backupServer = createBackupServer();
backupServer.getServer().setIdentity("idBackup");
-
liveConfig.getAcceptorConfigurations().clear();
liveConfig.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
@@ -215,8 +216,8 @@
}
}
- protected ClientSessionFactoryInternal
createSessionFactoryAndWaitForTopology(ServerLocator locator, int topologyMembers)
- throws Exception
+ protected ClientSessionFactoryInternal
createSessionFactoryAndWaitForTopology(ServerLocator locator,
+ int
topologyMembers) throws Exception
{
ClientSessionFactoryInternal sf;
CountDownLatch countDownLatch = new CountDownLatch(topologyMembers);