Author: borges
Date: 2011-08-10 07:43:38 -0400 (Wed, 10 Aug 2011)
New Revision: 11179
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-720 Add support for appendDelete & more tests
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-08-10 11:43:38 UTC (rev 11179)
@@ -13,6 +13,7 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
/**
@@ -111,9 +112,10 @@
}
@Override
- public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback)
throws Exception
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws
Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
+ writeRecord(deleteRecord, sync, callback);
}
@Override
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 11:43:38 UTC (rev 11179)
@@ -24,7 +24,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -49,6 +48,8 @@
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+ handler = new ReplicationChannelHandler();
+ liveServer.addInterceptor(new BackupSyncDelay(handler));
}
public void testNodeID() throws Exception
@@ -62,8 +63,6 @@
public void testReserveFileIdValuesOnBackup() throws Exception
{
- handler = new ReplicationChannelHandler();
- liveServer.addInterceptor(new BackupSyncDelay(handler));
createProducerSendSomeMessages();
JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
for (int i = 0; i < 5; i++)
@@ -71,28 +70,90 @@
messageJournal.forceMoveNextFile();
sendMessages(session, producer, N_MSGS);
}
+
backupServer.start();
+
waitForBackup(sessionFactory, 10, false);
// SEND more messages, now with the backup replicating
sendMessages(session, producer, N_MSGS);
+ Set<Long> liveIds = getFileIds(messageJournal);
+
+ finishSyncAndFailover();
+
+ JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+ Set<Long> backupIds = getFileIds(backupMsgJournal);
+ assertEquals("File IDs must match!", liveIds, backupIds);
+ }
+
+ public void testReplicationDuringSync() throws Exception
+ {
+ createProducerSendSomeMessages();
+ backupServer.start();
+ waitForBackup(sessionFactory, 10, false);
+
+ sendMessages(session, producer, N_MSGS);
+ session.commit();
+ receiveMsgs(0, N_MSGS);
+ finishSyncAndFailover();
+ }
+
+ private void finishSyncAndFailover() throws Exception
+ {
handler.deliver = true;
+ // must send one more message to have the "SYNC is DONE" msg delivered.
sendMessages(session, producer, 1);
-
waitForBackup(sessionFactory, 10, true);
-
- Set<Long> liveIds = getFileIds(messageJournal);
assertFalse("should not be initialized",
backupServer.getServer().isInitialised());
crash(session);
- waitForServerInitialization(backupServer.getServer(), 5);
+ waitForServerInitialization(backupServer, 5);
+ }
- JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
- Set<Long> backupIds = getFileIds(backupMsgJournal);
- assertEquals("File IDs must match!", liveIds, backupIds);
+ public void testMessageSyncSimple() throws Exception
+ {
+ createProducerSendSomeMessages();
+ startBackupCrashLive();
+ receiveMsgs(0, N_MSGS);
}
- private static void waitForServerInitialization(HornetQServer server, int seconds)
+ public void testMessageSync() throws Exception
{
+ createProducerSendSomeMessages();
+ receiveMsgs(0, N_MSGS / 2);
+ startBackupCrashLive();
+ receiveMsgs(N_MSGS / 2, N_MSGS);
+ }
+
+ private void startBackupCrashLive() throws Exception
+ {
+ assertFalse("backup is started?", backupServer.isStarted());
+ handler.setHold(false);
+ backupServer.start();
+ waitForBackup(sessionFactory, 5);
+ crash(session);
+ waitForServerInitialization(backupServer, 5);
+ }
+
+ private void createProducerSendSomeMessages() throws HornetQException, Exception
+ {
+ session = sessionFactory.createSession(true, true);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+ producer = session.createProducer(FailoverTestBase.ADDRESS);
+ sendMessages(session, producer, N_MSGS);
+ session.commit();
+ }
+
+ private void receiveMsgs(int start, int end) throws HornetQException
+ {
+ session.start();
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ receiveMessagesAndAck(consumer, start, end);
+ consumer.close();
+ session.commit();
+ }
+
+ private static void waitForServerInitialization(TestableServer server, int seconds)
+ {
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
while (!server.isInitialised())
@@ -111,7 +172,6 @@
}
}
}
-
private Set<Long> getFileIds(JournalImpl journal)
{
Set<Long> results = new HashSet<Long>();
@@ -128,40 +188,6 @@
return (JournalImpl)sm.getMessageJournal();
}
- public void testMessageSync() throws Exception
- {
- createProducerSendSomeMessages();
-
- receiveMsgs(0, N_MSGS / 2);
- assertFalse("backup is not started!", backupServer.isStarted());
-
- // BLOCK ON journals
- backupServer.start();
-
- waitForBackup(sessionFactory, 5);
- crash(session);
-
- // consume N/2 from 'new' live (the old backup)
- receiveMsgs(N_MSGS / 2, N_MSGS);
- }
-
- private void createProducerSendSomeMessages() throws HornetQException, Exception
- {
- session = sessionFactory.createSession(true, true);
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
- producer = session.createProducer(FailoverTestBase.ADDRESS);
-
- sendMessages(session, producer, N_MSGS);
- session.start();
- }
-
- private void receiveMsgs(int start, int end) throws HornetQException
- {
- ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- receiveMessagesAndAck(consumer, start, end);
- session.commit();
- }
-
@Override
protected void tearDown() throws Exception
{
@@ -233,6 +259,7 @@
private Packet onHold;
private Channel channel;
public volatile boolean deliver;
+ private boolean mustHold = true;
public void addSubHandler(ReplicationEndpoint handler)
{
@@ -244,6 +271,11 @@
this.channel = channel;
}
+ public void setHold(boolean hold)
+ {
+ mustHold = hold;
+ }
+
@Override
public void handlePacket(Packet packet)
{
@@ -264,7 +296,7 @@
}
}
- if (packet.getType() == PacketImpl.REPLICATION_SYNC)
+ if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
{
ReplicationJournalFileMessage syncMsg =
(ReplicationJournalFileMessage)packet;
if (syncMsg.isUpToDate())
Modified:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-08-10 11:43:38 UTC (rev 11179)
@@ -513,7 +513,7 @@
}
/**
- * Send messages with pre-specified body.
+ * Send durable messages with pre-specified body.
* @param session
* @param producer
* @param numMessages
@@ -531,13 +531,13 @@
}
- protected final
- void receiveMessagesAndAck(ClientConsumer consumer, int start, int msgCount)
throws HornetQException
+ protected final void receiveMessagesAndAck(ClientConsumer consumer, final int start,
int msgCount)
+ throws HornetQException
{
for (int i = start; i < msgCount; i++)
{
ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull(message);
+ Assert.assertNotNull("Expecting a message " + i, message);
assertMessageBody(i, message);
Assert.assertEquals(i, message.getIntProperty("counter").intValue());
message.acknowledge();