[hornetq-commits] JBoss hornetq SVN: r11070 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 29 13:27:24 EDT 2011


Author: borges
Date: 2011-07-29 13:27:23 -0400 (Fri, 29 Jul 2011)
New Revision: 11070

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 Add data file sync code and backupServer may only start if journals are sync'ed.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -343,10 +343,7 @@
    }
 
    /**
-    * XXX FIXME Method ignores the synchronization of LargeMessages and Paging.
-    * <p>
-    * XXX A second version improvement would be to allow new operations to be sent to the backup,
-    * while we synchronize the existing logs.
+    * XXX FIXME HORNETQ-720 Method ignores the synchronization of LargeMessages and Paging.
     * @param replicationManager
     * @throws HornetQException
     */
@@ -354,7 +351,7 @@
    {
       if (!started)
       {
-         throw new IllegalStateException("must be started...");
+         throw new IllegalStateException("JournalStorageManager must be started...");
       }
       assert replicationManager != null;
       replicator = replicationManager;
@@ -364,7 +361,7 @@
          throw new HornetQException(HornetQException.INTERNAL_ERROR,
                                     "journals here are not JournalImpl. You can't set a replicator!");
       }
-      // XXX NEED to take a global lock on the StorageManager.
+      // XXX HORNETQ-720 WRITE LOCK the StorageManager.
 
       final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
       final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
@@ -378,14 +375,15 @@
       localMessageJournal.writeUnlock();
       localBindingsJournal.writeUnlock();
 
-
       bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
       messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
 
+      // XXX HORNETQ-720 UNLOCK StorageManager...
+
       sendJournalFile(messageFiles, JournalContent.MESSAGES);
       sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
 
