Author: borges
Date: 2011-08-02 06:18:08 -0400 (Tue, 02 Aug 2011)
New Revision: 11094
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 FIX sending of replicated packets & document mechanism.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-08-02
03:27:37 UTC (rev 11093)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-08-02
10:18:08 UTC (rev 11094)
@@ -6,7 +6,11 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Send all fileIDs used in the live server to the backup.
+ * Sends all fileIDs used in the live server to the backup. This is done so that we:
+ * <ol>
+ * <li>reserve those IDs in the backup;
+ * <li>start replicating while the journal synchronization is taking place.
+ * </ol>
*/
public class ReplicationFileIdMessage extends PacketImpl
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-02
03:27:37 UTC (rev 11093)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-02
10:18:08 UTC (rev 11094)
@@ -410,7 +410,6 @@
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup
can not be up-to-date!");
}
-
final Journal journalIf = journals[packet.getJournalContentType().typeByte];
JournalImpl journal = assertJournalImpl(journalIf);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-02
03:27:37 UTC (rev 11093)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-02
10:18:08 UTC (rev 11094)
@@ -69,10 +69,6 @@
// Attributes ----------------------------------------------------
private final ResponseHandler responseHandler = new ResponseHandler();
-
-// private final ClientSessionFactoryInternal sessionFactory;
-// private CoreRemotingConnection replicatingConnection;
-
private final Channel replicatingChannel;
private boolean started;
@@ -424,6 +420,11 @@
}
}
+ /**
+ * @throws IllegalStateException By default, all replicated packets generate a
replicated
+ * response. If your packets are triggering this exception, it may be
because the
+ * packets were not sent with {@link #sendReplicatePacket(Packet)}.
+ */
private void replicated()
{
OperationContext ctx = pendingTokens.poll();
@@ -523,7 +524,7 @@
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
- replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer,
content, id));
+ sendReplicatePacket(new ReplicationJournalFileMessage(bytesRead, buffer,
content, id));
if (bytesRead == -1 || bytesRead == 0)
break;
}
@@ -532,13 +533,13 @@
@Override
public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException
{
- replicatingChannel.send(new ReplicationFileIdMessage(datafiles, contentType));
+ sendReplicatePacket(new ReplicationFileIdMessage(datafiles, contentType));
}
@Override
public void sendSynchronizationDone()
{
- replicatingChannel.send(new ReplicationJournalFileMessage(-1, null, null, -1));
+ sendReplicatePacket(new ReplicationJournalFileMessage(-1, null, null, -1));
}
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02
03:27:37 UTC (rev 11093)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-02
10:18:08 UTC (rev 11094)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -3276,6 +3277,7 @@
writeLock();
try
{
+ log.info("Reserving fileIDs before synchronization: " +
Arrays.toString(fileIds));
long maxID = -1;
for (long id : fileIds)
{
@@ -3321,7 +3323,7 @@
{
filesRepository.addDataFileOnTop(file);
}
- // XXX HORNETQ-720 still missing a "reload" call
+ // XXX HORNETQ-720 still missing a "reload" call?
}
finally
{
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-02
03:27:37 UTC (rev 11093)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-02
10:18:08 UTC (rev 11094)
@@ -58,6 +58,10 @@
}
backupServer.start();
waitForBackup(sessionFactory, 10);
+
+ // SEND more messages, now with the backup replicating
+ sendMessages(session, producer, N_MSGS);
+
Set<Long> liveIds = getFileIds(messageJournal);
assertFalse("should not be initialized",
backupServer.getServer().isInitialised());
crash(session);
@@ -65,7 +69,7 @@
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
Set<Long> backupIds = getFileIds(backupMsgJournal);
- assertEquals("sets must match! " + liveIds, liveIds, backupIds);
+ assertEquals("File IDs must match!", liveIds, backupIds);
}
private static void waitForServerInitialization(HornetQServer server, int seconds)