[hornetq-commits] JBoss hornetq SVN: r11120 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 4 11:28:13 EDT 2011


Author: borges
Date: 2011-08-04 11:28:12 -0400 (Thu, 04 Aug 2011)
New Revision: 11120

Added:
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-720 Move the sync-files control into ReplicationEndpointImpl

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-04 15:26:34 UTC (rev 11119)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-04 15:28:12 UTC (rev 11120)
@@ -14,6 +14,9 @@
 package org.hornetq.core.replication.impl;
 
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -77,13 +80,17 @@
    private Channel channel;
 
    private Journal[] journals;
+   private JournalLoadInformation[] journalLoadInformation;
+   // Files reserved in each journal for synchronization of existing data from the 'live' server
+   private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
+            new HashMap<JournalContent, Map<Long, JournalFile>>();
 
    private JournalStorageManager storage;
 
    private PagingManager pageManager;
 
-   private JournalLoadInformation[] journalLoadInformation;
 
+
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex =
             new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
    private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
@@ -104,6 +111,7 @@
 
    public void registerJournal(final byte id, final Journal journal)
    {
+
       if (journals == null || id >= journals.length)
       {
          Journal[] oldJournals = journals;
@@ -228,9 +236,14 @@
 
       server.getManagementService().setStorageManager(storage);
 
+      registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
       registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
-      registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
 
+      for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+      {
+         filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
+      }
+
       // We only need to load internal structures on the backup...
       journalLoadInformation = storage.loadInternalOnly();
 
@@ -371,22 +384,34 @@
    {
       if (msg.isUpToDate())
       {
-         // XXX HORNETQ-720 must reload journals(?)
-         for (Journal j : journals)
+         for (Journal journal2 : journals)
          {
-            JournalImpl journal = (JournalImpl)j;
-            journal.finishRemoteBackupSync();
+            JournalImpl journal = (JournalImpl)journal2;
+            journal.writeLock();
+            try
+            {
+               if (journal.getDataFiles().length != 0)
+               {
+                  throw new IllegalStateException("Journal should not have any data files at this point");
+               }
+               // files should be already in place.
+               filesReservedForSync.remove(msg.getJournalContent());
+               // XXX HORNETQ-720 must reload journals
+               // XXX HORNETQ-720 must start using real journals
+            }
+            finally
+            {
+               journal.writeUnlock();
+            }
+
          }
          server.setRemoteBackupUpToDate();
          log.info("Backup server " + server + " is synchronized with live-server.");
          return;
       }
 
-      Journal journalIf = getJournal(msg.getJournalContent().typeByte);
-      JournalImpl journal = assertJournalImpl(journalIf);
-
       long id = msg.getFileId();
-      JournalFile journalFile = journal.getRemoteBackupSyncFile(id);
+      JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
       byte[] data = msg.getData();
       if (data == null)
       {
@@ -401,7 +426,6 @@
          }
          sf.writeDirect(ByteBuffer.wrap(data), true);
       }
-      // journal.get
    }
 
    private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws Exception
@@ -413,7 +437,7 @@
       final Journal journalIf = journals[packet.getJournalContentType().typeByte];
 
       JournalImpl journal = assertJournalImpl(journalIf);
-      journal.createFilesForRemoteSync(packet.getFileIds());
+      journal.createFilesForRemoteSync(packet.getFileIds(), filesReservedForSync.get(packet.getJournalContentType()));
    }
 
    private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-08-04 15:26:34 UTC (rev 11119)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-08-04 15:28:12 UTC (rev 11120)
@@ -14,9 +14,7 @@
 package org.hornetq.core.journal.impl;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -65,8 +63,6 @@
 
    private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
 
-   private Map<Long, JournalFile> filesReservedForSync;
-
    private final AtomicLong nextFileID = new AtomicLong(0);
 
    private final int maxAIO;
@@ -447,14 +443,9 @@
       return nextFile;
    }
 