-      // SEND "SYNC_DONE" msg to backup telling it can become operational.
+      replicator.sendSynchronizationDone();
    }
 
    /**
@@ -408,8 +406,8 @@
    {
       journal.setAutoReclaim(false);
       /*
-       * XXX need to check whether it is safe to proceed if compacting is running (specially at the
-       * end of it)
+       * XXX HORNETQ-720 need to check whether it is safe to proceed if compacting is running
+       * (especially at the end of it)
        */
       journal.forceMoveNextFile();
       JournalFile[] datafiles = journal.getDataFiles();

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -5,7 +5,11 @@
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
- * Used to copy JournalFile data over to the backup during synchronization.
+ * Message is used to:
+ * <ol>
+ * <li>copy JournalFile data over to the backup during synchronization;
+ * <li>send an up-to-date signal to the backup;
+ * </ol>
  */
 public final class ReplicationJournalFileMessage extends PacketImpl
 {
@@ -13,7 +17,9 @@
    private byte[] data;
    private int dataSize;
    private JournalContent journalType;
+   /** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()} */
    private long fileId;
+   private boolean backupIsUpToDate = false;
 
    public ReplicationJournalFileMessage()
    {
@@ -33,18 +39,56 @@
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeLong(fileId);
+      if (fileId == -1)
+         return;
       buffer.writeByte(journalType.typeByte);
       buffer.writeInt(dataSize);
-      buffer.writeBytes(data, 0, dataSize);
+      // sending -1 will close the file
+      if (dataSize > -1)
+      {
+         buffer.writeBytes(data, 0, dataSize);
+      }
    }
 
    @Override
    public void decodeRest(final HornetQBuffer buffer)
    {
       fileId = buffer.readLong();
+      if (fileId == -1)
+      {
+         backupIsUpToDate = true;
+         return;
+      }
       journalType = JournalContent.getType(buffer.readByte());
       int size = buffer.readInt();
-      data = new byte[size];
-      buffer.readBytes(data);
+      if (size > -1)
+      {
+         data = new byte[size];
+         buffer.readBytes(data);
+      }
    }
+
+   public long getFileId()
+   {
+      return fileId;
+   }
+
+   public byte[] getData()
+   {
+      return data;
+   }
+
+   public JournalContent getJournalContent()
+   {
+      return journalType;
+   }
+
+   /**
+    * @return {@code true} if the live has finished synchronizing its data and the backup is
+    *         therefore up-to-date, {@code false} otherwise.
+    */
+   public boolean isUpToDate()
+   {
+      return backupIsUpToDate;
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -101,4 +101,11 @@
     */
    void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
 
+   /**
+    * Informs backup that data synchronization is done.
+    * <p>
+    * So if 'live' fails, the (up-to-date) backup now may take over its duties.
+    */
+   void sendSynchronizationDone();
+
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.replication.impl;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -21,6 +22,8 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
@@ -42,6 +45,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -50,15 +54,12 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
 
 /**
- *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
  */
 public class ReplicationEndpointImpl implements ReplicationEndpoint
 {
@@ -71,7 +72,7 @@
 
    private static final boolean trace = log.isTraceEnabled();
 
-   private final HornetQServer server;
+   private final HornetQServerImpl server;
 
    private Channel channel;
 
@@ -94,7 +95,7 @@
    private boolean started;
 
     // Constructors --------------------------------------------------
-   public ReplicationEndpointImpl(final HornetQServer server)
+   public ReplicationEndpointImpl(final HornetQServerImpl server)
    {
       this.server = server;
    }
@@ -184,6 +185,10 @@
          {
             handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
          }
+         else if (type == PacketImpl.REPLICATION_SYNC)
+         {
+            handleReplicationSynchronization((ReplicationJournalFileMessage)packet);
+         }
          else
          {
             log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
@@ -362,21 +367,63 @@
 
    // Private -------------------------------------------------------
 
-   private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws HornetQException
+   private void handleReplicationSynchronization(ReplicationJournalFileMessage msg) throws Exception
    {
-      final Journal journalIf = journals[packet.getJournalContentType().typeByte];
-      if (journalIf.isStarted())
+      if (msg.isUpToDate())
       {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Journal can not be started!");
+         // XXX HORNETQ-720 must reload journals(?)
+         for (Journal j : journals)
+         {
+            JournalImpl journal = (JournalImpl)j;
+            journal.finishRemoteBackupSync();
+         }
+         server.setRemoteBackupUpToDate(true);
+         return;
       }
 
+      Journal journalIf = getJournal(msg.getJournalContent().typeByte);
+      JournalImpl journal = assertJournalImpl(journalIf);
+
+      long id = msg.getFileId();
+      JournalFile journalFile = journal.getRemoteBackupSyncFile(id);
+      byte[] data = msg.getData();
+      if (data == null)
+      {
+         journalFile.getFile().close();
+      }
+      else
+      {
+         SequentialFile sf = journalFile.getFile();
+         if (!sf.isOpen())
+         {
+            sf.open(1, false);
+         }
+         sf.writeDirect(ByteBuffer.wrap(data), true);
+      }
+      // journal.get
+   }
+
+   private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws Exception
+   {
+      if (server.isRemoteBackupUpToDate())
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup can not be up-to-date!");
+      }
+
+      final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+
+      JournalImpl journal = assertJournalImpl(journalIf);
+      journal.createFilesForRemoteSync(packet.getFileIds());
+   }
+
+   private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException
+   {
       if (!(journalIf instanceof JournalImpl))
       {
          throw new HornetQException(HornetQException.INTERNAL_ERROR,
                                     "Journals of backup server are expected to be JournalImpl");
       }
-      JournalImpl journal = (JournalImpl)journalIf;
-
+      return (JournalImpl)journalIf;
    }
 
    private void handleLargeMessageEnd(final ReplicationLargemessageEndMessage packet)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -513,9 +513,10 @@
       while (true)
       {
          int bytesRead = file.read(data);
+         // sending -1 bytes will close the file.
+         replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, data, content, id));
          if (bytesRead == -1)
             break;
-         replicatingChannel.sendBlocking(new ReplicationJournalFileMessage(bytesRead, data, content, id));
       }
       // XXX probably need to sync the JournalFile(?)
       throw new UnsupportedOperationException();
@@ -524,7 +525,13 @@
    @Override
    public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
    {
-      replicatingChannel.sendBlocking(new ReplicationFileIdMessage(datafiles, contentType));
+      replicatingChannel.send(new ReplicationFileIdMessage(datafiles, contentType));
    }
 
+   @Override
+   public void sendSynchronizationDone()
+   {
+      replicatingChannel.send(new ReplicationJournalFileMessage(-1, null, null, -1));
+   }
+
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -219,6 +219,12 @@
 
    private boolean initialised;
 
+   /**
+    * Only applicable to 'remote backup servers'. If this flag is false the backup may not become
+    * 'live'.
+    */
+   private volatile boolean backupUpToDate = true;
+
   // private FailoverManager replicationFailoverManager;
 
    private ReplicationManager replicationManager;
@@ -587,8 +593,20 @@
             // Server node (i.e. Life node) is not running, now the backup takes over.
             //we must remember to close stuff we don't need any more
             nodeManager.awaitLiveNode();
+
             serverLocator.close();
             replicationEndpoint.stop();
+
+            if (!isRemoteBackupUpToDate())
+            {
+               /*
+                * XXX HORNETQ-720 Live is down, and this server was not in sync. Perhaps we should
+                * first try to wait a little longer to see if the 'live' comes back?
+                */
+               throw new RuntimeException("Backup Server was not yet in sync with live");
+            }
+
+
             configuration.setBackup(false);
 
             initialisePart2();
@@ -603,7 +621,7 @@
       }
 
       public void close(final boolean permanently) throws Exception
