Author: borges
Date: 2011-09-02 07:52:16 -0400 (Fri, 02 Sep 2011)
New Revision: 11291
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Add tests for large-message delete, and improve other tests
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-02 11:52:16 UTC (rev 11291)
@@ -443,7 +443,10 @@
}
/**
- * Assumes the
+ * Collects a list of existing large messages and their current size.
+ * <p>
+ * So we know how much of a given message to sync with the backup. Further data appends to the
+ * messages will be replicated normally.
* @return
* @throws Exception
*/
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-02 11:52:16 UTC (rev 11291)
@@ -6,6 +6,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
@@ -20,12 +21,13 @@
public class BackupSyncJournalTest extends FailoverTestBase
{
+ private static final int BACKUP_WAIT_TIME = 20;
private ServerLocatorInternal locator;
private ClientSessionFactoryInternal sessionFactory;
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
- private static final int N_MSGS = 10;
+ protected static final int N_MSGS = 10;
@Override
protected void setUp() throws Exception
@@ -51,9 +53,10 @@
public void testReserveFileIdValuesOnBackup() throws Exception
{
+ final int totalRounds = 5;
createProducerSendSomeMessages();
JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
- for (int i = 0; i < 5; i++)
+ for (int i = 0; i < totalRounds; i++)
{
messageJournal.forceMoveNextFile();
sendMessages(session, producer, N_MSGS);
@@ -61,7 +64,7 @@
backupServer.start();
- waitForBackup(sessionFactory, 10, false);
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
// SEND more messages, now with the backup replicating
sendMessages(session, producer, N_MSGS);
@@ -72,24 +75,53 @@
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
Set<Long> backupIds = getFileIds(backupMsgJournal);
assertEquals("File IDs must match!", liveIds, backupIds);
+
+ // "+ 2": there are two other calls that send N_MSGS.
+ for (int i = 0; i < totalRounds + 2; i++)
+ {
+ receiveMsgsInRange(0, N_MSGS);
+ }
+ assertNoMoreMessages();
}
+ private void assertNoMoreMessages() throws HornetQException
+ {
+ session.start();
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ ClientMessage msg = consumer.receive(200);
+ assertNull("there should be no more messages to receive! " + msg, msg);
+ consumer.close();
+ session.commit();
+
+ }
+
+ protected void startBackupFinishSyncing() throws Exception
+ {
+ backupServer.start();
+ syncDelay.deliverUpToDateMsg();
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
+ }
+
public void testReplicationDuringSync() throws Exception
{
createProducerSendSomeMessages();
backupServer.start();
- waitForBackup(sessionFactory, 10, false);
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
sendMessages(session, producer, N_MSGS);
session.commit();
- receiveMsgs(0, N_MSGS);
+ receiveMsgsInRange(0, N_MSGS);
+
finishSyncAndFailover();
+
+ receiveMsgsInRange(0, N_MSGS);
+ assertNoMoreMessages();
}
private void finishSyncAndFailover() throws Exception
{
syncDelay.deliverUpToDateMsg();
- waitForBackup(sessionFactory, 10, true);
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
assertFalse("should not be initialized",
backupServer.getServer().isInitialised());
crash(session);
waitForServerInitialization(backupServer, 5);
@@ -99,15 +131,17 @@
{
createProducerSendSomeMessages();
startBackupCrashLive();
- receiveMsgs(0, N_MSGS);
+ receiveMsgsInRange(0, N_MSGS);
+ assertNoMoreMessages();
}
public void testMessageSync() throws Exception
{
createProducerSendSomeMessages();
- receiveMsgs(0, N_MSGS / 2);
+ receiveMsgsInRange(0, N_MSGS / 2);
startBackupCrashLive();
- receiveMsgs(N_MSGS / 2, N_MSGS);
+ receiveMsgsInRange(N_MSGS / 2, N_MSGS);
+ assertNoMoreMessages();
}
private void startBackupCrashLive() throws Exception
@@ -115,12 +149,12 @@
assertFalse("backup is started?", backupServer.isStarted());
liveServer.removeInterceptor(syncDelay);
backupServer.start();
- waitForBackup(sessionFactory, 20);
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME);
crash(session);
waitForServerInitialization(backupServer, 5);
}
- private void createProducerSendSomeMessages() throws HornetQException, Exception
+ protected void createProducerSendSomeMessages() throws HornetQException, Exception
{
session = sessionFactory.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
@@ -129,7 +163,7 @@
session.commit();
}
- private void receiveMsgs(int start, int end) throws HornetQException
+ protected void receiveMsgsInRange(int start, int end) throws HornetQException
{
session.start();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-02 11:52:16 UTC (rev 11291)
@@ -1,5 +1,9 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -7,10 +11,6 @@
public class BackupSyncLargeMessageTest extends BackupSyncJournalTest
{
- /**
- * @param i
- * @param message
- */
@Override
protected void assertMessageBody(final int i, final ClientMessage message)
{
@@ -25,13 +25,36 @@
return (ServerLocatorInternal)locator;
}
- /**
- * @param i
- * @param message
- */
@Override
protected void setBody(final int i, final ClientMessage message) throws Exception
{
setLargeMessageBody(i, message);
}
+
+ // ------------------------
+
+ public void testDeleteLargeMessages() throws Exception
+ {
+ createProducerSendSomeMessages();
+ startBackupFinishSyncing();
+ File dir = new
File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
+ System.out.println("Dir " + dir.getAbsolutePath() + " " +
dir.exists());
+ // Set<Long> idsOnBkp = getAllMessageFileIds(dir);
+ receiveMsgsInRange(0, N_MSGS / 2);
+ assertEquals("we really ought to delete these after delivery", N_MSGS /
2, getAllMessageFileIds(dir).size());
+ }
+
+ private Set<Long> getAllMessageFileIds(File dir)
+ {
+ Set<Long> idsOnBkp = new HashSet<Long>();
+ for (String filename : dir.list())
+ {
+ if (filename.endsWith(".msg"))
+ {
+ idsOnBkp.add(Long.valueOf(filename.split("\\.")[0]));
+ }
+ }
+ return idsOnBkp;
+ }
+
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-02 11:52:16 UTC (rev 11291)
@@ -43,6 +43,7 @@
{
if (backup.isStarted())
handler.deliver();
+ live.removeInterceptor(this);
}
public BackupSyncDelay(TestableServer backup, TestableServer live)