[hornetq-commits] JBoss hornetq SVN: r11449 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/persistence and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 30 04:58:29 EDT 2011


Author: borges
Date: 2011-09-30 04:58:28 -0400 (Fri, 30 Sep 2011)
New Revision: 11449

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
HORNETQ-720 Do not use AIO during backup synchronization
Also fix the logic at BackupSyncJournalTest for testing fileID reservation.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -1140,7 +1140,7 @@
             {
                continue;
             }
-            replicator.syncPages(fileFactory, sFile, id, getAddress());
+            replicator.syncPages(sFile, id, getAddress());
          }
       }
       finally

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -24,7 +24,6 @@
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
@@ -32,7 +31,6 @@
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.replication.ReplicationManager;
@@ -225,13 +223,6 @@
    long storePageCounterInc(long queueID, int add) throws Exception;
 
    /**
-    * @param journalContent
-    * @return {@code true} if the underlying {@link SequentialFileFactory} has callback support.
-    * @see SequentialFileFactory#isSupportsCallbacks()
-    */
-   boolean hasCallbackSupport(JournalContent journalContent);
-
-   /**
     * @return the bindings journal
     */
    Journal getBindingsJournal();

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -441,7 +441,6 @@
          PagingStore store = manager.getPageStore(entry.getKey());
          store.sendPages(replicator, entry.getValue());
       }
-
    }
 
    /**
@@ -471,7 +470,7 @@
          SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName, 1);
          if (!seqFile.exists())
             continue;
-         replicator.syncLargeMessageFile(largeMessagesFactory, seqFile, size, getLargeMessageIdFromFilename(fileName));
+         replicator.syncLargeMessageFile(seqFile, size, getLargeMessageIdFromFilename(fileName));
       }
    }
 
@@ -509,7 +508,7 @@
    {
       for (JournalFile jf : journalFiles)
       {
-         replicator.syncJournalFile(factory, jf, type);
+         replicator.syncJournalFile(jf, type);
          jf.setCanReclaim(true);
       }
    }
@@ -3888,13 +3887,6 @@
       journal.stop();
    }
 
-   public boolean hasCallbackSupport(JournalContent journalContent)
-   {
-      if (journalContent == JournalContent.MESSAGES)
-      return messageJournalFileFactory.isSupportsCallbacks();
-      return bindingsJournalFileFactory.isSupportsCallbacks();
-   }
-
    @Override
    public boolean addToPage(PagingManager pagingManager,
       SimpleString address,

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -37,7 +37,6 @@
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.replication.ReplicationManager;
@@ -574,12 +573,6 @@
    }
 
    @Override
-   public boolean hasCallbackSupport(JournalContent content)
-   {
-      return false;
-   }
-
-   @Override
    public Journal getBindingsJournal()
    {
       return null;

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -20,7 +20,6 @@
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
@@ -89,13 +88,6 @@
    void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
 
    /**
-    * Sends the whole content of the file to be duplicated.
-    * @throws HornetQException
-    * @throws Exception
-    */
-   void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent type) throws Exception;
-
-   /**
     * Reserve the following fileIDs in the backup server.
     * @param datafiles
     * @param contentType
@@ -111,11 +103,17 @@
    void sendSynchronizationDone();
 
    /**
+    * Sends the whole content of the file to be duplicated.
+    * @throws HornetQException
+    * @throws Exception
+    */
+   void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
+
+   /**
     * @param seqFile
     * @throws Exception
     */
-   void syncLargeMessageFile(SequentialFileFactory fctr, SequentialFile seqFile, long size, long id)
-                                                                                                       throws Exception;
+   void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
 
    /**
     * @param file
@@ -123,5 +121,5 @@
     * @param pageStore
     * @throws Exception
     */
-   void syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString pageStore) throws Exception;
+   void syncPages(SequentialFile file, long id, SimpleString pageStore) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -23,8 +23,10 @@
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
 import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.OperationContext;
@@ -579,7 +581,7 @@
    }
 
    @Override
-   public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+   public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
    {
       throw new UnsupportedOperationException("This method should only be called at a replicating backup");
    }
@@ -620,6 +622,18 @@
       throw new UnsupportedOperationException();
    }
 