-   {
+      {
          if(serverLocator != null)
          {
             serverLocator.close();
@@ -696,6 +714,7 @@
          else
          {
             assert replicationEndpoint == null;
+            backupUpToDate = false;
             replicationEndpoint = new ReplicationEndpointImpl(this);
             activation = new SharedNothingBackupActivation();
          }
@@ -2014,4 +2033,21 @@
 
       journalStorageManager.setReplicator(replicationManager);
    }
+
+   /**
+    * Whether a remote backup server was in sync with its live server. If it was not in sync, it may
+    * not take over the live's functions.
+    * <p>
+    * A local backup server or a live server should always return {@code true}
+    * @return
+    */
+   public boolean isRemoteBackupUpToDate()
+   {
+      return backupUpToDate;
+   }
+
+   public void setRemoteBackupUpToDate(boolean isUpToDate)
+   {
+      backupUpToDate = isUpToDate;
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -15,6 +15,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,6 +64,8 @@
 
    private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
 
+   private Map<Long, JournalFile> filesReservedForSync;
+
    private final AtomicLong nextFileID = new AtomicLong(0);
 
    private final int maxAIO;
@@ -175,7 +178,7 @@
          for (int i = 0; i < filesToCreate; i++)
          {
             // Keeping all files opened can be very costly (mainly on AIO)
-            freeFiles.add(createFile(false, false, true, false));
+            freeFiles.add(createFile(false, false, true, false, -1));
          }
       }
 
@@ -416,7 +419,7 @@
 
       if (nextFile == null)
       {
-         nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+         nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension, -1);
       }
       else
       {
@@ -434,6 +437,12 @@
       return nextFile;
    }
 
+   public void createRemoteBackupSyncFile(long fileID) throws Exception
+   {
+      assert !filesReservedForSync.containsKey(Long.valueOf(fileID));
+      filesReservedForSync.put(Long.valueOf(fileID), createFile(false, false, false, false, fileID));
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -449,9 +458,10 @@
    private JournalFile createFile(final boolean keepOpened,
                                   final boolean multiAIO,
                                   final boolean init,
-                                  final boolean tmpCompact) throws Exception
+                                  final boolean tmpCompact,
+                                  final long fileIdPreSet) throws Exception
    {
-      long fileID = generateFileID();
+      long fileID = fileIdPreSet != -1 ? fileIdPreSet : generateFileID();
 
       String fileName;
 
@@ -560,6 +570,22 @@
       return jf;
    }
 
-   // Inner classes -------------------------------------------------
+   /**
+    * @param id
+    * @return
+    */
+   public JournalFile getRemoteBackupSyncFile(long id)
+   {
+      return filesReservedForSync.get(Long.valueOf(id));
+   }
 
+   public Collection<? extends JournalFile> getSyncFiles()
+   {
+      return filesReservedForSync.values();
+   }
+
+   public void clearSyncFiles()
+   {
+      filesReservedForSync.clear();
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -3266,4 +3266,64 @@
    {
       journalLock.writeLock().unlock();
    }
+
+   /**
+    * @param fileIds
+    * @throws Exception
+    */
+   public void createFilesForRemoteSync(long[] fileIds) throws Exception
+   {
+      writeLock();
+      try
+      {
+         long maxID = -1;
+         for (long id : fileIds)
+         {
+            maxID = Math.max(maxID, id);
+            filesRepository.createRemoteBackupSyncFile(id);
+         }
+      }
+      finally
+      {
+         writeUnlock();
+      }
+   }
+
+   /**
+    * @param id
+    * @return
+    */
+   public JournalFile getRemoteBackupSyncFile(long id)
+   {
+      return filesRepository.getRemoteBackupSyncFile(id);
+   }
+
+   /**
+    *
+    */
+   public void finishRemoteBackupSync()
+   {
+      writeLock();
+      try
+      {
+         lockAppend.lock();
+         List<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>();
+         dataFilesToProcess.addAll(filesRepository.getDataFiles());
+         filesRepository.clearDataFiles();
+         dataFilesToProcess.addAll(filesRepository.getSyncFiles());
+         filesRepository.clearSyncFiles();
+         Collections.sort(dataFilesToProcess, new JournalFileComparator());
+         for (JournalFile file : dataFilesToProcess)
+         {
+            filesRepository.addDataFileOnTop(file);
+         }
+         // XXX HORNETQ-720 still missing a "reload" call
+      }
+      finally
+      {
+         lockAppend.unlock();
+         writeUnlock();
+      }
+
+   }
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-07-29 17:25:36 UTC (rev 11069)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-07-29 17:27:23 UTC (rev 11070)
@@ -1,5 +1,8 @@
 package org.hornetq.tests.integration.cluster.failover;
 
+import java.util.HashSet;
+import java.util.Set;
+
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
@@ -7,6 +10,10 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
 import org.hornetq.tests.util.TransportConfigurationUtils;
 
 public class BackupJournalSyncTest extends FailoverTestBase
@@ -15,6 +22,7 @@
    private ServerLocatorInternal locator;
    private ClientSessionFactoryInternal sessionFactory;
    private ClientSession session;
+   private ClientProducer producer;
    private static final int N_MSGS = 100;
 
    @Override
@@ -38,15 +46,50 @@
                    backupServer.getServer().getNodeID());
    }
 
