Author: borges
Date: 2011-09-01 12:12:50 -0400 (Thu, 01 Sep 2011)
New Revision: 11271
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
Removed:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
Log:
HORNETQ-720 tests for LargeMessage sync'ing
Deleted:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-09-01
13:36:39 UTC (rev 11270)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-09-01
16:12:50 UTC (rev 11271)
@@ -1,208 +0,0 @@
-package org.hornetq.tests.integration.cluster.failover;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-import org.hornetq.tests.util.TransportConfigurationUtils;
-
-public class BackupJournalSyncTest extends FailoverTestBase
-{
-
- private ServerLocatorInternal locator;
- private ClientSessionFactoryInternal sessionFactory;
- private ClientSession session;
- private ClientProducer producer;
- private BackupSyncDelay syncDelay;
- private static final int N_MSGS = 100;
-
- @Override
- protected void setUp() throws Exception
- {
- startBackupServer = false;
- super.setUp();
- locator = getServerLocator();
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setReconnectAttempts(-1);
- sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
- syncDelay = new BackupSyncDelay(backupServer, liveServer);
- }
-
- public void testNodeID() throws Exception
- {
- backupServer.start();
- waitForComponent(backupServer, 5);
- assertTrue("must be running", backupServer.isStarted());
- assertEquals("backup and live should have the same nodeID",
liveServer.getServer().getNodeID(),
- backupServer.getServer().getNodeID());
- }
-
- public void testReserveFileIdValuesOnBackup() throws Exception
- {
- createProducerSendSomeMessages();
- JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
- for (int i = 0; i < 5; i++)
- {
- messageJournal.forceMoveNextFile();
- sendMessages(session, producer, N_MSGS);
- }
-
- backupServer.start();
-
- waitForBackup(sessionFactory, 10, false);
-
- // SEND more messages, now with the backup replicating
- sendMessages(session, producer, N_MSGS);
- Set<Long> liveIds = getFileIds(messageJournal);
-
- finishSyncAndFailover();
-
- JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
- Set<Long> backupIds = getFileIds(backupMsgJournal);
- assertEquals("File IDs must match!", liveIds, backupIds);
- }
-
- public void testReplicationDuringSync() throws Exception
- {
- createProducerSendSomeMessages();
- backupServer.start();
- waitForBackup(sessionFactory, 10, false);
-
- sendMessages(session, producer, N_MSGS);
- session.commit();
- receiveMsgs(0, N_MSGS);
- finishSyncAndFailover();
- }
-
- private void finishSyncAndFailover() throws Exception
- {
- syncDelay.deliverUpToDateMsg();
- waitForBackup(sessionFactory, 10, true);
- assertFalse("should not be initialized",
backupServer.getServer().isInitialised());
- crash(session);
- waitForServerInitialization(backupServer, 5);
- }
-
- public void testMessageSyncSimple() throws Exception
- {
- createProducerSendSomeMessages();
- startBackupCrashLive();
- receiveMsgs(0, N_MSGS);
- }
-
- public void testMessageSync() throws Exception
- {
- createProducerSendSomeMessages();
- receiveMsgs(0, N_MSGS / 2);
- startBackupCrashLive();
- receiveMsgs(N_MSGS / 2, N_MSGS);
- }
-
- private void startBackupCrashLive() throws Exception
- {
- assertFalse("backup is started?", backupServer.isStarted());
- liveServer.removeInterceptor(syncDelay);
- backupServer.start();
- waitForBackup(sessionFactory, 5);
- crash(session);
- waitForServerInitialization(backupServer, 5);
- }
-
- private void createProducerSendSomeMessages() throws HornetQException, Exception
- {
- session = sessionFactory.createSession(true, true);
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
- producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer, N_MSGS);
- session.commit();
- }
-
- private void receiveMsgs(int start, int end) throws HornetQException
- {
- session.start();
- ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- receiveMessagesAndAck(consumer, start, end);
- consumer.close();
- session.commit();
- }
-
- private static void waitForServerInitialization(TestableServer server, int seconds)
- {
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (!server.isInitialised())
- {
- try
- {
- Thread.sleep(50);
- }
- catch (InterruptedException e)
- {
- // ignore
- }
- if (System.currentTimeMillis() > (time + toWait))
- {
- fail("component did not start within timeout of " + seconds);
- }
- }
- }
- private Set<Long> getFileIds(JournalImpl journal)
- {
- Set<Long> results = new HashSet<Long>();
- for (JournalFile jf : journal.getDataFiles())
- {
- results.add(Long.valueOf(jf.getFileID()));
- }
- return results;
- }
-
- static JournalImpl getMessageJournalFromServer(TestableServer server)
- {
- JournalStorageManager sm =
(JournalStorageManager)server.getServer().getStorageManager();
- return (JournalImpl)sm.getMessageJournal();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- if (sessionFactory != null)
- sessionFactory.close();
- if (session != null)
- session.close();
- closeServerLocator(locator);
-
- super.tearDown();
- }
-
- @Override
- protected void createConfigs() throws Exception
- {
- createReplicatedConfigs();
- }
-
- @Override
- protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
- {
- return TransportConfigurationUtils.getInVMAcceptor(live);
- }
-
- @Override
- protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
- {
- return TransportConfigurationUtils.getInVMConnector(live);
- }
-
-
-}
Copied:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
(from rev 11243,
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java)
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
(rev 0)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-01
16:12:50 UTC (rev 11271)
@@ -0,0 +1,209 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+public class BackupSyncJournalTest extends FailoverTestBase
+{
+
+ private ServerLocatorInternal locator;
+ private ClientSessionFactoryInternal sessionFactory;
+ private ClientSession session;
+ private ClientProducer producer;
+ private BackupSyncDelay syncDelay;
+ private static final int N_MSGS = 100;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ startBackupServer = false;
+ super.setUp();
+ locator = getServerLocator();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+ sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+ syncDelay = new BackupSyncDelay(backupServer, liveServer);
+ }
+
+ public void testNodeID() throws Exception
+ {
+ backupServer.start();
+ waitForComponent(backupServer, 5);
+ assertTrue("must be running", backupServer.isStarted());
+ assertEquals("backup and live should have the same nodeID",
liveServer.getServer().getNodeID(),
+ backupServer.getServer().getNodeID());
+ }
+
+ public void testReserveFileIdValuesOnBackup() throws Exception
+ {
+ createProducerSendSomeMessages();
+ JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
+ for (int i = 0; i < 5; i++)
+ {
+ messageJournal.forceMoveNextFile();
+ sendMessages(session, producer, N_MSGS);
+ }
+
+ backupServer.start();
+
+ waitForBackup(sessionFactory, 10, false);
+
+ // SEND more messages, now with the backup replicating
+ sendMessages(session, producer, N_MSGS);
+ Set<Long> liveIds = getFileIds(messageJournal);
+
+ finishSyncAndFailover();
+
+ JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+ Set<Long> backupIds = getFileIds(backupMsgJournal);
+ assertEquals("File IDs must match!", liveIds, backupIds);
+ }
+
+ public void testReplicationDuringSync() throws Exception
+ {
+ createProducerSendSomeMessages();
+ backupServer.start();
+ waitForBackup(sessionFactory, 10, false);
+
+ sendMessages(session, producer, N_MSGS);
+ session.commit();
+ receiveMsgs(0, N_MSGS);
+ finishSyncAndFailover();
+ }
+
+ private void finishSyncAndFailover() throws Exception
+ {
+ syncDelay.deliverUpToDateMsg();
+ waitForBackup(sessionFactory, 10, true);
+ assertFalse("should not be initialized",
backupServer.getServer().isInitialised());
+ crash(session);
+ waitForServerInitialization(backupServer, 5);
+ }
+
+ public void testMessageSyncSimple() throws Exception
+ {
+ createProducerSendSomeMessages();
+ startBackupCrashLive();
+ receiveMsgs(0, N_MSGS);
+ }
+
+ public void testMessageSync() throws Exception
+ {
+ createProducerSendSomeMessages();
+ receiveMsgs(0, N_MSGS / 2);
+ startBackupCrashLive();
+ receiveMsgs(N_MSGS / 2, N_MSGS);
+ }
+
+ private void startBackupCrashLive() throws Exception
+ {
+ assertFalse("backup is started?", backupServer.isStarted());
+ liveServer.removeInterceptor(syncDelay);
+ backupServer.start();
+ waitForBackup(sessionFactory, 5);
+ crash(session);
+ waitForServerInitialization(backupServer, 5);
+ }
+
+ private void createProducerSendSomeMessages() throws HornetQException, Exception
+ {
+ session = sessionFactory.createSession(true, true);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+ producer = session.createProducer(FailoverTestBase.ADDRESS);
+ sendMessages(session, producer, N_MSGS);
+ session.commit();
+ }
+
+ private void receiveMsgs(int start, int end) throws HornetQException
+ {
+ session.start();
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ receiveMessagesAndAck(consumer, start, end);
+ consumer.close();
+ session.commit();
+ }
+
+ private static void waitForServerInitialization(TestableServer server, int seconds)
+ {
+ long time = System.currentTimeMillis();
+ long toWait = seconds * 1000;
+ while (!server.isInitialised())
+ {
+ try
+ {
+ Thread.sleep(50);
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ if (System.currentTimeMillis() > (time + toWait))
+ {
+ fail("component did not start within timeout of " + seconds);
+ }
+ }
+ }
+
+ private Set<Long> getFileIds(JournalImpl journal)
+ {
+ Set<Long> results = new HashSet<Long>();
+ for (JournalFile jf : journal.getDataFiles())
+ {
+ results.add(Long.valueOf(jf.getFileID()));
+ }
+ return results;
+ }
+
+ static JournalImpl getMessageJournalFromServer(TestableServer server)
+ {
+ JournalStorageManager sm =
(JournalStorageManager)server.getServer().getStorageManager();
+ return (JournalImpl)sm.getMessageJournal();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (sessionFactory != null)
+ sessionFactory.close();
+ if (session != null)
+ session.close();
+ closeServerLocator(locator);
+
+ super.tearDown();
+ }
+
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ createReplicatedConfigs();
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMAcceptor(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMConnector(live);
+ }
+
+
+}
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
(rev 0)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-01
16:12:50 UTC (rev 11271)
@@ -0,0 +1,37 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+
+public class BackupSyncLargeMessageTest extends BackupSyncJournalTest
+{
+
+ /**
+ * @param i
+ * @param message
+ */
+ @Override
+ protected void assertMessageBody(final int i, final ClientMessage message)
+ {
+ assertLargeMessageBody(i, message);
+ }
+
+ @Override
+ protected ServerLocatorInternal getServerLocator() throws Exception
+ {
+ ServerLocator locator = super.getServerLocator();
+ locator.setMinLargeMessageSize(MIN_LARGE_MESSAGE);
+ return (ServerLocatorInternal)locator;
+ }
+
+ /**
+ * @param i
+ * @param message
+ */
+ @Override
+ protected void setBody(final int i, final ClientMessage message) throws Exception
+ {
+ setLargeMessageBody(i, message);
+ }
+}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-01
13:36:39 UTC (rev 11270)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-01
16:12:50 UTC (rev 11271)
@@ -24,10 +24,12 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
@@ -47,6 +49,7 @@
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.ReplicatedBackupUtils;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
/**
* A FailoverTestBase
@@ -59,6 +62,11 @@
protected static final SimpleString ADDRESS = new
SimpleString("FailoverTestAddress");
+ /*
+ * Used only by tests of large messages.
+ */
+ protected static final int MIN_LARGE_MESSAGE = 1024;
+ private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
// Attributes ----------------------------------------------------
@@ -121,6 +129,40 @@
return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig,
nodeManager));
}
+ /**
+ * Large message version of {@link #setBody(int, ClientMessage)}.
+ * @param i
+ * @param message
+ * @param size
+ */
+ protected static void setLargeMessageBody(final int i, final ClientMessage message)
+ {
+ try
+ {
+
message.setBodyInputStream(UnitTestCase.createFakeLargeStream(LARGE_MESSAGE_SIZE));
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Large message version of {@link #assertMessageBody(int, ClientMessage)}.
+ * @param i
+ * @param message
+ */
+ protected static void assertLargeMessageBody(final int i, final ClientMessage
message)
+ {
+ HornetQBuffer buffer = message.getBodyBuffer();
+
+ for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
+ {
+ Assert.assertTrue("expecting " + LARGE_MESSAGE_SIZE + " bytes,
got " + j, buffer.readable());
+ Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j),
buffer.readByte());
+ }
+ }
+
protected void createConfigs() throws Exception
{
nodeManager = new InVMNodeManager();
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2011-09-01
13:36:39 UTC (rev 11270)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2011-09-01
16:12:50 UTC (rev 11271)
@@ -13,14 +13,10 @@
package org.hornetq.tests.integration.cluster.failover;
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.util.UnitTestCase;
/**
* A LargeMessageFailoverTest
@@ -36,11 +32,6 @@
private static final Logger log = Logger.getLogger(LargeMessageFailoverTest.class);
-
- private static final int MIN_LARGE_MESSAGE = 1024;
-
- private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
-
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -53,17 +44,11 @@
// Protected -----------------------------------------------------
- /**
- * @param name
- */
public LargeMessageFailoverTest(final String name)
{
super(name);
}
- /**
- *
- */
public LargeMessageFailoverTest()
{
super();
@@ -90,13 +75,7 @@
@Override
protected void assertMessageBody(final int i, final ClientMessage message)
{
- HornetQBuffer buffer = message.getBodyBuffer();
-
- for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
- {
- Assert.assertTrue("expecting more bytes", buffer.readable());
- Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j),
buffer.readByte());
- }
+ assertLargeMessageBody(i, message);
}
@@ -104,12 +83,10 @@
protected ServerLocatorInternal getServerLocator() throws Exception
{
ServerLocator locator = super.getServerLocator();
- locator.setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
- return (ServerLocatorInternal) locator;
+ locator.setMinLargeMessageSize(MIN_LARGE_MESSAGE);
+ return (ServerLocatorInternal)locator;
}
-
-
/**
* @param i
* @param message
@@ -117,11 +94,6 @@
@Override
protected void setBody(final int i, final ClientMessage message) throws Exception
{
-
message.setBodyInputStream(UnitTestCase.createFakeLargeStream(LARGE_MESSAGE_SIZE));
+ setLargeMessageBody(i, message);
}
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}