[hornetq-commits] JBoss hornetq SVN: r11273 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 1 12:15:11 EDT 2011


Author: borges
Date: 2011-09-01 12:15:10 -0400 (Thu, 01 Sep 2011)
New Revision: 11273

Added:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
Removed:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Synchronization of Large Messages

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -27,6 +27,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -372,7 +373,7 @@
 
       final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
       final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
-
+      Map<String, Long> largeMessageFilesToSync;
       try
       {
          storageManagerLock.writeLock().lock();
@@ -386,6 +387,7 @@
             {
                messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
                bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+               largeMessageFilesToSync = getLargeMessageInformation();
             }
             finally
             {
@@ -399,8 +401,10 @@
          {
             storageManagerLock.writeLock().unlock();
          }
+
          sendJournalFile(messageFiles, JournalContent.MESSAGES);
          sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+         sendLargeMessageFiles(largeMessageFilesToSync);
 
          storageManagerLock.writeLock().lock();
          try
@@ -420,7 +424,43 @@
       }
    }
 
+   private void sendLargeMessageFiles(Map<String, Long> largeMessageFilesToSync) throws Exception
+   {
+      for (Entry<String, Long> entry : largeMessageFilesToSync.entrySet())
+      {
+         String fileName = entry.getKey();
+         long size = entry.getValue();
+         SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
+         if (!seqFile.exists())
+            continue;
+         replicator.syncLargeMessageFile(seqFile, size, getLargeMessageIdFromFilename(fileName));
+      }
+   }
+
+   private long getLargeMessageIdFromFilename(String filename)
+   {
+      return Long.parseLong(filename.split("\\.")[0]);
+   }
+
    /**
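+    * Collects the large-message files (extension "msg") and the current size of each one.
+    * @return map from file name to the size reported for that file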
+    * @throws Exception
+    */
+   private Map<String, Long> getLargeMessageInformation() throws Exception
+   {
+      Map<String, Long> largeMessages = new HashMap<String, Long>();
+      List<String> filenames = largeMessagesFactory.listFiles("msg");
+      for (String filename : filenames)
+      {
+         SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename, 1);
+         long size = seqFile.size();
+         largeMessages.put(filename, size);
+      }
+      return largeMessages;
+   }
+
+   /**
     * Send an entire journal file to a replicating server (a backup server that is).
     * @param jf
     * @param replicator2
@@ -431,7 +471,7 @@
    {
       for (JournalFile jf : journalFiles)
       {
-         replicator.sendJournalFile(jf, type);
+         replicator.syncJournalFile(jf, type);
          jf.setCanReclaim(true);
       }
    }
@@ -563,30 +603,44 @@
 
    public void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception
    {
-      file.position(file.size());
+      readLock();
+      try
+      {
+         file.position(file.size());
 
-      file.writeDirect(ByteBuffer.wrap(bytes), false);
+         file.writeDirect(ByteBuffer.wrap(bytes), false);
 
-      if (isReplicated())
+         if (isReplicated())
+         {
+            replicator.largeMessageWrite(messageId, bytes);
+         }
+      }
+      finally
       {
-         replicator.largeMessageWrite(messageId, bytes);
+         readUnLock();
       }
    }
 
    public LargeServerMessage createLargeMessage(final long id, final MessageInternal message)
    {
-      if (isReplicated())
+      readLock();
+      try
       {
-         replicator.largeMessageBegin(id);
-      }
+         if (isReplicated())
+         {
+            replicator.largeMessageBegin(id);
+         }
 
-      LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
+         LargeServerMessageImpl largeMessage = (LargeServerMessageImpl)createLargeMessage();
+         largeMessage.copyHeadersAndProperties(message);
+         largeMessage.setMessageID(id);
 
-      largeMessage.copyHeadersAndProperties(message);
-
-      largeMessage.setMessageID(id);
-
-      return largeMessage;
+         return largeMessage;
+      }
+      finally
+      {
+         readUnLock();
+      }
    }
 
    // Non transactional operations
@@ -604,6 +658,7 @@
       {
       // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
 
+         // XXX HORNETQ-720
       if (message.isLargeMessage())
       {
          messageJournal.appendAddRecord(message.getMessageID(),
@@ -2049,16 +2104,9 @@
     * @param messageID
     * @return
     */
-   SequentialFile createFileForLargeMessage(final long messageID, final boolean durable)
+   SequentialFile createFileForLargeMessage(final long messageID, String extension)
    {
-      if (durable)
-      {
-         return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);
-      }
-      else
-      {
-         return largeMessagesFactory.createSequentialFile(messageID + ".tmp", -1);
-      }
+      return largeMessagesFactory.createSequentialFile(messageID + extension, -1);
    }
 
    // Private ----------------------------------------------------------------------------------
@@ -2379,14 +2427,11 @@
     */
    private void cleanupIncompleteFiles() throws Exception
    {
-      if (largeMessagesFactory != null)
+      List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
+      for (String tmpFile : tmpFiles)
       {
-         List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
-         for (String tmpFile : tmpFiles)
-         {
-            SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
-            file.delete();
-         }
+         SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
+         file.delete();
       }
    }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -31,7 +31,7 @@
  * A LargeServerMessageImpl
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * 
+ *
  * Created 30-Sep-08 12:02:45 PM
  *
  *
@@ -49,9 +49,9 @@
    private final JournalStorageManager storageManager;
 
    private LargeServerMessage linkMessage;
-   
+
    private boolean paged;
-
+   private boolean replicationSync;
    // We should only use the NIO implementation on the Journal
    private SequentialFile file;
 
@@ -89,7 +89,7 @@
    {
       paged = true;
    }
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
     */
@@ -231,7 +231,7 @@
 
    public boolean isFileExists() throws Exception
    {
-      SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(), durable);
+      SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(), getExtension());
       return localfile.exists();
    }
 
@@ -243,7 +243,7 @@
    {
       if (memoryEstimate == -1)
       {
-         // The body won't be on memory (aways on-file), so we don't consider this for paging
+         // The body won't be on memory (always on-file), so we don't consider this for paging
          memoryEstimate = getHeadersAndPropertiesEncodeSize() + DataConstants.SIZE_INT +
                           getEncodeSize() +
                           (16 + 4) *
@@ -268,17 +268,18 @@
          }
       }
    }
-   
 
+
+   @Override
    public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
    {
       super.setOriginalHeaders(other, expiry);
-      
+
       LargeServerMessageImpl otherLM = (LargeServerMessageImpl)other;
       this.paged = otherLM.paged;
       if (this.paged)
       {
-         this.removeProperty(Message.HDR_ORIG_MESSAGE_ID); 
+         this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
       }
    }
 
@@ -289,16 +290,16 @@
       if (!paged)
       {
          incrementDelayDeletionCount();
-   
+
          long idToUse = messageID;
-   
+
          if (linkMessage != null)
          {
             idToUse = linkMessage.getMessageID();
          }
-   
-         SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-   
+
+         SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, getExtension());
+
          ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
                                                                                   : (LargeServerMessageImpl)linkMessage,
                                                                newfile,
@@ -310,19 +311,19 @@
          try
          {
             validateFile();
-            
+
             SequentialFile file = this.file;
-            
-            SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
-            
+
+            SequentialFile newFile = storageManager.createFileForLargeMessage(newID, getExtension());
+
             file.copyTo(newFile);
-            
+
             LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, newFile, newID);
-            
+
             newMessage.linkMessage = null;
-            
+
             newMessage.setPaged();
-            
+
             return newMessage;
          }
          catch (Exception e)
@@ -333,8 +334,9 @@
       }
    }
 
-   public SequentialFile getFile()
+   public SequentialFile getFile() throws HornetQException
    {
+      validateFile();
       return file;
    }
 
@@ -369,10 +371,10 @@
                throw new RuntimeException("MessageID not set on LargeMessage");
             }
 
-            file = storageManager.createFileForLargeMessage(getMessageID(), durable);
+            file = storageManager.createFileForLargeMessage(getMessageID(), getExtension());
 
             file.open();
-            
+
             bodySize = file.size();
          }
       }
@@ -383,6 +385,13 @@
       }
    }
 
+   private String getExtension()
+   {
+      if (replicationSync)
+         return ".sync";
+      return durable ? ".msg" : ".tmp";
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
     */
@@ -396,7 +405,7 @@
 
       linkMessage = message;
 
-      file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
+      file = storageManager.createFileForLargeMessage(message.getMessageID(), getExtension());
       try
       {
          file.open();
@@ -477,4 +486,10 @@
          return bodySize;
       }
    }
+
+   @Override
+   public void setReplicationSync(boolean sync)
+   {
+      replicationSync = sync;
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -14,6 +14,7 @@
 package org.hornetq.core.persistence.impl.nullpm;
 
 import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 
@@ -21,7 +22,7 @@
  * A NullStorageLargeServerMessage
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * 
+ *
  * Created 30-Sep-08 1:51:42 PM
  *
  *
@@ -164,7 +165,23 @@
    {
    }
 
+   /*
+    * (non-Javadoc)
+    * @see org.hornetq.core.server.LargeServerMessage#setReplicationSync(boolean)
+    */
+   @Override
+   public void setReplicationSync(boolean sync)
+   {
+      // TODO Auto-generated method stub
 
+   }
+
+   @Override
+   public SequentialFile getFile()
+   {
+      return null;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -104,15 +104,15 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
@@ -526,14 +526,14 @@
             packet = new HaBackupRegistrationMessage();
             break;
          }
-         case PacketImpl.REPLICATION_START_SYNC:
+         case PacketImpl.REPLICATION_START_STOP_SYNC:
          {
             packet = new ReplicationStartSyncMessage();
             break;
          }
-         case PacketImpl.REPLICATION_SYNC:
+         case PacketImpl.REPLICATION_SYNC_FILE:
          {
-            packet = new ReplicationJournalFileMessage();
+            packet = new ReplicationSyncFileMessage();
             break;
          }
          default:

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -178,7 +178,7 @@
 
    public static final byte REPLICATION_COMPARE_DATA = 102;
 
-   public static final byte REPLICATION_SYNC = 103;
+   public static final byte REPLICATION_SYNC_FILE = 103;
 
    // HA
 
@@ -195,7 +195,7 @@
    /** XXX HORNETQ-720 "HA" is not really used anywhere else. Better name? */
    public static final byte HA_BACKUP_REGISTRATION = 113;
 
-   public static final byte REPLICATION_START_SYNC = 120;
+   public static final byte REPLICATION_START_STOP_SYNC = 120;
 
    // Static --------------------------------------------------------
 

Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -1,98 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import java.nio.ByteBuffer;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Message is used to:
- * <ol>
- * <li>copy JournalFile data over to the backup during synchronization;
- * <li>send a up-to-date signal to backup;
- * </ol>
- */
-public final class ReplicationJournalFileMessage extends PacketImpl
-{
-
-   private ByteBuffer data;
-   private int dataSize;
-   private JournalContent journalType;
-   /** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()} */
-   private long fileId;
-   private boolean backupIsUpToDate;
-   private byte[] byteArray;
-
-   public ReplicationJournalFileMessage()
-   {
-      super(REPLICATION_SYNC);
-   }
-
-   public ReplicationJournalFileMessage(int size, ByteBuffer buffer, JournalContent content, long id)
-   {
-      this();
-      this.fileId = id;
-      this.backupIsUpToDate = id == -1;
-      this.dataSize = size;
-      this.data = buffer;
-      this.journalType = content;
-   }
-
-   @Override
-   public void encodeRest(final HornetQBuffer buffer)
-   {
-      buffer.writeLong(fileId);
-      if (fileId == -1)
-         return;
-      buffer.writeByte(journalType.typeByte);
-      buffer.writeInt(dataSize);
-      // sending -1 will close the file
-      if (dataSize > 0)
-      {
-         buffer.writeBytes(data);// (data, 0, dataSize);
-      }
-   }
-
-   @Override
-   public void decodeRest(final HornetQBuffer buffer)
-   {
-      fileId = buffer.readLong();
-      if (fileId == -1)
-      {
-         backupIsUpToDate = true;
-         return;
-      }
-      journalType = JournalContent.getType(buffer.readByte());
-      int size = buffer.readInt();
-      if (size > 0)
-      {
-         byteArray = new byte[size];
-         buffer.readBytes(byteArray);
-      }
-   }
-
-   public long getFileId()
-   {
-      return fileId;
-   }
-
-   public byte[] getData()
-   {
-      return byteArray;
-   }
-
-   public JournalContent getJournalContent()
-   {
-      return journalType;
-   }
-
-   /**
-    * @return {@code true} if the live has finished synchronizing its data and the backup is
-    *         therefore up-to-date, {@code false} otherwise.
-    */
-   public boolean isUpToDate()
-   {
-      return backupIsUpToDate;
-   }
-}

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -6,25 +6,30 @@
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
- * Sends all fileIDs used in the live server to the backup. This is done so that we:
- * <ol>
- * <li>reserve those IDs in the backup;
- * <li>start replicating while the journal synchronization is taking place.
- * </ol>
+ * This message may signal start or end of the replication synchronization.
+ * <p>
+ * At start, it sends the IDs of all files used by a given journal of the live server to the backup,
+ * so the backup can reserve those IDs.
  */
 public class ReplicationStartSyncMessage extends PacketImpl
 {
    private long[] ids;
    private JournalContent journalType;
+   private boolean synchronizationIsFinished;
 
    public ReplicationStartSyncMessage()
    {
-      super(REPLICATION_START_SYNC);
+      super(REPLICATION_START_STOP_SYNC);
    }
 
    public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
    {
       this();
+      if (datafiles == null && contentType == null)
+      {
+         synchronizationIsFinished = true;
+         return;
+      }
       ids = new long[datafiles.length];
       for (int i = 0; i < datafiles.length; i++)
       {
@@ -36,6 +41,9 @@
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
+      buffer.writeBoolean(synchronizationIsFinished);
+      if (synchronizationIsFinished)
+         return;
       buffer.writeByte(journalType.typeByte);
       buffer.writeInt(ids.length);
       for (long id : ids)
@@ -47,6 +55,9 @@
    @Override
    public void decodeRest(final HornetQBuffer buffer)
    {
+      synchronizationIsFinished = buffer.readBoolean();
+      if (synchronizationIsFinished)
+         return;
       journalType = JournalContent.getType(buffer.readByte());
       int length = buffer.readInt();
       ids = new long[length];
@@ -56,6 +67,15 @@
       }
    }
 
+   /**
+    * @return {@code true} if the live has finished synchronizing its data and the backup is
+    *         therefore up-to-date, {@code false} otherwise.
+    */
+   public boolean isSynchronizationFinished()
+   {
+      return synchronizationIsFinished;
+   }
+
    public JournalContent getJournalContentType()
    {
       return journalType;

Copied: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java (from rev 11272, branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java)
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -0,0 +1,108 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import java.nio.ByteBuffer;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Message is used to:
+ * <ol>
+ * <li>copy JournalFile data over to the backup during synchronization;
+ * <li>send an up-to-date signal to the backup;
+ * </ol>
+ */
+public final class ReplicationSyncFileMessage extends PacketImpl
+{
+
+   /**
+    * The JournalType or {@code null} if sync'ing large-messages.
+    */
+   private JournalContent journalType;
+   /**
+    * This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()}, or the
+    * message id if we are sync'ing a large-message.
+    */
+   private long fileId;
+   private int dataSize;
+   private ByteBuffer byteBuffer;
+   private byte[] byteArray;
+
+   public ReplicationSyncFileMessage()
+   {
+      super(REPLICATION_SYNC_FILE);
+   }
+
+   public ReplicationSyncFileMessage(JournalContent content, long id, int size, ByteBuffer buffer)
+   {
+      this();
+      this.byteBuffer = buffer;
+      this.dataSize = size;
+      this.fileId = id;
+      this.journalType = content;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(fileId);
+      if (fileId == -1)
+         return;
+      boolean isJournal = journalType != null;
+      buffer.writeBoolean(isJournal);
+      if (isJournal)
+         buffer.writeByte(journalType.typeByte);
+      buffer.writeInt(dataSize);
+      /*
+       * sending -1 will close the file in case of a journal, but not in case of a largeMessage
+       * (which might receive appends)
+       */
+      if (dataSize > 0)
+      {
+         buffer.writeBytes(byteBuffer);
+      }
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      fileId = buffer.readLong();
+      if (buffer.readBoolean())
+      {
+         journalType = JournalContent.getType(buffer.readByte());
+      }
+      int size = buffer.readInt();
+      if (size > 0)
+      {
+         byteArray = new byte[size];
+         buffer.readBytes(byteArray);
+      }
+   }
+
+   public long getId()
+   {
+      return fileId;
+   }
+
+   public JournalContent getJournalContent()
+   {
+      return journalType;
+   }
+
+   /**
+    * @return the payload read while decoding, or {@code null} if the decoded size was not positive
+    */
+   public byte[] getData()
+   {
+      return byteArray;
+   }
+
+   /**
+    * @return {@code true} if no journal type is set, i.e. the message refers to a large-message file
+    */
+   public boolean isLargeMessage()
+   {
+      return journalType == null;
+   }
+}

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -19,6 +19,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
@@ -91,7 +92,7 @@
     * @throws HornetQException
     * @throws Exception
     */
-   void sendJournalFile(JournalFile jf, JournalContent type) throws Exception;
+   void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
 
    /**
     * Reserve the following fileIDs in the backup server.
@@ -108,4 +109,9 @@
     */
    void sendSynchronizationDone();
 
+   /**
+    * @param seqFile
+    * @throws Exception
+    */
+   void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -17,6 +17,7 @@
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -48,15 +49,15 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.ServerMessage;
@@ -86,9 +87,13 @@
    /** Files reserved in each journal for synchronization of existing data from the 'live' server. */
    private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
             new HashMap<JournalContent, Map<Long, JournalFile>>();
+   private Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long, LargeServerMessage>();
 
-   /** Used to hold the real Journals before the backup is synchronized. */
-   private final Map<JournalContent, Journal> journalsHolder = new HashMap<JournalContent, Journal>();
+   /**
+    * Used to hold the real Journals before the backup is synchronized. This field should be
+    * {@code null} on an up-to-date server.
+    */
+   private Map<JournalContent, Journal> journalsHolder = new HashMap<JournalContent, Journal>();
 
    private StorageManager storage;
 
@@ -192,13 +197,13 @@
             handleCompareDataMessage((ReplicationCompareDataMessage)packet);
             response = new NullResponseMessage();
          }
-         else if (type == PacketImpl.REPLICATION_START_SYNC)
+         else if (type == PacketImpl.REPLICATION_START_STOP_SYNC)
          {
             handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
          }
-         else if (type == PacketImpl.REPLICATION_SYNC)
+         else if (type == PacketImpl.REPLICATION_SYNC_FILE)
          {
-            handleReplicationSynchronization((ReplicationJournalFileMessage)packet);
+            handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
          }
          else
          {
@@ -306,7 +311,7 @@
 
       pageManager.stop();
 
-       started = false;
+      started = false;
    }
 
    /* (non-Javadoc)
@@ -387,57 +392,109 @@
 
    // Private -------------------------------------------------------
 
-   private void handleReplicationSynchronization(ReplicationJournalFileMessage msg) throws Exception
+   private void finishSynchronization() throws Exception
    {
-      if (msg.isUpToDate())
+      for (JournalContent jc : EnumSet.allOf(JournalContent.class))
       {
-         for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+         JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
+         journal.writeLock();
+         try
          {
-            JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
-            journal.writeLock();
-            try
+            if (journal.getDataFiles().length != 0)
             {
-               if (journal.getDataFiles().length != 0)
+               throw new IllegalStateException("Journal should not have any data files at this point");
+            }
+            // files should be already in place.
+            filesReservedForSync.remove(jc);
+            getJournal(jc.typeByte).stop();
+            registerJournal(jc.typeByte, journal);
+            journal.loadInternalOnly();
+         }
+         finally
+         {
+            journal.writeUnlock();
+         }
+      }
+      synchronized (largeMessagesOnSync)
+      {
+         synchronized (largeMessages)
+         {
+            ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
+            for (Entry<Long, LargeServerMessage> entry : largeMessages.entrySet())
+            {
+               Long id = entry.getKey();
+               LargeServerMessage lm = entry.getValue();
+               if (largeMessagesOnSync.containsKey(id))
                {
-                  throw new IllegalStateException("Journal should not have any data files at this point");
+                  SequentialFile sq = lm.getFile();
+                  LargeServerMessage mainLM = largeMessagesOnSync.get(id);
+                  SequentialFile mainSeqFile = mainLM.getFile();
+                  System.out.println(mainSeqFile);
+                  for (;;)
+                  {
+                     buffer.rewind();
+                     int size = sq.read(buffer);
+                     mainSeqFile.writeInternal(buffer);
+                     if (size < buffer.capacity())
+                     {
+                        break;
+                     }
+                  }
                }
-               // files should be already in place.
-               filesReservedForSync.remove(jc);
-               getJournal(jc.typeByte).stop();
-               registerJournal(jc.typeByte, journal);
-               journal.loadInternalOnly();
-               // XXX HORNETQ-720 must reload journals
-               // XXX HORNETQ-720 must start using real journals
+               else
+               {
+                  // these are large-messages created after sync started
+                  largeMessagesOnSync.put(id, lm);
+               }
+            }
+            largeMessages.clear();
+            largeMessages.putAll(largeMessagesOnSync);
+         }
+      }
+      largeMessagesOnSync = null;
+      journalsHolder = null;
+      server.setRemoteBackupUpToDate();
+      log.info("Backup server " + server + " is synchronized with live-server.");
+      return;
+   }
 
-            }
-            finally
+   private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws Exception
+   {
+      Long id = Long.valueOf(msg.getId());
+      byte[] data = msg.getData();
+      SequentialFile sf;
+      if (msg.isLargeMessage())
+      {
+         synchronized (largeMessagesOnSync)
+         {
+            LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
+            if (largeMessage == null)
             {
-               journal.writeUnlock();
+               largeMessage = storage.createLargeMessage();
+               largeMessage.setDurable(true);
+               largeMessage.setMessageID(id);
+               largeMessagesOnSync.put(id, largeMessage);
             }
-
+            sf = largeMessage.getFile();
          }
-         server.setRemoteBackupUpToDate();
-         log.info("Backup server " + server + " is synchronized with live-server.");
-         return;
       }
+      else
+      {
+         JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
+         sf = journalFile.getFile();
 
-      long id = msg.getFileId();
-      JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
-
-      byte[] data = msg.getData();
+      }
       if (data == null)
       {
-         journalFile.getFile().close();
+         sf.close();
+         return;
       }
-      else
+
+      if (!sf.isOpen())
       {
-         SequentialFile sf = journalFile.getFile();
-         if (!sf.isOpen())
-         {
-            sf.open(1, false);
-         }
-         sf.writeDirect(ByteBuffer.wrap(data), true);
+         sf.open(1, false);
       }
+      sf.writeDirect(ByteBuffer.wrap(data), true);
    }
 
    /**
@@ -452,6 +509,13 @@
       {
          throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup can not be up-to-date!");
       }
+
+      if (packet.isSynchronizationFinished())
+      {
+         finishSynchronization();
+         return;
+      }
+
       final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
 
       JournalImpl journal = assertJournalImpl(journalIf);
@@ -520,6 +584,18 @@
       else
       {
          message = largeMessages.get(messageId);
+         if (message == null)
+         {
+            synchronized (largeMessages)
+            {
+               if (!server.isRemoteBackupUpToDate())
+               {
+                  // in case we need to append data to a file while still sync'ing the backup
+                  createLargeMessage(messageId, true);
+                  message = largeMessages.get(messageId);
+               }
+            }
+         }
       }
 
       if (message == null)
@@ -537,13 +613,20 @@
     */
    private void handleLargeMessageBegin(final ReplicationLargeMessageBeingMessage packet)
    {
-      LargeServerMessage largeMessage = storage.createLargeMessage();
-      largeMessage.setDurable(true);
-      largeMessage.setMessageID(packet.getMessageId());
-      log.trace("Receiving Large Message " + largeMessage.getMessageID() + " on backup");
-      largeMessages.put(largeMessage.getMessageID(), largeMessage);
+      final long id = packet.getMessageId();
+      createLargeMessage(id, false);
+      log.trace("Receiving Large Message " + id + " on backup");
    }
 
+   private void createLargeMessage(final long id, boolean sync)
+   {
+      LargeServerMessage msg = storage.createLargeMessage();
+      msg.setDurable(true);
+      msg.setMessageID(id);
+      msg.setReplicationSync(sync);
+      largeMessages.put(id, msg);
+   }
+
    /**
     * @param packet
     */

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -44,7 +44,6 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -52,6 +51,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.utils.ExecutorFactory;
 
@@ -504,26 +504,58 @@
    }
 
    @Override
-   public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
+   public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
    {
       SequentialFile file = jf.getFile().copy();
       log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
+      sendLargeFile(content, jf.getFileID(), file, Long.MAX_VALUE);
+   }
+
+   @Override
+   public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
+   {
+      sendLargeFile(null, id, file, size);
+   }
+
+   /**
+    * Sends large files in reasonably sized chunks to the backup during replication synchronization.
+    * @param content journal type or {@code null} for large-messages
+    * @param id journal file id or (large) message id
+    * @param file
+    * @param maxBytesToSend maximum number of bytes to read and send from the file
+    * @throws Exception
+    */
+   private void sendLargeFile(JournalContent content, final long id, SequentialFile file, long maxBytesToSend)
+            throws Exception
+   {
       if (!file.isOpen())
       {
          file.open(1, false);
       }
-      final long id = jf.getFileID();
       final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
       while (true)
       {
+         buffer.rewind();
          int bytesRead = file.read(buffer);
+         int toSend = bytesRead;
          if (bytesRead > 0)
-            buffer.limit(bytesRead);
+         {
+            if (bytesRead >= maxBytesToSend)
+            {
+               toSend = (int)maxBytesToSend;
+               maxBytesToSend = 0;
+            }
+            else
+            {
+               maxBytesToSend = maxBytesToSend - bytesRead;
+            }
+            buffer.limit(toSend);
+         }
          buffer.rewind();
 
          // sending -1 or 0 bytes will close the file at the backup
-         sendReplicatePacket(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
-         if (bytesRead == -1 || bytesRead == 0)
+         sendReplicatePacket(new ReplicationSyncFileMessage(content, id, bytesRead, buffer));
+         if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
             break;
       }
    }
@@ -537,7 +569,7 @@
    @Override
    public void sendSynchronizationDone()
    {
-      sendReplicatePacket(new ReplicationJournalFileMessage(-1, null, null, -1));
+      ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
+      sendReplicatePacket(msg);
    }
-
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -13,14 +13,13 @@
 
 package org.hornetq.core.server;
 
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.journal.SequentialFile;
+
 /**
  * A LargeMessage
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * 
- * Created 30-Sep-08 10:58:04 AM
- *
- *
  */
 public interface LargeServerMessage extends ServerMessage
 {
@@ -30,13 +29,13 @@
    void setLinkedMessage(LargeServerMessage message);
 
    boolean isFileExists() throws Exception;
-   
+
    /**
     * We have to copy the large message content in case of DLQ and paged messages
     * For that we need to pre-mark the LargeMessage with a flag when it is paged
     */
    void setPaged();
-   
+
    /** Close the files if opened */
    void releaseResources();
 
@@ -45,4 +44,17 @@
    void incrementDelayDeletionCount();
 
    void decrementDelayDeletionCount() throws Exception;
+
+   /**
+    * This method only has relevance in a backup server.
+    * @param sync {@code true} if this file is meant for appends of a message that needs to be
+    *           sync'ed with the live.
+    */
+   void setReplicationSync(boolean sync);
+
+   /**
+    * @return
+    * @throws HornetQException
+    */
+   SequentialFile getFile() throws HornetQException;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFileFactory.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -17,9 +17,9 @@
 import java.util.List;
 
 /**
- * 
+ *
  * A SequentialFileFactory
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
@@ -28,6 +28,12 @@
 {
    SequentialFile createSequentialFile(String fileName, int maxIO);
 
+   /**
+    * @param extension extension to filter files with. Its value should not contain '.', as the
+    *           method appends one to it.
+    * @return
+    * @throws Exception
+    */
    List<String> listFiles(String extension) throws Exception;
 
    boolean isSupportsCallbacks();
@@ -59,7 +65,7 @@
 
    void stop();
 
-   /** 
+   /**
     * Create the directory if it doesn't exist yet
     */
    void createDirs() throws Exception;

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -25,7 +25,7 @@
    private ClientSession session;
    private ClientProducer producer;
    private BackupSyncDelay syncDelay;
-   private static final int N_MSGS = 100;
+   private static final int N_MSGS = 10;
 
    @Override
    protected void setUp() throws Exception
@@ -115,7 +115,7 @@
       assertFalse("backup is started?", backupServer.isStarted());
       liveServer.removeInterceptor(syncDelay);
       backupServer.start();
-      waitForBackup(sessionFactory, 5);
+      waitForBackup(sessionFactory, 20);
       crash(session);
       waitForServerInitialization(backupServer, 5);
    }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-09-01 16:13:06 UTC (rev 11272)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-09-01 16:15:10 UTC (rev 11273)
@@ -13,8 +13,8 @@
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 
@@ -27,7 +27,7 @@
  * <p>
  * We need to hijack the replication channel handler, because we need to
  * <ol>
- * <li>send an early answer to the {@link PacketImpl#REPLICATION_SYNC} packet that signals being
+ * <li>send an early answer to the {@link PacketImpl#REPLICATION_START_STOP_SYNC} packet that signals being
  * up-to-date
  * <li>not send an answer to it, when we deliver the packet later.
  * </ol>
@@ -135,10 +135,10 @@
             deliver();
          }
 
-         if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
+         if (packet.getType() == PacketImpl.REPLICATION_START_STOP_SYNC && mustHold)
          {
-            ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
-            if (syncMsg.isUpToDate() && !deliver)
+            ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
+            if (syncMsg.isSynchronizationFinished() && !deliver)
             {
                receivedUpToDate = true;
                assert onHold == null;



More information about the hornetq-commits mailing list