-   public void createRemoteBackupSyncFile(long fileID) throws Exception
+   public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception
    {
-      if (filesReservedForSync == null)
-      {
-         filesReservedForSync = new HashMap<Long, JournalFile>();
-      }
-      assert !filesReservedForSync.containsKey(Long.valueOf(fileID));
-      filesReservedForSync.put(Long.valueOf(fileID), createFile(false, false, false, false, fileID));
+      return createFile(false, false, false, false, fileID);
    }
 
    // Package protected ---------------------------------------------
@@ -477,10 +468,8 @@
    {
       long fileID = fileIdPreSet != -1 ? fileIdPreSet : generateFileID();
 
-      String fileName;
+      final String fileName = createFileName(tmpCompact, fileID);
 
-      fileName = createFileName(tmpCompact, fileID);
-
       if (JournalFilesRepository.trace)
       {
          JournalFilesRepository.trace("Creating file " + fileName);
@@ -583,23 +572,4 @@
 
       return jf;
    }
-
-   /**
-    * @param id
-    * @return
-    */
-   public JournalFile getRemoteBackupSyncFile(long id)
-   {
-      return filesReservedForSync.get(Long.valueOf(id));
-   }
-
-   public Collection<? extends JournalFile> getSyncFiles()
-   {
-      return filesReservedForSync.values();
-   }
-
-   public void clearSyncFiles()
-   {
-      filesReservedForSync.clear();
-   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-08-04 15:26:34 UTC (rev 11119)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-08-04 15:28:12 UTC (rev 11120)
@@ -3199,7 +3199,7 @@
 
    }
 
-   private static class JournalFileComparator implements Comparator<JournalFile>
+   public static class JournalFileComparator implements Comparator<JournalFile>
    {
       public int compare(final JournalFile f1, final JournalFile f2)
       {
@@ -3273,7 +3273,7 @@
     * @param fileIds
     * @throws Exception
     */
-   public void createFilesForRemoteSync(long[] fileIds) throws Exception
+   public void createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
    {
       writeLock();
       try
@@ -3283,7 +3283,7 @@
          for (long id : fileIds)
          {
             maxID = Math.max(maxID, id);
-            filesRepository.createRemoteBackupSyncFile(id);
+            map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
          }
          if (maxID > 0)
          {
@@ -3295,42 +3295,4 @@
          writeUnlock();
       }
    }
-
-   /**
-    * @param id
-    * @return
-    */
-   public JournalFile getRemoteBackupSyncFile(long id)
-   {
-      return filesRepository.getRemoteBackupSyncFile(id);
-   }
-
-   /**
-    *
-    */
-   public void finishRemoteBackupSync()
-   {
-      writeLock();
-      try
-      {
-         lockAppend.lock();
-         List<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>();
-         dataFilesToProcess.addAll(filesRepository.getDataFiles());
-         filesRepository.clearDataFiles();
-         dataFilesToProcess.addAll(filesRepository.getSyncFiles());
-         filesRepository.clearSyncFiles();
-         Collections.sort(dataFilesToProcess, new JournalFileComparator());
-         for (JournalFile file : dataFilesToProcess)
-         {
-            filesRepository.addDataFileOnTop(file);
-         }
-         // XXX HORNETQ-720 still missing a "reload" call?
-      }
-      finally
-      {
-         lockAppend.unlock();
-         writeUnlock();
-      }
-
-   }
 }

Added: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java	2011-08-04 15:28:12 UTC (rev 11120)
@@ -0,0 +1,29 @@
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+
+/**
+ * Journal used at a replicating backup server during the synchronization of data with the 'live'
+ * server.
+ */
+public class ReplicatingJournal extends JournalImpl
+{
+
+   /**
+    * @param fileSize
+    * @param minFiles
+    * @param compactMinFiles
+    * @param compactPercentage
+    * @param fileFactory
+    * @param filePrefix
+    * @param fileExtension
+    * @param maxAIO
+    */
+   public ReplicatingJournal(int fileSize, int minFiles, int compactMinFiles, int compactPercentage,
+                             SequentialFileFactory fileFactory, String filePrefix, String fileExtension, int maxAIO)
+   {
+      super(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO);
+   }
+
+
+}



More information about the hornetq-commits mailing list