+   public void testReserveFileIdValuesOnBackup() throws Exception
+   {
+      createProducerSendSomeMessages();
+      JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
+      for (int i = 0; i < 5; i++)
+      {
+         messageJournal.forceMoveNextFile();
+         sendMessages(session, producer, N_MSGS);
+      }
+      backupServer.start();
+      waitForBackup(sessionFactory, 5);
+      // XXX HORNETQ-720 must wait for backup to sync!
+
+      JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+      Set<Long> bckpIds = getFileIds(backupMsgJournal);
+      assertFalse(bckpIds.isEmpty());
+      Set<Long> liveIds = getFileIds(messageJournal);
+      assertEquals("sets must match! " + liveIds, bckpIds, liveIds);
+   }
+
+   /**
+    * @param journal journal whose data files are listed
+    * @return the file IDs of the journal's data files
+    */
+   private Set<Long> getFileIds(JournalImpl journal)
+   {
+      Set<Long> results = new HashSet<Long>();
+      for (JournalFile jf : journal.getDataFiles())
+      {
+         results.add(Long.valueOf(jf.getFileID()));
+      }
+      return results;
+   }
+
+   static JournalImpl getMessageJournalFromServer(TestableServer server)
+   {
+      JournalStorageManager sm = (JournalStorageManager)server.getServer().getStorageManager();
+      return (JournalImpl)sm.getMessageJournal();
+   }
+
    public void testMessageSync() throws Exception
    {
-      session = sessionFactory.createSession(true, true);
-      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
-      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+      createProducerSendSomeMessages();
 
-      sendMessages(session, producer, N_MSGS);
-      session.start();
-
       receiveMsgs(0, N_MSGS / 2);
       assertFalse("backup is not started!", backupServer.isStarted());
 
@@ -56,11 +99,20 @@
       waitForBackup(sessionFactory, 5);
       crash(session);
 
-
       // consume N/2 from 'new' live (the old backup)
       receiveMsgs(N_MSGS / 2, N_MSGS);
    }
 
+   private void createProducerSendSomeMessages() throws HornetQException, Exception
+   {
+      session = sessionFactory.createSession(true, true);
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+      producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      sendMessages(session, producer, N_MSGS);
+      session.start();
+   }
+
    private void receiveMsgs(int start, int end) throws HornetQException
    {
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);



More information about the hornetq-commits mailing list