Author: borges
Date: 2011-09-01 12:15:10 -0400 (Thu, 01 Sep 2011)
New Revision: 11273
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Synchronization of Large Messages
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -27,6 +27,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -372,7 +373,7 @@
final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
-
+ Map<String, Long> largeMessageFilesToSync;
try
{
storageManagerLock.writeLock().lock();
@@ -386,6 +387,7 @@
{
messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+ largeMessageFilesToSync = getLargeMessageInformation();
}
finally
{
@@ -399,8 +401,10 @@
{
storageManagerLock.writeLock().unlock();
}
+
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+ sendLargeMessageFiles(largeMessageFilesToSync);
storageManagerLock.writeLock().lock();
try
@@ -420,7 +424,43 @@
}
}
+ private void sendLargeMessageFiles(Map<String, Long> largeMessageFilesToSync)
throws Exception
+ { // Hands every large-message file listed in the map (file name -> size) to the replicator.
+ for (Entry<String, Long> entry : largeMessageFilesToSync.entrySet())
+ {
+ String fileName = entry.getKey();
+ long size = entry.getValue(); // size as recorded when the map was built, not re-read here
+ SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName,
1);
+ if (!seqFile.exists())
+ continue; // NOTE(review): presumably the file was deleted after the listing was taken - confirm
+ replicator.syncLargeMessageFile(seqFile, size,
getLargeMessageIdFromFilename(fileName));
+ }
+ }
+
+ private long getLargeMessageIdFromFilename(String filename)
+ { // The id is the text before the first '.', parsed as a long: "123.msg" yields 123.
+ return Long.parseLong(filename.split("\\.")[0]);
+ }
+
/**
+ * Lists the large-message files (extension "msg") known to {@code largeMessagesFactory} and
+ * records the size each one has at the time of the call.
+ * <p>
+ * NOTE(review): the original comment stopped at "Assumes the". The only caller visible in this
+ * change invokes the method while holding {@code storageManagerLock.writeLock()}; presumably
+ * that is the assumption - TODO confirm.
+ * @return map from file name to file size
+ * @throws Exception propagated from the sequential-file calls (listing files, reading a size)
+ */
+ private Map<String, Long> getLargeMessageInformation() throws Exception
+ {
+ Map<String, Long> largeMessages = new HashMap<String, Long>();
+ List<String> filenames = largeMessagesFactory.listFiles("msg");
+ for (String filename : filenames)
+ {
+ SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename,
1);
+ long size = seqFile.size();
+ largeMessages.put(filename, size);
+ }
+ return largeMessages;
+ }
+
+ /**
* Send an entire journal file to a replicating server (a backup server that is).
* @param jf
* @param replicator2
@@ -431,7 +471,7 @@
{
for (JournalFile jf : journalFiles)
{
- replicator.sendJournalFile(jf, type);
+ replicator.syncJournalFile(jf, type);
jf.setCanReclaim(true);
}
}
@@ -563,30 +603,44 @@
public void addBytesToLargeMessage(final SequentialFile file, final long messageId,
final byte[] bytes) throws Exception
{
- file.position(file.size());
+ readLock();
+ try
+ {
+ file.position(file.size());
- file.writeDirect(ByteBuffer.wrap(bytes), false);
+ file.writeDirect(ByteBuffer.wrap(bytes), false);
- if (isReplicated())
+ if (isReplicated())
+ {
+ replicator.largeMessageWrite(messageId, bytes);
+ }
+ }
+ finally
{
- replicator.largeMessageWrite(messageId, bytes);
+ readUnLock();
}
}
public LargeServerMessage createLargeMessage(final long id, final MessageInternal
message)
{
- if (isReplicated())
+ readLock();
+ try
{
- replicator.largeMessageBegin(id);
- }
+ if (isReplicated())
+ {
+ replicator.largeMessageBegin(id);
+ }
- LargeServerMessageImpl largeMessage =
(LargeServerMessageImpl)createLargeMessage();
+ LargeServerMessageImpl largeMessage =
(LargeServerMessageImpl)createLargeMessage();
+ largeMessage.copyHeadersAndProperties(message);
+ largeMessage.setMessageID(id);
- largeMessage.copyHeadersAndProperties(message);
-
- largeMessage.setMessageID(id);
-
- return largeMessage;
+ return largeMessage;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Non transactional operations
@@ -604,6 +658,7 @@
{
// Note that we don't sync, the add reference that comes immediately after will
sync if appropriate
+ // XXX HORNETQ-720
if (message.isLargeMessage())
{
messageJournal.appendAddRecord(message.getMessageID(),
@@ -2049,16 +2104,9 @@
* @param messageID
* @return
*/
- SequentialFile createFileForLargeMessage(final long messageID, final boolean durable)
+ SequentialFile createFileForLargeMessage(final long messageID, String extension)
{
- if (durable)
- {
- return largeMessagesFactory.createSequentialFile(messageID + ".msg",
-1);
- }
- else
- {
- return largeMessagesFactory.createSequentialFile(messageID + ".tmp",
-1);
- }
+ return largeMessagesFactory.createSequentialFile(messageID + extension, -1);
}
// Private
----------------------------------------------------------------------------------
@@ -2379,14 +2427,11 @@
*/
private void cleanupIncompleteFiles() throws Exception
{
- if (largeMessagesFactory != null)
+ List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
+ for (String tmpFile : tmpFiles)
{
- List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
- for (String tmpFile : tmpFiles)
- {
- SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile,
-1);
- file.delete();
- }
+ SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
+ file.delete();
}
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -31,7 +31,7 @@
* A LargeServerMessageImpl
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
+ *
* Created 30-Sep-08 12:02:45 PM
*
*
@@ -49,9 +49,9 @@
private final JournalStorageManager storageManager;
private LargeServerMessage linkMessage;
-
+
private boolean paged;
-
+ private boolean replicationSync;
// We should only use the NIO implementation on the Journal
private SequentialFile file;
@@ -89,7 +89,7 @@
{
paged = true;
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
*/
@@ -231,7 +231,7 @@
public boolean isFileExists() throws Exception
{
- SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(),
durable);
+ SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(),
getExtension());
return localfile.exists();
}
@@ -243,7 +243,7 @@
{
if (memoryEstimate == -1)
{
- // The body won't be on memory (aways on-file), so we don't consider
this for paging
+ // The body won't be on memory (always on-file), so we don't consider
this for paging
memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT +
getEncodeSize() +
(16 + 4) *
@@ -268,17 +268,18 @@
}
}
}
-
+
+ @Override
public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
{
super.setOriginalHeaders(other, expiry);
-
+
LargeServerMessageImpl otherLM = (LargeServerMessageImpl)other;
this.paged = otherLM.paged;
if (this.paged)
{
- this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
+ this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
}
}
@@ -289,16 +290,16 @@
if (!paged)
{
incrementDelayDeletionCount();
-
+
long idToUse = messageID;
-
+
if (linkMessage != null)
{
idToUse = linkMessage.getMessageID();
}
-
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
-
+
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
getExtension());
+
ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ?
this
:
(LargeServerMessageImpl)linkMessage,
newfile,
@@ -310,19 +311,19 @@
try
{
validateFile();
-
+
SequentialFile file = this.file;
-
- SequentialFile newFile = storageManager.createFileForLargeMessage(newID,
durable);
-
+
+ SequentialFile newFile = storageManager.createFileForLargeMessage(newID,
getExtension());
+
file.copyTo(newFile);
-
+
LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile,
newID);
-
+
newMessage.linkMessage = null;
-
+
newMessage.setPaged();
-
+
return newMessage;
}
catch (Exception e)
@@ -333,8 +334,9 @@
}
}
- public SequentialFile getFile()
+ public SequentialFile getFile() throws HornetQException
{
+ validateFile();
return file;
}
@@ -369,10 +371,10 @@
throw new RuntimeException("MessageID not set on
LargeMessage");
}
- file = storageManager.createFileForLargeMessage(getMessageID(), durable);
+ file = storageManager.createFileForLargeMessage(getMessageID(),
getExtension());
file.open();
-
+
bodySize = file.size();
}
}
@@ -383,6 +385,13 @@
}
}
+ private String getExtension()
+ { // Extension passed to storageManager.createFileForLargeMessage() for this message's file.
+ if (replicationSync)
+ return ".sync"; // the replication-sync flag takes precedence over durability
+ return durable ? ".msg" : ".tmp"; // same mapping the removed boolean 'durable' overload used
+ }
+
/* (non-Javadoc)
* @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
*/
@@ -396,7 +405,7 @@
linkMessage = message;
- file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
+ file = storageManager.createFileForLargeMessage(message.getMessageID(),
getExtension());
try
{
file.open();
@@ -477,4 +486,10 @@
return bodySize;
}
}
+
+ @Override
+ public void setReplicationSync(boolean sync)
+ {
+ replicationSync = sync; // while true, getExtension() yields ".sync" instead of ".msg"/".tmp"
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -14,6 +14,7 @@
package org.hornetq.core.persistence.impl.nullpm;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -21,7 +22,7 @@
* A NullStorageLargeServerMessage
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
+ *
* Created 30-Sep-08 1:51:42 PM
*
*
@@ -164,7 +165,23 @@
{
}
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#setReplicationSync(boolean)
+ */
+ @Override
+ public void setReplicationSync(boolean sync)
+ {
+ // No-op: this implementation ignores the flag (body left as an IDE stub; presumably nothing to do for null storage - TODO confirm).
+ }
+
+ @Override
+ public SequentialFile getFile()
+ {
+ return null; // always null in this implementation; NOTE(review): presumably because null storage has no backing file - confirm
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -104,15 +104,15 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
@@ -526,14 +526,14 @@
packet = new HaBackupRegistrationMessage();
break;
}
- case PacketImpl.REPLICATION_START_SYNC:
+ case PacketImpl.REPLICATION_START_STOP_SYNC:
{
packet = new ReplicationStartSyncMessage();
break;
}
- case PacketImpl.REPLICATION_SYNC:
+ case PacketImpl.REPLICATION_SYNC_FILE:
{
- packet = new ReplicationJournalFileMessage();
+ packet = new ReplicationSyncFileMessage();
break;
}
default:
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -178,7 +178,7 @@
public static final byte REPLICATION_COMPARE_DATA = 102;
- public static final byte REPLICATION_SYNC = 103;
+ public static final byte REPLICATION_SYNC_FILE = 103;
// HA
@@ -195,7 +195,7 @@
/** XXX HORNETQ-720 "HA" is not really used anywhere else. Better name? */
public static final byte HA_BACKUP_REGISTRATION = 113;
- public static final byte REPLICATION_START_SYNC = 120;
+ public static final byte REPLICATION_START_STOP_SYNC = 120;
// Static --------------------------------------------------------
Deleted:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -1,98 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import java.nio.ByteBuffer;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Message is used to:
- * <ol>
- * <li>copy JournalFile data over to the backup during synchronization;
- * <li>send a up-to-date signal to backup;
- * </ol>
- */
-public final class ReplicationJournalFileMessage extends PacketImpl
-{
-
- private ByteBuffer data;
- private int dataSize;
- private JournalContent journalType;
- /** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()}
*/
- private long fileId;
- private boolean backupIsUpToDate;
- private byte[] byteArray;
-
- public ReplicationJournalFileMessage()
- {
- super(REPLICATION_SYNC);
- }
-
- public ReplicationJournalFileMessage(int size, ByteBuffer buffer, JournalContent
content, long id)
- {
- this();
- this.fileId = id;
- this.backupIsUpToDate = id == -1;
- this.dataSize = size;
- this.data = buffer;
- this.journalType = content;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeLong(fileId);
- if (fileId == -1)
- return;
- buffer.writeByte(journalType.typeByte);
- buffer.writeInt(dataSize);
- // sending -1 will close the file
- if (dataSize > 0)
- {
- buffer.writeBytes(data);// (data, 0, dataSize);
- }
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- fileId = buffer.readLong();
- if (fileId == -1)
- {
- backupIsUpToDate = true;
- return;
- }
- journalType = JournalContent.getType(buffer.readByte());
- int size = buffer.readInt();
- if (size > 0)
- {
- byteArray = new byte[size];
- buffer.readBytes(byteArray);
- }
- }
-
- public long getFileId()
- {
- return fileId;
- }
-
- public byte[] getData()
- {
- return byteArray;
- }
-
- public JournalContent getJournalContent()
- {
- return journalType;
- }
-
- /**
- * @return {@code true} if the live has finished synchronizing its data and the backup
is
- * therefore up-to-date, {@code false} otherwise.
- */
- public boolean isUpToDate()
- {
- return backupIsUpToDate;
- }
-}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -6,25 +6,30 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Sends all fileIDs used in the live server to the backup. This is done so that we:
- * <ol>
- * <li>reserve those IDs in the backup;
- * <li>start replicating while the journal synchronization is taking place.
- * </ol>
+ * This message may signal start or end of the replication synchronization.
+ * <p>
+ * At start, it sends all fileIDs used in a given journal live server to the backup, so
the backup
+ * can reserve those IDs.
*/
public class ReplicationStartSyncMessage extends PacketImpl
{
private long[] ids;
private JournalContent journalType;
+ private boolean synchronizationIsFinished;
public ReplicationStartSyncMessage()
{
- super(REPLICATION_START_SYNC);
+ super(REPLICATION_START_STOP_SYNC);
}
public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent
contentType)
{
this();
+ if (datafiles == null && contentType == null)
+ {
+ synchronizationIsFinished = true;
+ return;
+ }
ids = new long[datafiles.length];
for (int i = 0; i < datafiles.length; i++)
{
@@ -36,6 +41,9 @@
@Override
public void encodeRest(final HornetQBuffer buffer)
{
+ buffer.writeBoolean(synchronizationIsFinished);
+ if (synchronizationIsFinished)
+ return;
buffer.writeByte(journalType.typeByte);
buffer.writeInt(ids.length);
for (long id : ids)
@@ -47,6 +55,9 @@
@Override
public void decodeRest(final HornetQBuffer buffer)
{
+ synchronizationIsFinished = buffer.readBoolean();
+ if (synchronizationIsFinished)
+ return;
journalType = JournalContent.getType(buffer.readByte());
int length = buffer.readInt();
ids = new long[length];
@@ -56,6 +67,15 @@
}
}
+ /**
+ * Tells whether this message marks the end (rather than the start) of the synchronization.
+ * @return {@code true} if the live has finished synchronizing its data and the backup is
+ * therefore up-to-date, {@code false} otherwise.
+ */
+ public boolean isSynchronizationFinished()
+ {
+ return synchronizationIsFinished;
+ }
+ }
+
public JournalContent getJournalContentType()
{
return journalType;
Copied:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
(from rev 11272,
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java)
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -0,0 +1,108 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import java.nio.ByteBuffer;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Message used to copy file data over to the backup during synchronization. It carries either:
+ * <ol>
+ * <li>data of a JournalFile, in which case the journal type is set; or
+ * <li>data of a large-message file, in which case the journal type is {@code null}.
+ * </ol>
+ */
+public final class ReplicationSyncFileMessage extends PacketImpl
+{
+
+   /** The JournalType or {@code null} if sync'ing large-messages. */
+   private JournalContent journalType;
+   /** {@link org.hornetq.core.journal.impl.JournalFile#getFileID()}, or the message id of a large-message. */
+   private long fileId;
+   private int dataSize; // payload size announced on the wire (encoding side only)
+   private ByteBuffer byteBuffer; // payload to encode (encoding side only)
+   private byte[] byteArray; // decoded payload; stays null when the announced size is <= 0
+
+   public ReplicationSyncFileMessage()
+   {
+      super(REPLICATION_SYNC_FILE);
+   }
+
+   /** Creates a message to be encoded; pass a {@code null} content when {@code id} is a large-message id. */
+   public ReplicationSyncFileMessage(JournalContent content, long id, int size, ByteBuffer buffer)
+   {
+      this();
+      this.byteBuffer = buffer;
+      this.dataSize = size;
+      this.fileId = id;
+      this.journalType = content;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(fileId);
+      if (fileId == -1)
+         return; // an id of -1 is sent bare: no type, size or data follows
+      boolean isJournal = journalType != null;
+      buffer.writeBoolean(isJournal);
+      if (isJournal)
+         buffer.writeByte(journalType.typeByte);
+      buffer.writeInt(dataSize);
+      /*
+       * sending -1 will close the file in case of a journal, but not in case of a largeMessage
+       * (which might receive appends)
+       */
+      if (dataSize > 0)
+      {
+         buffer.writeBytes(byteBuffer);
+      }
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      fileId = buffer.readLong();
+      if (fileId == -1)
+         return; // mirrors encodeRest: nothing was written after a -1 id, so nothing may be read
+      if (buffer.readBoolean())
+      {
+         journalType = JournalContent.getType(buffer.readByte());
+      }
+      int size = buffer.readInt();
+      if (size > 0)
+      {
+         byteArray = new byte[size];
+         buffer.readBytes(byteArray);
+      }
+   }
+
+   /** @return the journal file id, or the message id when {@link #isLargeMessage()} is {@code true} */
+   public long getId()
+   {
+      return fileId;
+   }
+
+   /** @return the journal this file belongs to, or {@code null} for a large-message file */
+   public JournalContent getJournalContent()
+   {
+      return journalType;
+   }
+
+   /**
+    * @return the decoded file data, or {@code null} if the message carried no data
+    */
+   public byte[] getData()
+   {
+      return byteArray;
+   }
+
+   /**
+    * @return {@code true} if no journal type was set, i.e. the data belongs to a large-message file
+    */
+   public boolean isLargeMessage()
+   {
+      return journalType == null;
+   }
+}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
@@ -91,7 +92,7 @@
* @throws HornetQException
* @throws Exception
*/
- void sendJournalFile(JournalFile jf, JournalContent type) throws Exception;
+ void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
/**
* Reserve the following fileIDs in the backup server.
@@ -108,4 +109,9 @@
*/
void sendSynchronizationDone();
+ /**
+ * Synchronizes one large-message file with the backup (implementation not shown here).
+ * @param seqFile the large-message file to send
+ * @param size size of the file; the caller in JournalStorageManager passes the size it recorded
+ *           when it listed the large-message files
+ * @param id id of the large message; the same caller derives it from the file name
+ * @throws Exception
+ */
+ void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws
Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -17,6 +17,7 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -48,15 +49,15 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
@@ -86,9 +87,13 @@
/** Files reserved in each journal for synchronization of existing data from the
'live' server. */
private final Map<JournalContent, Map<Long, JournalFile>>
filesReservedForSync =
new HashMap<JournalContent, Map<Long, JournalFile>>();
+ private Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long,
LargeServerMessage>();
- /** Used to hold the real Journals before the backup is synchronized. */
- private final Map<JournalContent, Journal> journalsHolder = new
HashMap<JournalContent, Journal>();
+ /**
+ * Used to hold the real Journals before the backup is synchronized. This field should
be
+ * {@code null} on an up-to-date server.
+ */
+ private Map<JournalContent, Journal> journalsHolder = new
HashMap<JournalContent, Journal>();
private StorageManager storage;
@@ -192,13 +197,13 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
- else if (type == PacketImpl.REPLICATION_START_SYNC)
+ else if (type == PacketImpl.REPLICATION_START_STOP_SYNC)
{
handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
}
- else if (type == PacketImpl.REPLICATION_SYNC)
+ else if (type == PacketImpl.REPLICATION_SYNC_FILE)
{
- handleReplicationSynchronization((ReplicationJournalFileMessage)packet);
+ handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
}
else
{
@@ -306,7 +311,7 @@
pageManager.stop();
- started = false;
+ started = false;
}
/* (non-Javadoc)
@@ -387,57 +392,109 @@
// Private -------------------------------------------------------
- private void handleReplicationSynchronization(ReplicationJournalFileMessage msg)
throws Exception
+ private void finishSynchronization() throws Exception
{
- if (msg.isUpToDate())
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+ JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
+ journal.writeLock();
+ try
{
- JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
- journal.writeLock();
- try
+ if (journal.getDataFiles().length != 0)
{
- if (journal.getDataFiles().length != 0)
+ throw new IllegalStateException("Journal should not have any data
files at this point");
+ }
+ // files should be already in place.
+ filesReservedForSync.remove(jc);
+ getJournal(jc.typeByte).stop();
+ registerJournal(jc.typeByte, journal);
+ journal.loadInternalOnly();
+ }
+ finally
+ {
+ journal.writeUnlock();
+ }
+ }
+ synchronized (largeMessagesOnSync)
+ {
+ synchronized (largeMessages)
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+ for (Entry<Long, LargeServerMessage> entry : largeMessages.entrySet())
+ {
+ Long id = entry.getKey();
+ LargeServerMessage lm = entry.getValue();
+ if (largeMessagesOnSync.containsKey(id))
{
- throw new IllegalStateException("Journal should not have any data
files at this point");
+ SequentialFile sq = lm.getFile();
+ LargeServerMessage mainLM = largeMessagesOnSync.get(id);
+ SequentialFile mainSeqFile = mainLM.getFile();
+ System.out.println(mainSeqFile);
+ for (;;)
+ {
+ buffer.rewind();
+ int size = sq.read(buffer);
+ mainSeqFile.writeInternal(buffer);
+ if (size < buffer.capacity())
+ {
+ break;
+ }
+ }
}
- // files should be already in place.
- filesReservedForSync.remove(jc);
- getJournal(jc.typeByte).stop();
- registerJournal(jc.typeByte, journal);
- journal.loadInternalOnly();
- // XXX HORNETQ-720 must reload journals
- // XXX HORNETQ-720 must start using real journals
+ else
+ {
+ // these are large-messages created after sync started
+ largeMessagesOnSync.put(id, lm);
+ }
+ }
+ largeMessages.clear();
+ largeMessages.putAll(largeMessagesOnSync);
+ }
+ }
+ largeMessagesOnSync = null;
+ journalsHolder = null;
+ server.setRemoteBackupUpToDate();
+ log.info("Backup server " + server + " is synchronized with
live-server.");
+ return;
+ }
- }
- finally
+ private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws
Exception
+ {
+ Long id = Long.valueOf(msg.getId());
+ byte[] data = msg.getData();
+ SequentialFile sf;
+ if (msg.isLargeMessage())
+ {
+ synchronized (largeMessagesOnSync)
+ {
+ LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
+ if (largeMessage == null)
{
- journal.writeUnlock();
+ largeMessage = storage.createLargeMessage();
+ largeMessage.setDurable(true);
+ largeMessage.setMessageID(id);
+ largeMessagesOnSync.put(id, largeMessage);
}
-
+ sf = largeMessage.getFile();
}
- server.setRemoteBackupUpToDate();
- log.info("Backup server " + server + " is synchronized with
live-server.");
- return;
}
+ else
+ {
+ JournalFile journalFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
+ sf = journalFile.getFile();
- long id = msg.getFileId();
- JournalFile journalFile =
filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
-
- byte[] data = msg.getData();
+ }
if (data == null)
{
- journalFile.getFile().close();
+ sf.close();
+ return;
}
- else
+
+ if (!sf.isOpen())
{
- SequentialFile sf = journalFile.getFile();
- if (!sf.isOpen())
- {
- sf.open(1, false);
- }
- sf.writeDirect(ByteBuffer.wrap(data), true);
+ sf.open(1, false);
}
+ sf.writeDirect(ByteBuffer.wrap(data), true);
}
/**
@@ -452,6 +509,13 @@
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup
can not be up-to-date!");
}
+
+ if (packet.isSynchronizationFinished())
+ {
+ finishSynchronization();
+ return;
+ }
+
final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
JournalImpl journal = assertJournalImpl(journalIf);
@@ -520,6 +584,18 @@
else
{
message = largeMessages.get(messageId);
+ if (message == null)
+ {
+ synchronized (largeMessages)
+ {
+ if (!server.isRemoteBackupUpToDate())
+ {
+ // in case we need to append data to a file while still sync'ing
the backup
+ createLargeMessage(messageId, true);
+ message = largeMessages.get(messageId);
+ }
+ }
+ }
}
if (message == null)
@@ -537,13 +613,20 @@
*/
private void handleLargeMessageBegin(final ReplicationLargeMessageBeingMessage
packet)
{
- LargeServerMessage largeMessage = storage.createLargeMessage();
- largeMessage.setDurable(true);
- largeMessage.setMessageID(packet.getMessageId());
- log.trace("Receiving Large Message " + largeMessage.getMessageID() +
" on backup");
- largeMessages.put(largeMessage.getMessageID(), largeMessage);
+ final long id = packet.getMessageId();
+ createLargeMessage(id, false);
+ log.trace("Receiving Large Message " + id + " on backup");
}
+ private void createLargeMessage(final long id, boolean sync)
+ {
+ LargeServerMessage msg = storage.createLargeMessage();
+ msg.setDurable(true);
+ msg.setMessageID(id);
+ msg.setReplicationSync(sync);
+ largeMessages.put(id, msg);
+ }
+
/**
* @param packet
*/
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -44,7 +44,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -52,6 +51,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.utils.ExecutorFactory;
@@ -504,26 +504,58 @@
}
@Override
- public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
+ public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
{
SequentialFile file = jf.getFile().copy();
log.info("Replication: sending " + jf + " (size=" + file.size()
+ ") to backup. " + file);
+ sendLargeFile(content, jf.getFileID(), file, Long.MAX_VALUE);
+ }
+
+ @Override
+ public void syncLargeMessageFile(SequentialFile file, long size, long id) throws
Exception
+ {
+ sendLargeFile(null, id, file, size);
+ }
+
+ /**
+ * Sends large files in reasonably sized chunks to the backup during replication
synchronization.
+ * @param content journal type or {@code null} for large-messages
+ * @param id journal file id or (large) message id
+ * @param file file to read the data from; it is opened here if it is not already open
+ * @param maxBytesToSend maximum number of bytes to read and send from the file
+ * @throws Exception
+ */
+ private void sendLargeFile(JournalContent content, final long id, SequentialFile file,
long maxBytesToSend)
+ throws Exception
+ {
if (!file.isOpen())
{
file.open(1, false);
}
- final long id = jf.getFileID();
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
while (true)
{
+ buffer.rewind();
int bytesRead = file.read(buffer);
+ int toSend = bytesRead; // size announced to the backup; capped below by maxBytesToSend
if (bytesRead > 0)
- buffer.limit(bytesRead);
+ {
+ if (bytesRead >= maxBytesToSend)
+ {
+ toSend = (int)maxBytesToSend;
+ maxBytesToSend = 0;
+ }
+ else
+ {
+ maxBytesToSend = maxBytesToSend - bytesRead;
+ }
+ buffer.limit(toSend);
+ }
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
- sendReplicatePacket(new ReplicationJournalFileMessage(bytesRead, buffer,
content, id));
- if (bytesRead == -1 || bytesRead == 0)
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, id, toSend,
buffer));
+ if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
}
@@ -537,7 +569,7 @@
@Override
public void sendSynchronizationDone()
{
- sendReplicatePacket(new ReplicationJournalFileMessage(-1, null, null, -1));
+ ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
+ sendReplicatePacket(msg);
}
-
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -13,14 +13,13 @@
package org.hornetq.core.server;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.journal.SequentialFile;
+
/**
* A LargeMessage
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
- * Created 30-Sep-08 10:58:04 AM
- *
- *
*/
public interface LargeServerMessage extends ServerMessage
{
@@ -30,13 +29,13 @@
void setLinkedMessage(LargeServerMessage message);
boolean isFileExists() throws Exception;
-
+
/**
* We have to copy the large message content in case of DLQ and paged messages
* For that we need to pre-mark the LargeMessage with a flag when it is paged
*/
void setPaged();
-
+
/** Close the files if opened */
void releaseResources();
@@ -45,4 +44,17 @@
void incrementDelayDeletionCount();
void decrementDelayDeletionCount() throws Exception;
+
+ /**
+ * This method only has relevance in a backup server.
+ * @param sync {@code true} if this file is meant for appends of a message that needs
to be
+ * sync'ed with the live.
+ */
+ void setReplicationSync(boolean sync);
+
+ /**
+ * @return the {@link SequentialFile} backing this large message
+ * @throws HornetQException
+ */
+ SequentialFile getFile() throws HornetQException;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -17,9 +17,9 @@
import java.util.List;
/**
- *
+ *
* A SequentialFileFactory
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
@@ -28,6 +28,12 @@
{
SequentialFile createSequentialFile(String fileName, int maxIO);
+ /**
+ * @param extension extension to filter files with. Its value should not contain
'.', as the
+ * method appends one to it.
+ * @return
+ * @throws Exception
+ */
List<String> listFiles(String extension) throws Exception;
boolean isSupportsCallbacks();
@@ -59,7 +65,7 @@
void stop();
- /**
+ /**
* Create the directory if it doesn't exist yet
*/
void createDirs() throws Exception;
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -25,7 +25,7 @@
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
- private static final int N_MSGS = 100;
+ private static final int N_MSGS = 10;
@Override
protected void setUp() throws Exception
@@ -115,7 +115,7 @@
assertFalse("backup is started?", backupServer.isStarted());
liveServer.removeInterceptor(syncDelay);
backupServer.start();
- waitForBackup(sessionFactory, 5);
+ waitForBackup(sessionFactory, 20);
crash(session);
waitForServerInitialization(backupServer, 5);
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-01
16:13:06 UTC (rev 11272)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-01
16:15:10 UTC (rev 11273)
@@ -13,8 +13,8 @@
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -27,7 +27,7 @@
* <p>
* We need to hijack the replication channel handler, because we need to
* <ol>
- * <li>send an early answer to the {@link PacketImpl#REPLICATION_SYNC} packet that
signals being
+ * <li>send an early answer to the {@link PacketImpl#REPLICATION_START_STOP_SYNC} packet
that signals being
* up-to-date
* <li>not send an answer to it, when we deliver the packet later.
* </ol>
@@ -135,10 +135,10 @@
deliver();
}
- if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
+ if (packet.getType() == PacketImpl.REPLICATION_START_STOP_SYNC &&
mustHold)
{
- ReplicationJournalFileMessage syncMsg =
(ReplicationJournalFileMessage)packet;
- if (syncMsg.isUpToDate() && !deliver)
+ ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
+ if (syncMsg.isSynchronizationFinished() && !deliver)
{
receivedUpToDate = true;
assert onHold == null;