+   @Override
+   public SequentialFileFactory getFileFactory()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public JournalFilesRepository getFilesRepository()
+   {
+      throw new UnsupportedOperationException();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -13,7 +13,11 @@
 
 package org.hornetq.core.replication.impl;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
@@ -85,8 +89,8 @@
    private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
 
    /** Files reserved in each journal for synchronization of existing data from the 'live' server. */
-   private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
-            new HashMap<JournalContent, Map<Long, JournalFile>>();
+   private final Map<JournalContent, Map<Long, JournalSyncFile>> filesReservedForSync =
+            new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
    private Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long, LargeServerMessage>();
 
    /**
@@ -240,7 +244,7 @@
 
       for (JournalContent jc : EnumSet.allOf(JournalContent.class))
       {
-         filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
+         filesReservedForSync.put(jc, new HashMap<Long, JournalSyncFile>());
          // We only need to load internal structures on the backup...
          journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
       }
@@ -446,7 +450,7 @@
    {
       Long id = Long.valueOf(msg.getId());
       byte[] data = msg.getData();
-      SequentialFile sf;
+      SequentialFile channel;
       switch (msg.getFileType())
       {
          case LARGE_MESSAGE:
@@ -461,38 +465,44 @@
                   largeMessage.setMessageID(id);
                   largeMessagesOnSync.put(id, largeMessage);
                }
-               sf = largeMessage.getFile();
+               channel = largeMessage.getFile();
             }
             break;
          }
-         case JOURNAL:
-         {
-            JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
-            sf = journalFile.getFile();
-            break;
-         }
          case PAGE:
          {
             Page page = getPage(msg.getPageStore(), (int)msg.getId());
 
-            sf = page.getFile();
+            channel = page.getFile();
             break;
          }
+         case JOURNAL:
+         {
+            JournalSyncFile journalSyncFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
+            FileChannel channel2 = journalSyncFile.getChannel();
+            if (data == null)
+            {
+               channel2.close();
+               return;
+            }
+            channel2.write(ByteBuffer.wrap(data));
+            return;
+         }
          default:
             throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled file type " + msg.getFileType());
       }
 
       if (data == null)
       {
-         sf.close();
+         channel.close();
          return;
       }
 
-      if (!sf.isOpen())
+      if (!channel.isOpen())
       {
-         sf.open(1, false);
+         channel.open(1, false);
       }
-      sf.writeDirect(data);
+      channel.writeDirect(ByteBuffer.wrap(data), true);
    }
 
    /**
@@ -516,10 +526,12 @@
 
       final Journal journal = journalsHolder.get(packet.getJournalContentType());
 
-      Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
-      JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(), mapToFill);
-      registerJournal(packet.getJournalContentType().typeByte,
-                      new FileWrapperJournal(current, storage.hasCallbackSupport(packet.getJournalContentType())));
+      Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
+      for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
+      {
+         mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+      }
+      registerJournal(packet.getJournalContentType().typeByte, new FileWrapperJournal(journal));
      }
 
    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
@@ -808,4 +820,38 @@
    {
       return journals[journalID];
    }
+
+   public static class JournalSyncFile
+   {
+
+      private FileChannel channel;
+      private final File file;
+
+      public JournalSyncFile(JournalFile jFile) throws Exception
+      {
+         SequentialFile seqFile = jFile.getFile();
+         file = seqFile.getJavaFile();
+         seqFile.close();
+      }
+
+      FileChannel getChannel() throws Exception
+      {
+         if (channel == null)
+         {
+            channel = new FileOutputStream(file).getChannel();
+         }
+         return channel;
+      }
+
+      void close() throws IOException
+      {
+         channel.close();
+      }
+
+      @Override
+      public String toString()
+      {
+         return "JournalSyncFile(file=" + file.getAbsolutePath() + ")";
+      }
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -13,7 +13,9 @@
 
 package org.hornetq.core.replication.impl;
 
+import java.io.FileInputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.LinkedHashSet;
 import java.util.Queue;
 import java.util.Set;
@@ -26,7 +28,6 @@
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
@@ -463,28 +464,26 @@
    }
 
    @Override
-   public void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent content) throws Exception
+   public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
    {
       if (enabled)
       {
          SequentialFile file = jf.getFile().copy();
          log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
-         sendLargeFile(content, null, jf.getFileID(), file, factory, Long.MAX_VALUE);
+         sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
       }
    }
 
    @Override
-   public void
-      syncLargeMessageFile(SequentialFileFactory factory, SequentialFile file, long size, long id) throws Exception
+   public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
    {
-      sendLargeFile(null, null, id, file, factory, size);
+      sendLargeFile(null, null, id, file, size);
    }
 
    @Override
-   public void
-      syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString queueName) throws Exception
+   public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception
    {
-      sendLargeFile(null, queueName, id, file, factory, Long.MAX_VALUE);
+      sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }
 
    /**
@@ -500,9 +499,7 @@
       SimpleString pageStore,
       final long id,
       SequentialFile file,
-      SequentialFileFactory factory,
-            long maxBytesToSend)
-            throws Exception
+      long maxBytesToSend) throws Exception
    {
       if (!enabled)
          return;
@@ -510,16 +507,17 @@
       {
          file.open();
       }
-      final ByteBuffer buffer = factory.newBuffer(1 << 17);
+      final FileChannel channel = (new FileInputStream(file.getJavaFile())).getChannel();
       try
       {
+         final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
          while (true)
-      {
-         buffer.rewind();
-         int bytesRead = file.read(buffer);
-         int toSend = bytesRead;
-         if (bytesRead > 0)
          {
+            buffer.clear();
+            int bytesRead = channel.read(buffer);
+            int toSend = bytesRead;
+            if (bytesRead > 0)
+            {
             if (bytesRead >= maxBytesToSend)
             {
                toSend = (int)maxBytesToSend;
@@ -541,7 +539,7 @@
       }
       finally
       {
-         factory.releaseBuffer(buffer);
+         channel.close();
       }
    }
 

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -17,6 +17,7 @@
 import java.util.Map;
 
 import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
@@ -148,13 +149,11 @@
     * During the synchronization between a live server and backup, we reserve in the backup the
     * journal file IDs used in the live server. This call also makes sure the files are created
     * empty without any kind of headers added.
-    * @param fileIds ids to reserve for synchronization
-    * @param mapToFill map to be filled with id and journal file pairs for <b>synchronization</b>.
-    * @return a new {@link JournalFile} to be used for regular <b>replication</b> during
-    *         synchronization
+    * @param fileIds IDs to reserve for synchronization
+    * @return map to be filled with id and journal file pairs for <b>synchronization</b>.
     * @throws Exception
     */
-   JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception;
+   Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception;
 
    /**
     * @return whether automatic reclaiming of Journal files is enabled
@@ -190,4 +189,9 @@
     */
    JournalFile[] getDataFiles();
 
+   SequentialFileFactory getFileFactory();
+
+   JournalFilesRepository getFilesRepository();
+
+   int getFileSize();
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.journal;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -128,4 +129,9 @@
    void copyTo(SequentialFile newFileName) throws Exception;
 
    void setTimedBuffer(TimedBuffer buffer);
+
+   /**
+    * Returns a native File of the file underlying this sequential file.
+    */
+   File getJavaFile();
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -37,8 +37,6 @@
 
    void debugWait() throws Exception;
 
-   int getFileSize();
-
    int getMinFiles();
 
    String getFilePrefix();

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -24,7 +24,6 @@
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
 
 /**
  *
@@ -35,8 +34,6 @@
  */
 public class AIOSequentialFile extends AbstractSequentialFile
 {
-   private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
-
    private boolean opened = false;
 
    private final int maxIO;

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -389,4 +389,9 @@
 
    }
 
+   @Override
+   public File getJavaFile()
+   {
+      return getFile().getAbsoluteFile();
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -15,6 +15,8 @@
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
 import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
@@ -36,15 +38,14 @@
 
    private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();
 
-   private final JournalFile currentFile;
-
    /**
-    * @param file
+    * @param journal
+    * @throws Exception
     */
-   public FileWrapperJournal(JournalFile file, boolean hasCallbackSupport)
+   public FileWrapperJournal(Journal journal) throws Exception
    {
-      super(hasCallbackSupport);
-      currentFile = file;
+      super(journal.getFileFactory(), journal.getFilesRepository(), journal.getFileSize());
+      setUpCurrentFile(JournalImpl.SIZE_HEADER);
    }
 
    @Override
@@ -56,7 +57,11 @@
    @Override
    public void stop() throws Exception
    {
-      currentFile.getFile().close();
+      SequentialFile seqFile = currentFile.getFile();
+      long pos = seqFile.position();
+      seqFile.close();
+      seqFile.open();
+      seqFile.position(pos);
    }
 
    @Override
@@ -92,7 +97,7 @@
          {
             callback.storeLineUp();
          }
-
+         switchFileIfNecessary(encoder.getEncodeSize());
          encoder.setFileID(currentFile.getRecordID());
 
          if (callback != null)
@@ -189,6 +194,12 @@
       return defaultValue.get();
    }
 
+   @Override
+   public String toString()
+   {
+      return FileWrapperJournal.class.getName() + "(currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
+   }
+
    // UNSUPPORTED STUFF
 
    @Override
@@ -260,7 +271,7 @@
    }
 
    @Override
