Author: borges
Date: 2011-08-04 11:28:12 -0400 (Thu, 04 Aug 2011)
New Revision: 11120
Added:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-720 Move the sync-files control into ReplicationEndpointImpl
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04
15:26:34 UTC (rev 11119)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04
15:28:12 UTC (rev 11120)
@@ -14,6 +14,9 @@
package org.hornetq.core.replication.impl;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -77,13 +80,17 @@
private Channel channel;
private Journal[] journals;
+ private JournalLoadInformation[] journalLoadInformation;
+ // Files reserved in each journal for synchronization of existing data from the
'live' server
+ private final Map<JournalContent, Map<Long, JournalFile>>
filesReservedForSync =
+ new HashMap<JournalContent, Map<Long, JournalFile>>();
private JournalStorageManager storage;
private PagingManager pageManager;
- private JournalLoadInformation[] journalLoadInformation;
+
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex =
new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
@@ -104,6 +111,7 @@
public void registerJournal(final byte id, final Journal journal)
{
+
if (journals == null || id >= journals.length)
{
Journal[] oldJournals = journals;
@@ -228,9 +236,14 @@
server.getManagementService().setStorageManager(storage);
+ registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
- registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
+ {
+ filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
+ }
+
// We only need to load internal structures on the backup...
journalLoadInformation = storage.loadInternalOnly();
@@ -371,22 +384,34 @@
{
if (msg.isUpToDate())
{
- // XXX HORNETQ-720 must reload journals(?)
- for (Journal j : journals)
+ for (Journal journal2 : journals)
{
- JournalImpl journal = (JournalImpl)j;
- journal.finishRemoteBackupSync();
+ JournalImpl journal = (JournalImpl)journal2;
+ journal.writeLock();
+ try
+ {
+ if (journal.getDataFiles().length != 0)
+ {
+ throw new IllegalStateException("Journal should not have any data
files at this point");
+ }
+ // files should be already in place.
+ filesReservedForSync.remove(msg.getJournalContent());
+ // XXX HORNETQ-720 must reload journals
+ // XXX HORNETQ-720 must start using real journals
+ }
+ finally
+ {
+ journal.writeUnlock();
+ }
+
}
server.setRemoteBackupUpToDate();
log.info("Backup server " + server + " is synchronized with
live-server.");
return;
}
- Journal journalIf = getJournal(msg.getJournalContent().typeByte);
- JournalImpl journal = assertJournalImpl(journalIf);
-
long id = msg.getFileId();
- JournalFile journalFile = journal.getRemoteBackupSyncFile(id);
+ JournalFile journalFile =
filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
byte[] data = msg.getData();
if (data == null)
{
@@ -401,7 +426,6 @@
}
sf.writeDirect(ByteBuffer.wrap(data), true);
}
- // journal.get
}
private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet)
throws Exception
@@ -413,7 +437,7 @@
final Journal journalIf = journals[packet.getJournalContentType().typeByte];
JournalImpl journal = assertJournalImpl(journalIf);
- journal.createFilesForRemoteSync(packet.getFileIds());
+ journal.createFilesForRemoteSync(packet.getFileIds(),
filesReservedForSync.get(packet.getJournalContentType()));
}
private static JournalImpl assertJournalImpl(final Journal journalIf) throws
HornetQException
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-04
15:26:34 UTC (rev 11119)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-04
15:28:12 UTC (rev 11120)
@@ -14,9 +14,7 @@
package org.hornetq.core.journal.impl;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -65,8 +63,6 @@
private final BlockingQueue<JournalFile> openedFiles = new
LinkedBlockingQueue<JournalFile>();
- private Map<Long, JournalFile> filesReservedForSync;
-
private final AtomicLong nextFileID = new AtomicLong(0);
private final int maxAIO;
@@ -447,14 +443,9 @@
return nextFile;
}
- public void createRemoteBackupSyncFile(long fileID) throws Exception
+ public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception
{
- if (filesReservedForSync == null)
- {
- filesReservedForSync = new HashMap<Long, JournalFile>();
- }
- assert !filesReservedForSync.containsKey(Long.valueOf(fileID));
- filesReservedForSync.put(Long.valueOf(fileID), createFile(false, false, false,
false, fileID));
+ return createFile(false, false, false, false, fileID);
}
// Package protected ---------------------------------------------
@@ -477,10 +468,8 @@
{
long fileID = fileIdPreSet != -1 ? fileIdPreSet : generateFileID();
- String fileName;
+ final String fileName = createFileName(tmpCompact, fileID);
- fileName = createFileName(tmpCompact, fileID);
-
if (JournalFilesRepository.trace)
{
JournalFilesRepository.trace("Creating file " + fileName);
@@ -583,23 +572,4 @@
return jf;
}
-
- /**
- * @param id
- * @return
- */
- public JournalFile getRemoteBackupSyncFile(long id)
- {
- return filesReservedForSync.get(Long.valueOf(id));
- }
-
- public Collection<? extends JournalFile> getSyncFiles()
- {
- return filesReservedForSync.values();
- }
-
- public void clearSyncFiles()
- {
- filesReservedForSync.clear();
- }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-04
15:26:34 UTC (rev 11119)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-04
15:28:12 UTC (rev 11120)
@@ -3199,7 +3199,7 @@
}
- private static class JournalFileComparator implements Comparator<JournalFile>
+ public static class JournalFileComparator implements Comparator<JournalFile>
{
public int compare(final JournalFile f1, final JournalFile f2)
{
@@ -3273,7 +3273,7 @@
* @param fileIds
* @throws Exception
*/
- public void createFilesForRemoteSync(long[] fileIds) throws Exception
+ public void createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map)
throws Exception
{
writeLock();
try
@@ -3283,7 +3283,7 @@
for (long id : fileIds)
{
maxID = Math.max(maxID, id);
- filesRepository.createRemoteBackupSyncFile(id);
+ map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
}
if (maxID > 0)
{
@@ -3295,42 +3295,4 @@
writeUnlock();
}
}
-
- /**
- * @param id
- * @return
- */
- public JournalFile getRemoteBackupSyncFile(long id)
- {
- return filesRepository.getRemoteBackupSyncFile(id);
- }
-
- /**
- *
- */
- public void finishRemoteBackupSync()
- {
- writeLock();
- try
- {
- lockAppend.lock();
- List<JournalFile> dataFilesToProcess = new
ArrayList<JournalFile>();
- dataFilesToProcess.addAll(filesRepository.getDataFiles());
- filesRepository.clearDataFiles();
- dataFilesToProcess.addAll(filesRepository.getSyncFiles());
- filesRepository.clearSyncFiles();
- Collections.sort(dataFilesToProcess, new JournalFileComparator());
- for (JournalFile file : dataFilesToProcess)
- {
- filesRepository.addDataFileOnTop(file);
- }
- // XXX HORNETQ-720 still missing a "reload" call?
- }
- finally
- {
- lockAppend.unlock();
- writeUnlock();
- }
-
- }
}
Added:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-04
15:28:12 UTC (rev 11120)
@@ -0,0 +1,29 @@
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+
+/**
+ * Journal used at a replicating backup server during the synchronization of data with
the 'live'
+ * server.
+ */
+public class ReplicatingJournal extends JournalImpl
+{
+
+ /**
+ * @param fileSize
+ * @param minFiles
+ * @param compactMinFiles
+ * @param compactPercentage
+ * @param fileFactory
+ * @param filePrefix
+ * @param fileExtension
+ * @param maxAIO
+ */
+ public ReplicatingJournal(int fileSize, int minFiles, int compactMinFiles, int
compactPercentage,
+ SequentialFileFactory fileFactory, String filePrefix, String
fileExtension, int maxAIO)
+ {
+ super(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory,
filePrefix, fileExtension, maxAIO);
+ }
+
+
+}