Author: borges
Date: 2011-08-09 06:27:51 -0400 (Tue, 09 Aug 2011)
New Revision: 11158
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Log:
HORNETQ-720 More support for replication during sync.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09 10:26:46 UTC (rev 11157)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09 10:27:51 UTC (rev 11158)
@@ -456,6 +456,7 @@
JournalImpl journal = assertJournalImpl(journalIf);
Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
+ current.getFile().open(1, false);
registerJournal(packet.getJournalContentType().typeByte,
new ReplicatingJournal(current, storage.hasCallbackSupport()));
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09 10:26:46 UTC (rev 11157)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09 10:27:51 UTC (rev 11158)
@@ -36,7 +36,7 @@
public ReplicatingJournal(JournalFile file, boolean hasCallbackSupport)
{
super(hasCallbackSupport);
- this.currentFile = file;
+ currentFile = file;
}
@Override
@@ -77,36 +77,39 @@
{
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
- if (callback != null)
- {
- callback.storeLineUp();
- }
+ writeRecord(addRecord, sync, callback);
- lockAppend.lock();
- try
- {
- appendRecord(addRecord, sync, callback);
- }
- finally
- {
- lockAppend.unlock();
- }
}
/**
* Write the record to the current file.
*/
- private void appendRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
+ private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
{
- encoder.setFileID(currentFile.getRecordID());
- if (callback != null)
+
+ lockAppend.lock();
+ try
{
- currentFile.getFile().write(encoder, sync, callback);
+ if (callback != null)
+ {
+ callback.storeLineUp();
+ }
+
+ encoder.setFileID(currentFile.getRecordID());
+
+ if (callback != null)
+ {
+ currentFile.getFile().write(encoder, sync, callback);
+ }
+ else
+ {
+ currentFile.getFile().write(encoder, sync);
+ }
}
- else
+ finally
{
- currentFile.getFile().write(encoder, sync);
+ lockAppend.unlock();
}
}
@@ -134,7 +137,8 @@
appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+ writeRecord(updateRecord, sync, callback);
}
@Override