-   public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+   public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
    {
       throw new UnsupportedOperationException();
    }
@@ -300,4 +311,22 @@
    {
       throw new UnsupportedOperationException();
    }
+
+   @Override
+   void scheduleReclaim()
+   {
+      // no-op
+   }
+
+   @Override
+   public SequentialFileFactory getFileFactory()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public JournalFilesRepository getFilesRepository()
+   {
+      throw new UnsupportedOperationException();
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -3,16 +3,35 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.hornetq.core.logging.Logger;
 
 abstract class JournalBase
 {
 
-   private final boolean hasCallbackSupport;
+   protected final JournalFilesRepository filesRepository;
+   protected final SequentialFileFactory fileFactory;
+   protected volatile JournalFile currentFile;
+   protected final int fileSize;
 
-   public JournalBase(boolean hasCallbackSupport)
+   private static final Logger log = Logger.getLogger(JournalBase.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+   public JournalBase(SequentialFileFactory fileFactory, JournalFilesRepository journalFilesRepository, int fileSize)
    {
-      this.hasCallbackSupport = hasCallbackSupport;
+      if (fileSize < JournalImpl.MIN_FILE_SIZE)
+      {
+         throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes");
+      }
+      if (fileSize % fileFactory.getAlignment() != 0)
+      {
+         throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
+                  fileFactory.getAlignment());
+      }
+      this.fileFactory = fileFactory;
+      this.filesRepository = journalFilesRepository;
+      this.fileSize = fileSize;
    }
 
    abstract public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record,
@@ -179,9 +198,56 @@
       }
    }
 
