Author: borges
Date: 2011-08-04 11:30:21 -0400 (Thu, 04 Aug 2011)
New Revision: 11122
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Log:
HORNETQ-720 Avoid record counting in the journal during replication sync
- add new journal state "SYNCING", as the Journal cannot be considered
"LOADED" while a replicating backup is still not synchronized with its live.
- rename ReplicationFileIdMessage to ReplicationStartSyncMessage (and
reserveFileIds() to sendStartSyncMessage()) as it does more than fileID
reservation
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -447,7 +447,7 @@
{
jf.setCanReclaim(false);
}
- replicator.reserveFileIds(datafiles, contentType);
+ replicator.sendStartSyncMessage(datafiles, contentType);
return datafiles;
}
@@ -1674,7 +1674,6 @@
JournalLoadInformation[] info = new JournalLoadInformation[2];
info[0] = bindingsJournal.loadInternalOnly();
info[1] = messageJournal.loadInternalOnly();
-
return info;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -106,7 +106,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -535,7 +535,7 @@
}
case PacketImpl.REPLICATION_FILE_ID:
{
- packet = new ReplicationFileIdMessage();
+ packet = new ReplicationStartSyncMessage();
break;
}
case PacketImpl.REPLICATION_SYNC:
Deleted:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -1,69 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Sends all fileIDs used in the live server to the backup. This is done so that we:
- * <ol>
- * <li>reserve those IDs in the backup;
- * <li>start replicating while the journal synchronization is taking place.
- * </ol>
- */
-public class ReplicationFileIdMessage extends PacketImpl
-{
-
- private long[] ids;
- private JournalContent journalType;
-
- public ReplicationFileIdMessage()
- {
- super(REPLICATION_FILE_ID);
- }
-
- public ReplicationFileIdMessage(JournalFile[] datafiles, JournalContent contentType)
- {
- this();
- ids = new long[datafiles.length];
- for (int i = 0; i < datafiles.length; i++)
- {
- ids[i] = datafiles[i].getFileID();
- }
- journalType = contentType;
- }
-
- @Override
- public void encodeRest(final HornetQBuffer buffer)
- {
- buffer.writeByte(journalType.typeByte);
- buffer.writeInt(ids.length);
- for (long id : ids)
- {
- buffer.writeLong(id);
- }
- }
-
- @Override
- public void decodeRest(final HornetQBuffer buffer)
- {
- journalType = JournalContent.getType(buffer.readByte());
- int length = buffer.readInt();
- ids = new long[length];
- for (int i = 0; i < length; i++)
- {
- ids[i] = buffer.readLong();
- }
- }
-
- public JournalContent getJournalContentType()
- {
- return journalType;
- }
-
- public long[] getFileIds()
- {
- return ids;
- }
-}
Copied:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
(from rev 11121,
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java)
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -0,0 +1,68 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Sends all fileIDs used in the live server to the backup. This is done so that we:
+ * <ol>
+ * <li>reserve those IDs in the backup;
+ * <li>start replicating while the journal synchronization is taking place.
+ * </ol>
+ */
+public class ReplicationStartSyncMessage extends PacketImpl
+{
+ private long[] ids;
+ private JournalContent journalType;
+
+ public ReplicationStartSyncMessage()
+ {
+ super(REPLICATION_FILE_ID);
+ }
+
+ public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent
contentType)
+ {
+ this();
+ ids = new long[datafiles.length];
+ for (int i = 0; i < datafiles.length; i++)
+ {
+ ids[i] = datafiles[i].getFileID();
+ }
+ journalType = contentType;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalType.typeByte);
+ buffer.writeInt(ids.length);
+ for (long id : ids)
+ {
+ buffer.writeLong(id);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ journalType = JournalContent.getType(buffer.readByte());
+ int length = buffer.readInt();
+ ids = new long[length];
+ for (int i = 0; i < length; i++)
+ {
+ ids[i] = buffer.readLong();
+ }
+ }
+
+ public JournalContent getJournalContentType()
+ {
+ return journalType;
+ }
+
+ public long[] getFileIds()
+ {
+ return ids;
+ }
+}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -99,7 +99,7 @@
* @param contentType
* @throws HornetQException
*/
- void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException;
+ void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException;
/**
* Informs backup that data synchronization is done.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -28,6 +28,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.ReplicatingJournal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
@@ -47,7 +48,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -56,6 +56,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
@@ -81,16 +82,18 @@
private Journal[] journals;
private JournalLoadInformation[] journalLoadInformation;
- // Files reserved in each journal for synchronization of existing data from the
'live' server
+
+ /** Files reserved in each journal for synchronization of existing data from the
'live' server. */
private final Map<JournalContent, Map<Long, JournalFile>>
filesReservedForSync =
new HashMap<JournalContent, Map<Long, JournalFile>>();
+ /** Used to hold the real Journals before the backup is synchronized. */
+ private final Map<JournalContent, Journal> journalsHolder = new
HashMap<JournalContent, Journal>();
+
private JournalStorageManager storage;
private PagingManager pageManager;
-
-
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex =
new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
@@ -191,7 +194,7 @@
}
else if (type == PacketImpl.REPLICATION_FILE_ID)
{
- handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
+ handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
}
else if (type == PacketImpl.REPLICATION_SYNC)
{
@@ -236,11 +239,12 @@
server.getManagementService().setStorageManager(storage);
- registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
- registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
+ journalsHolder.put(JournalContent.BINDINGS, storage.getBindingsJournal());
+ journalsHolder.put(JournalContent.MESSAGES, storage.getMessageJournal());
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
+ System.out.println("State? " + journalsHolder.get(jc));
filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
}
@@ -257,7 +261,7 @@
pageManager.start();
- started = true;
+ started = true;
}
@@ -389,9 +393,9 @@
{
if (msg.isUpToDate())
{
- for (Journal journal2 : journals)
+ for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- JournalImpl journal = (JournalImpl)journal2;
+ JournalImpl journal = (JournalImpl)journalsHolder.get(jc);
journal.writeLock();
try
{
@@ -401,6 +405,7 @@
}
// files should be already in place.
filesReservedForSync.remove(msg.getJournalContent());
+ registerJournal(jc.typeByte, journalsHolder.get(jc));
// XXX HORNETQ-720 must reload journals
// XXX HORNETQ-720 must start using real journals
}
@@ -417,6 +422,7 @@
long id = msg.getFileId();
JournalFile journalFile =
filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
+
byte[] data = msg.getData();
if (data == null)
{
@@ -433,18 +439,27 @@
}
}
- private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet)
throws Exception
+ /**
+ * Reserves files (with the given fileID) in the specified journal, and places a
+ * {@link ReplicatingJournal} in place to store messages while synchronization is
going on.
+ * @param packet
+ * @throws Exception
+ */
+ private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage
packet) throws Exception
{
if (server.isRemoteBackupUpToDate())
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup
can not be up-to-date!");
}
- final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+ final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
JournalImpl journal = assertJournalImpl(journalIf);
- journal.createFilesForRemoteSync(packet.getFileIds(),
filesReservedForSync.get(packet.getJournalContentType()));
+ Map<Long, JournalFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
+ JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(),
mapToFill);
+ registerJournal(packet.getJournalContentType().typeByte, new
ReplicatingJournal(current));
}
+ // XXX HORNETQ-720 really need to do away with this once the method calls get stable.
private static JournalImpl assertJournalImpl(final Journal journalIf) throws
HornetQException
{
if (!(journalIf instanceof JournalImpl))
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -44,7 +44,6 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -52,6 +51,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.utils.ExecutorFactory;
@@ -509,7 +509,7 @@
public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
{
SequentialFile file = jf.getFile().copy();
- log.info("Replication: sending " + jf + " to backup. " +
file);
+ log.info("Replication: sending " + jf + " (size=" + file.size()
+ ") to backup. " + file);
if (!file.isOpen())
{
file.open(1, false);
@@ -531,9 +531,9 @@
}
@Override
- public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException
+ public void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
throws HornetQException
{
- sendReplicatePacket(new ReplicationFileIdMessage(datafiles, contentType));
+ sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType));
}
@Override
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -565,7 +565,7 @@
if (liveServerSessionFactory == null)
{
- // XXX
+ // XXX HORNETQ-720
throw new RuntimeException("Need to retry...");
}
CoreRemotingConnection liveConnection =
liveServerSessionFactory.getConnection();
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -77,7 +77,9 @@
private enum JournalState
{
- STOPPED, STARTED, LOADED;
+ STOPPED, STARTED,
+ /** When a replicating server is still not synchronized with its live. */
+ SYNCING, LOADED;
}
// Constants -----------------------------------------------------
@@ -100,7 +102,7 @@
// Journal
private static final void trace(final String message)
{
- JournalImpl.log.trace(message);
+ JournalImpl.log.info(message);
}
private static final void traceRecord(final String message)
@@ -304,6 +306,12 @@
this.userVersion = userVersion;
}
+ @Override
+ public String toString()
+ {
+ return super.toString() + " " + state;
+ }
+
public void runDirectJournalBlast() throws Exception
{
final int numIts = 100000000;
@@ -411,7 +419,6 @@
// since we can re-use dataFiles
Collections.sort(orderedFiles, new JournalFileComparator());
-
return orderedFiles;
}
@@ -1425,6 +1432,7 @@
}
}
+ // XXX make it protected?
public int getAlignment() throws Exception
{
return fileFactory.getAlignment();
@@ -1458,7 +1466,7 @@
}
};
- return this.load(dummyLoader);
+ return this.load(dummyLoader, true, true);
}
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
@@ -1765,9 +1773,9 @@
localCompactor.replayPendingCommands();
- // Merge transactions back after compacting
- // This has to be done after the replay pending commands, as we need to
delete committs that happened during
- // the compacting
+ // Merge transactions back after compacting.
+ // This has to be done after the replay pending commands, as we need to
delete commits
+ // that happened during the compacting
for (JournalTransaction newTransaction :
localCompactor.getNewTransactions().values())
{
@@ -1868,11 +1876,20 @@
return load(loadManager, true);
}
- public synchronized JournalLoadInformation load(final LoaderCallback loadManager,
boolean fixFailingTransactions) throws Exception
+ public JournalLoadInformation load(final LoaderCallback loadManager, boolean
fixFailingTransactions)
+ throws Exception
{
+ return load(loadManager, fixFailingTransactions, false);
+ }
+
+ private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
boolean fixFailingTransactions,
+ final boolean replicationSync) throws Exception
+ {
+
if (state != JournalState.STARTED)
{
- throw new IllegalStateException("Journal " + this + " must be in
started state, was " + state);
+ throw new IllegalStateException("Journal " + this + " must be in
" + JournalState.STARTED + " state, was " +
+ state);
}
checkControlFile();
@@ -2162,6 +2179,13 @@
}
}
+ if (replicationSync)
+ {
+ assert filesRepository.getDataFiles().isEmpty();
+ setJournalState(JournalState.SYNCING);
+ return new JournalLoadInformation(0, -1);
+ }
+
// Create any more files we need
filesRepository.ensureMinFiles();
@@ -3144,11 +3168,6 @@
}
}
- private HornetQBuffer newBuffer(final int size)
- {
- return HornetQBuffers.fixedBuffer(size);
- }
-
// Inner classes
// ---------------------------------------------------------------------------
@@ -3157,11 +3176,6 @@
private static NullEncoding instance = new NullEncoding();
- public static NullEncoding getInstance()
- {
- return NullEncoding.instance;
- }
-
public void decode(final HornetQBuffer buffer)
{
}
@@ -3271,9 +3285,10 @@
/**
* @param fileIds
+ * @return
* @throws Exception
*/
- public void createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map)
throws Exception
+ public JournalFile createFilesForRemoteSync(long[] fileIds, Map<Long,
JournalFile> map) throws Exception
{
writeLock();
try
@@ -3285,14 +3300,18 @@
maxID = Math.max(maxID, id);
map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
}
- if (maxID > 0)
- {
- filesRepository.setNextFileID(maxID);
- }
+ maxID += 1;
+ filesRepository.setNextFileID(maxID);
+ return filesRepository.createRemoteBackupSyncFile(maxID);
}
finally
{
writeUnlock();
}
}
+
+ public boolean getAutoReclaim()
+ {
+ return autoReclaim;
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-04
15:28:56 UTC (rev 11121)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-04
15:30:21 UTC (rev 11122)
@@ -1,29 +1,277 @@
package org.hornetq.core.journal.impl;
-import org.hornetq.core.journal.SequentialFileFactory;
+import java.util.List;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
+
/**
* Journal used at a replicating backup server during the synchronization of data with
the 'live'
* server.
+ * <p>
+ * Its main purpose is to store the data like a Journal would but without verifying
records.
*/
-public class ReplicatingJournal extends JournalImpl
+public class ReplicatingJournal implements Journal
{
+ private final JournalFile file;
+
/**
- * @param fileSize
- * @param minFiles
- * @param compactMinFiles
- * @param compactPercentage
- * @param fileFactory
- * @param filePrefix
- * @param fileExtension
- * @param maxAIO
+ * @param file
*/
- public ReplicatingJournal(int fileSize, int minFiles, int compactMinFiles, int
compactPercentage,
- SequentialFileFactory fileFactory, String filePrefix, String
fileExtension, int maxAIO)
+ public ReplicatingJournal(JournalFile file)
{
- super(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory,
filePrefix, fileExtension, maxAIO);
+ this.file = file;
}
+ @Override
+ public void start() throws Exception
+ {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void stop() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public boolean isStarted()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync)
throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback)
+
throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean
sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync)
throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void
+ appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync,
IOCompletion completionCallback)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendUpdateRecord(long id, byte recordType, EncodingSupport record,
boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendUpdateRecord(long id, byte recordType, EncodingSupport record,
boolean sync,
+ IOCompletion completionCallback) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecord(long id, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback)
throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[]
record) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendAddRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
byte[] record) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendUpdateRecordTransactional(long txID, long id, byte recordType,
EncodingSupport record)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws
Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport
record) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendCommitRecord(long txID, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws
Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void
+ appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean
lineUpContext) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean
sync, IOCompletion callback)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync)
throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void
+ appendPrepareRecord(long txID, byte[] transactionData, boolean sync,
IOCompletion callback)
+
throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendRollbackRecord(long txID, boolean sync) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback)
throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void lineUpContex(IOCompletion callback)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalLoadInformation load(List<RecordInfo> committedRecords,
+ List<PreparedTransactionInfo> preparedTransactions,
TransactionFailureCallback transactionFailure)
+ throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getAlignment() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getNumberOfRecords()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getUserVersion()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void perfBlast(int pages)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void runDirectJournalBlast() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
}