+   /**
+    * @param size
+    * @throws Exception
+    */
+   protected void switchFileIfNecessary(int size) throws Exception
+   {
+      // We take into account the fileID used on the Header
+      if (size > fileSize - currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
+      {
+         throw new IllegalArgumentException("Record is too large to store " + size);
+      }
+
+      if (!currentFile.getFile().fits(size))
+      {
+         moveNextFile(true);
+
+         // The same check needs to be done at the new file also
+         if (!currentFile.getFile().fits(size))
+         {
+            // Sanity check, this should never happen
+            throw new IllegalStateException("Invalid logic on buffer allocation");
+         }
+      }
+   }
+
+   abstract void scheduleReclaim();
+
+   // You need to guarantee lock.acquire() before calling this method
+   protected void moveNextFile(final boolean scheduleReclaim) throws Exception
+   {
+      filesRepository.closeFile(currentFile);
+
+      currentFile = filesRepository.openFile();
+
+      if (scheduleReclaim)
+      {
+         scheduleReclaim();
+      }
+
+      if (trace)
+      {
+         log.info("moveNextFile: " + currentFile);
+      }
+
+      fileFactory.activateBuffer(currentFile.getFile());
+   }
+
    protected SyncIOCompletion getSyncCallback(final boolean sync)
    {
-      if (hasCallbackSupport)
+      if (fileFactory.isSupportsCallbacks())
       {
          if (sync)
          {
@@ -213,4 +279,40 @@
       }
    }
 
+   /**
+    * @param lastDataPos
+    * @throws Exception
+    */
+   protected void setUpCurrentFile(int lastDataPos) throws Exception
+   {
+      // Create any more files we need
+
+      filesRepository.ensureMinFiles();
+
+      // The current file is the last one that has data
+
+      currentFile = filesRepository.pollLastDataFile();
+
+      if (currentFile != null)
+      {
+         currentFile.getFile().open();
+
+         currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
+      }
+      else
+      {
+         currentFile = filesRepository.getFreeFile();
+
+         filesRepository.openFile(currentFile, true);
+      }
+
+      fileFactory.activateBuffer(currentFile.getFile());
+
+      filesRepository.pushOpenedFile();
+   }
+
+   public int getFileSize()
+   {
+      return fileSize;
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -33,8 +33,6 @@
  * Guaranteeing that they will be delivered in order to the Journal
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
  */
 public class JournalFilesRepository
 {
@@ -91,6 +89,18 @@
                                  final int fileSize,
                                  final int minFiles)
    {
+      if (filePrefix == null)
+      {
+         throw new IllegalArgumentException("filePrefix cannot be null");
+      }
+      if (fileExtension == null)
+      {
+         throw new IllegalArgumentException("fileExtension cannot be null");
+      }
+      if (maxAIO <= 0)
+      {
+         throw new IllegalArgumentException("maxAIO must be a positive number");
+      }
       this.fileFactory = fileFactory;
       this.maxAIO = maxAIO;
       this.filePrefix = filePrefix;
@@ -447,9 +457,9 @@
     * Creates files for journal synchronization of a replicated backup.
     * @param isCurrent a current file is initialized and kept open.
     */
-   public JournalFile createRemoteBackupSyncFile(long fileID, boolean isCurrent) throws Exception
+   public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception
    {
-      return createFile(isCurrent, false, isCurrent, false, fileID);
+      return createFile(false, false, true, false, fileID);
    }
 
    // Package protected ---------------------------------------------
@@ -576,4 +586,11 @@
 
       return jf;
    }
+
+   @Override
+   public String toString()
+   {
+      return "JournalFilesRepository(dataFiles=" + dataFiles + ", freeFiles=" + freeFiles + ", openedFiles=" +
+               openedFiles + ")";
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -18,6 +18,7 @@
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -171,18 +172,12 @@
 
    private final int userVersion;
 
-   private final int fileSize;
-
    private final int minFiles;
 
    private final float compactPercentage;
 
    private final int compactMinFiles;
 
-   private final SequentialFileFactory fileFactory;
-
-   private final JournalFilesRepository filesRepository;
-
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
 
@@ -212,8 +207,6 @@
     */
    private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
 
-   private volatile JournalFile currentFile;
-
    private volatile JournalState state = JournalState.STOPPED;
 
    private final Reclaimer reclaimer = new Reclaimer();
@@ -229,47 +222,12 @@
                       final String fileExtension,
                       final int maxAIO)
    {
-      this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
-   }
-
-   public JournalImpl(final int fileSize,
-                      final int minFiles,
-                      final int compactMinFiles,
-                      final int compactPercentage,
-                      final SequentialFileFactory fileFactory,
-                      final String filePrefix,
-                      final String fileExtension,
-                      final int maxAIO,
-                      final int userVersion)
-   {
-      super(fileFactory.isSupportsCallbacks());
-      if (fileSize < JournalImpl.MIN_FILE_SIZE)
-      {
-         throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes");
-      }
-      if (fileSize % fileFactory.getAlignment() != 0)
-      {
-         throw new IllegalArgumentException("Invalid journal-file-size " + fileSize +
-                                            ", It should be multiple of " +
-                                            fileFactory.getAlignment());
-      }
+      super(fileFactory, new JournalFilesRepository(fileFactory, filePrefix, fileExtension, 0, maxAIO, fileSize,
+                                                    minFiles), fileSize);
       if (minFiles < 2)
       {
          throw new IllegalArgumentException("minFiles cannot be less than 2");
       }
-      if (filePrefix == null)
-      {
-         throw new NullPointerException("filePrefix is null");
-      }
-      if (fileExtension == null)
-      {
-         throw new NullPointerException("fileExtension is null");
-      }
-      if (maxAIO <= 0)
-      {
-         throw new IllegalStateException("maxAIO should aways be a positive number");
-      }
-
       if (compactPercentage < 0 || compactPercentage > 100)
       {
          throw new IllegalArgumentException("Compact Percentage out of range");
@@ -285,28 +243,14 @@
       }
 
       this.compactMinFiles = compactMinFiles;
-
-      this.fileSize = fileSize;
-
       this.minFiles = minFiles;
-
-      this.fileFactory = fileFactory;
-
-      filesRepository = new JournalFilesRepository(fileFactory,
-                                                   filePrefix,
-                                                   fileExtension,
-                                                   userVersion,
-                                                   maxAIO,
-                                                   fileSize,
-                                                   minFiles);
-
-      this.userVersion = userVersion;
+      this.userVersion = 0;
    }
 
    @Override
    public String toString()
    {
-      return super.toString() + " " + state;
+      return "JournalImpl(state=" + state + ", currentFile=[" + currentFile + "], hash=" + super.toString() + ")";
    }
 
    public void runDirectJournalBlast() throws Exception
@@ -2041,31 +1985,8 @@
          return new JournalLoadInformation(0, -1);
       }
 
-      // Create any more files we need
+      setUpCurrentFile(lastDataPos);
 
-      filesRepository.ensureMinFiles();
-
-      // The current file is the last one that has data
-
-      currentFile = filesRepository.pollLastDataFile();
-
-      if (currentFile != null)
-      {
-         currentFile.getFile().open();
-
-         currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
-      }
-      else
-      {
-         currentFile = filesRepository.getFreeFile();
-
-         filesRepository.openFile(currentFile, true);
-      }
-
-      fileFactory.activateBuffer(currentFile.getFile());
-
-      filesRepository.pushOpenedFile();
-
       setJournalState(JournalState.LOADED);
 
       for (TransactionHolder transaction : loadTransactions.values())
@@ -2162,7 +2083,7 @@
          totalLiveSize += file.getLiveSize();
       }
 
-      long totalBytes = (long)dataFiles.length * (long)fileSize;
+      long totalBytes = dataFiles.length * (long)fileSize;
 
       long compactMargin = (long)(totalBytes * compactPercentage);
 
@@ -2224,7 +2145,7 @@
    // TestableJournal implementation
    // --------------------------------------------------------------
 
-   public void setAutoReclaim(final boolean autoReclaim)
+   public synchronized void setAutoReclaim(final boolean autoReclaim)
    {
       this.autoReclaim = autoReclaim;
    }
@@ -2329,6 +2250,7 @@
       return records.size();
    }
 
+   @Override
    public int getFileSize()
    {
       return fileSize;
@@ -2775,26 +2697,10 @@
 
       final IOAsyncTask callback;
 
-      int size = encoder.getEncodeSize();
+      final int size = encoder.getEncodeSize();
 
-      // We take into account the fileID used on the Header
-      if (size > fileSize - currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
-      {
-         throw new IllegalArgumentException("Record is too large to store " + size);
-      }
+      switchFileIfNecessary(size);
 
-      if (!currentFile.getFile().fits(size))
-      {
-         moveNextFile(true);
-
-         // The same check needs to be done at the new file also
-         if (!currentFile.getFile().fits(size))
-         {
-            // Sanity check, this should never happen
-            throw new IllegalStateException("Invalid logic on buffer allocation");
-         }
-      }
-
       if (tx != null)
       {
          // The callback of a transaction has to be taken inside the lock,
@@ -2842,28 +2748,9 @@
       return currentFile;
    }
 
-   // You need to guarantee lock.acquire() before calling this method
-   private void moveNextFile(final boolean scheduleReclaim) throws Exception
+   @Override
+   void scheduleReclaim()
    {
-      filesRepository.closeFile(currentFile);
-
-      currentFile = filesRepository.openFile();
-
-      if (scheduleReclaim)
-      {
-         scheduleReclaim();
-      }
-
-      if (JournalImpl.trace)
-      {
-         JournalImpl.trace("moveNextFile: " + currentFile);
-      }
-
-      fileFactory.activateBuffer(currentFile.getFile());
-   }
-
-   private void scheduleReclaim()
-   {
       if (state != JournalState.LOADED)
       {
          return;
@@ -3102,21 +2989,21 @@
     * @throws Exception
     */
    @Override
-   public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
+   public synchronized Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
    {
       writeLock();
       try
       {
+         Map<Long, JournalFile> map = new HashMap<Long, JournalFile>();
          log.info("Reserving fileIDs before synchronization: " + Arrays.toString(fileIds));
          long maxID = -1;
          for (long id : fileIds)
          {
             maxID = Math.max(maxID, id);
-            map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id, false));
+            map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
          }
-         maxID += 1;
          filesRepository.setNextFileID(maxID);
-         return filesRepository.createRemoteBackupSyncFile(maxID, true);
+         return map;
       }
       finally
       {
@@ -3128,4 +3015,16 @@
    {
       return autoReclaim;
    }
+
+   @Override
+   public SequentialFileFactory getFileFactory()
+   {
+      return fileFactory;
+   }
+
+   @Override
+   public JournalFilesRepository getFilesRepository()
+   {
+      return filesRepository;
+   }
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -28,7 +28,7 @@
    private ClientSession session;
    private ClientProducer producer;
    private BackupSyncDelay syncDelay;
-   protected int n_msgs = 10;
+   protected int n_msgs = 20;
 
    @Override
    protected void setUp() throws Exception
@@ -64,12 +64,19 @@
       }
 
       backupServer.start();
-      syncDelay.deliverUpToDateMsg();
+
+      // Deliver messages with Backup in-sync
       waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
+      sendMessages(session, producer, n_msgs);
 
+      // Deliver messages with Backup up-to-date
+      syncDelay.deliverUpToDateMsg();
+      waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
       // SEND more messages, now with the backup replicating
       sendMessages(session, producer, n_msgs);
+
       Set<Long> liveIds = getFileIds(messageJournal);
+      int size = messageJournal.getFileSize();
       PagingStore ps = liveServer.getServer().getPagingManager().getPageStore(ADDRESS);
       if (ps.getPageSizeBytes() == PAGE_SIZE)
       {
@@ -79,11 +86,14 @@
       finishSyncAndFailover();
 
       JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+      System.out.println("backup journal " + backupMsgJournal);
+      System.out.println("live journal " + messageJournal);
+      assertEquals("file sizes must be the same", size, backupMsgJournal.getFileSize());
       Set<Long> backupIds = getFileIds(backupMsgJournal);
       assertEquals("File IDs must match!", liveIds, backupIds);
 
       // "+ 2": there two other calls that send N_MSGS.
-      for (int i = 0; i < totalRounds + 2; i++)
+      for (int i = 0; i < totalRounds + 3; i++)
       {
          receiveMsgsInRange(0, n_msgs);
       }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -1571,7 +1571,7 @@
 
       ClientMessage message = consumer.receiveImmediate();
 
-      Assert.assertNull("Null message", message);
+      Assert.assertNull("expecting null message", message);
 
       session2.close();
 
@@ -1610,7 +1610,7 @@
 
       backupServer.start();
 
-      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      assertTrue("session failure listener", latch.await(5, TimeUnit.SECONDS));
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
@@ -1772,7 +1772,7 @@
 
       backupServer.start();
 
-      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      assertTrue("session failure listener", latch.await(5, TimeUnit.SECONDS));
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -84,6 +84,7 @@
       private Packet onHold;
       private Channel channel;
       public volatile boolean deliver;
+      private volatile boolean delivered;
       private boolean receivedUpToDate;
       private boolean mustHold = true;
 
@@ -97,6 +98,8 @@
          deliver = true;
          if (!receivedUpToDate)
             return;
+         if (delivered)
+            return;
 
          if (onHold == null)
          {
@@ -108,6 +111,7 @@
          try
          {
             handler.handlePacket(onHold);
+            delivered = true;
          }
          finally
          {

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -51,8 +51,10 @@
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
@@ -853,7 +855,7 @@
       }
 
       @Override
-      public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception
+      public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws Exception
       {
          return null;
       }
@@ -894,5 +896,17 @@
          return null;
       }
 
+      @Override
+      public SequentialFileFactory getFileFactory()
+      {
+         return null;
+      }
+
+      @Override
+      public JournalFilesRepository getFilesRepository()
+      {
+         return null;
+      }
+
    }
 }

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2011-09-30 08:55:35 UTC (rev 11448)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2011-09-30 08:58:28 UTC (rev 11449)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.unit.core.journal.impl.fakes;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -28,7 +29,6 @@
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.TimedBuffer;
-import org.hornetq.core.logging.Logger;
 
 /**
  *
@@ -40,8 +40,6 @@
  */
 public class FakeSequentialFileFactory implements SequentialFileFactory
 {
-   private static final Logger log = Logger.getLogger(FakeSequentialFileFactory.class);
-
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
@@ -285,11 +283,6 @@
          }
       }
 
-      public boolean isSendError()
-      {
-         return sendError;
-      }
-
       public void setSendError(final boolean sendError)
       {
          this.sendError = sendError;
@@ -688,6 +681,12 @@
          HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
          write(outbuffer, true);
       }
+
+      @Override
+      public File getJavaFile()
+      {
+         throw new UnsupportedOperationException();
+      }
    }
 
    /* (non-Javadoc)



More information about the hornetq-commits mailing list