[jboss-cvs] JBoss Messaging SVN: r7518 - trunk/src/main/org/jboss/messaging/core/journal/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 2 19:00:15 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-07-02 19:00:14 -0400 (Thu, 02 Jul 2009)
New Revision: 7518
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
Just formatting.. no code changes at this commit.
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-02 22:02:47 UTC (rev 7517)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-02 23:00:14 UTC (rev 7518)
@@ -207,7 +207,7 @@
// This will be set only while the JournalCompactor is being executed
private volatile JournalCompactor compactor;
-
+
// Latch used to wait compactor finish, to make sure we won't stop the journal with the compactor running
private final VariableLatch compactorWait = new VariableLatch();
@@ -298,6 +298,180 @@
// Public methods (used by package members such as JournalCompactor) (these methods are not part of the JournalImpl
// interface)
+ /**
+ * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
+ * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
+ * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
+ * <p>The element-summary will then have</p>
+ * <p>FileID1, 10</p>
+ * <p>FileID2, 10</p>
+ * <p>FileID3, 10</p>
+ *
+ * <br>
+ * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
+ * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
+ * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
+ * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
+ *
+ * @param recordType
+ * @param txID
+ * @param tx
+ * @param transactionData
+ * @return
+ * @throws Exception
+ */
+ public static void writeTransaction(final int fileID,
+ final byte recordType,
+ final long txID,
+ final JournalTransaction tx,
+ final EncodingSupport transactionData,
+ final int size,
+ final int numberOfRecords,
+ final ChannelBuffer bb) throws Exception
+ {
+ bb.writeByte(recordType);
+ bb.writeInt(fileID); // skip ID part
+ bb.writeLong(txID);
+ bb.writeInt(numberOfRecords); // skip number of pendingTransactions part
+
+ if (transactionData != null)
+ {
+ bb.writeInt(transactionData.getEncodeSize());
+ }
+
+ if (transactionData != null)
+ {
+ transactionData.encode(bb);
+ }
+
+ bb.writeInt(size);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ * @param bb
+ */
+ public static void writeUpdateRecordTX(final int fileID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final int size,
+ final ChannelBuffer bb)
+ {
+ bb.writeByte(UPDATE_RECORD_TX);
+ bb.writeInt(fileID);
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ * @param bb
+ */
+ public static void writeUpdateRecord(final int fileId,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final int size,
+ final ChannelBuffer bb)
+ {
+ bb.writeByte(UPDATE_RECORD);
+ bb.writeInt(fileId);
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ * @param bb
+ */
+ public static void writeAddRecord(final int fileId,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final int size,
+ final ChannelBuffer bb)
+ {
+ bb.writeByte(ADD_RECORD);
+ bb.writeInt(fileId);
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param record
+ * @param size
+ * @param bb
+ */
+ public static void writeDeleteRecordTransactional(final int fileID,
+ final long txID,
+ final long id,
+ final EncodingSupport record,
+ final int size,
+ final ChannelBuffer bb)
+ {
+ bb.writeByte(DELETE_RECORD_TX);
+ bb.writeInt(fileID);
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(record != null ? record.getEncodeSize() : 0);
+ if (record != null)
+ {
+ record.encode(bb);
+ }
+ bb.writeInt(size);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @param recordLength
+ * @param size
+ * @param bb
+ */
+ public static void writeAddRecordTX(final int fileID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final int size,
+ final ChannelBuffer bb)
+ {
+ bb.writeByte(ADD_RECORD_TX);
+ bb.writeInt(fileID);
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
+ }
+
public Map<Long, JournalRecord> getRecords()
{
return records;
@@ -310,9 +484,334 @@
public JournalCompactor getCompactor()
{
- return this.compactor;
+ return compactor;
}
+ public static int readJournalFile(final SequentialFileFactory fileFactory,
+ final JournalFile file,
+ final JournalReaderCallback reader) throws Exception
+ {
+
+ file.getFile().open(1);
+ ByteBuffer wholeFileBuffer = null;
+ try
+ {
+
+ wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
+
+ final int journalFileSize = file.getFile().read(wholeFileBuffer);
+
+ if (journalFileSize != file.getFile().size())
+ {
+ throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
+ }
+
+ wholeFileBuffer.position(0);
+
+ // First long is the ordering timestamp, we just jump its position
+ wholeFileBuffer.position(SIZE_HEADER);
+
+ int lastDataPos = SIZE_HEADER;
+
+ while (wholeFileBuffer.hasRemaining())
+ {
+ final int pos = wholeFileBuffer.position();
+
+ byte recordType = wholeFileBuffer.get();
+
+ if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+ {
+ // I - We scan for any valid record on the file. If a hole
+ // happened on the middle of the file we keep looking until all
+ // the possibilities are gone
+ continue;
+ }
+
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+ {
+ reader.markAsDataFile(file);
+
+ wholeFileBuffer.position(pos + 1);
+ // II - Ignore this record, lets keep looking
+ continue;
+ }
+
+ // III - Every record has the file-id.
+ // This is what supports us from not re-filling the whole file
+ int readFileId = wholeFileBuffer.getInt();
+
+ long transactionID = 0;
+
+ if (isTransaction(recordType))
+ {
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
+
+ transactionID = wholeFileBuffer.getLong();
+ }
+
+ long recordID = 0;
+
+ // If prepare or commit
+ if (!isCompleteTransaction(recordType))
+ {
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
+
+ recordID = wholeFileBuffer.getLong();
+ }
+
+ // We use the size of the record to validate the health of the
+ // record.
+ // (V) We verify the size of the record
+
+ // The variable record portion used on Updates and Appends
+ int variableSize = 0;
+
+ // Used to hold extra data on transaction prepares
+ int preparedTransactionExtraDataSize = 0;
+
+ byte userRecordType = 0;
+
+ byte record[] = null;
+
+ if (isContainsBody(recordType))
+ {
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
+
+ variableSize = wholeFileBuffer.getInt();
+
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
+ {
+ wholeFileBuffer.position(pos + 1);
+ continue;
+ }
+
+ if (recordType != DELETE_RECORD_TX)
+ {
+ userRecordType = wholeFileBuffer.get();
+ }
+
+ record = new byte[variableSize];
+
+ wholeFileBuffer.get(record);
+ }
+
+ // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
+ // currentFile
+ int transactionCheckNumberOfRecords = 0;
+
+ if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+ {
+ if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+ {
+ wholeFileBuffer.position(pos + 1);
+ continue;
+ }
+
+ transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
+
+ if (recordType == PREPARE_RECORD)
+ {
+ // Add the variable size required for preparedTransactions
+ preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
+ }
+ variableSize = 0;
+ }
+
+ int recordSize = getRecordSize(recordType);
+
+ // VI - this is completing V, We will validate the size at the end
+ // of the record,
+ // But we avoid buffer overflows by damaged data
+ if (isInvalidSize(journalFileSize, pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+ {
+ // Avoid a buffer overflow caused by damaged data... continue
+ // scanning for more pendingTransactions...
+ trace("Record at position " + pos +
+ " recordType = " +
+ recordType +
+ " file:" +
+ file.getFile().getFileName() +
+ " recordSize: " +
+ recordSize +
+ " variableSize: " +
+ variableSize +
+ " preparedTransactionExtraDataSize: " +
+ preparedTransactionExtraDataSize +
+ " is corrupted and it is being ignored (II)");
+ // If a file has damaged pendingTransactions, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
+ wholeFileBuffer.position(pos + 1);
+
+ continue;
+ }
+
+ int oldPos = wholeFileBuffer.position();
+
+ wholeFileBuffer.position(pos + variableSize +
+ recordSize +
+ preparedTransactionExtraDataSize -
+ DataConstants.SIZE_INT);
+
+ int checkSize = wholeFileBuffer.getInt();
+
+ // VII - The checkSize at the end has to match with the size
+ // informed at the beggining.
+ // This is like testing a hash for the record. (We could replace the
+ // checkSize by some sort of calculated hash)
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ {
+ trace("Record at position " + pos +
+ " recordType = " +
+ recordType +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored (III)");
+
+ // If a file has damaged pendingTransactions, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
+
+ wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);
+
+ continue;
+ }
+
+ // This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+ if (readFileId != file.getFileID())
+ {
+ // If a file has damaged pendingTransactions, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
+
+ continue;
+ }
+
+ wholeFileBuffer.position(oldPos);
+
+ // At this point everything is checked. So we relax and just load
+ // the data now.
+
+ switch (recordType)
+ {
+ case ADD_RECORD:
+ {
+ reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
+
+ case UPDATE_RECORD:
+ {
+ reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ break;
+ }
+
+ case DELETE_RECORD:
+ {
+ reader.onReadDeleteRecord(recordID);
+ break;
+ }
+
+ case ADD_RECORD_TX:
+ {
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
+
+ case UPDATE_RECORD_TX:
+ {
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+ break;
+ }
+
+ case DELETE_RECORD_TX:
+ {
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+ break;
+ }
+
+ case PREPARE_RECORD:
+ {
+
+ byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+ wholeFileBuffer.get(extraData);
+
+ reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+
+ break;
+ }
+ case COMMIT_RECORD:
+ {
+
+ reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
+ break;
+ }
+ case ROLLBACK_RECORD:
+ {
+ reader.onReadRollbackRecord(transactionID);
+ break;
+ }
+ default:
+ {
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " +
+ recordType);
+ }
+ }
+
+ checkSize = wholeFileBuffer.getInt();
+
+ // This is a sanity check about the loading code itself.
+ // If this checkSize doesn't match, it means the reading method is
+ // not doing what it was supposed to do
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ {
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
+ ", pos = " +
+ pos);
+ }
+
+ lastDataPos = wholeFileBuffer.position();
+
+ }
+
+ return lastDataPos;
+ }
+
+ finally
+ {
+ if (wholeFileBuffer != null)
+ {
+ fileFactory.releaseBuffer(wholeFileBuffer);
+ }
+
+ try
+ {
+ file.getFile().close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
// Journal implementation
// ----------------------------------------------------------------
@@ -825,6 +1324,11 @@
}
+ public int getAlignment() throws Exception
+ {
+ return fileFactory.getAlignment();
+ }
+
/**
* @see JournalImpl#load(LoaderCallback)
*/
@@ -836,22 +1340,22 @@
long maxID = load(new LoaderCallback()
{
- public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
{
preparedTransactions.add(preparedTransaction);
}
- public void addRecord(RecordInfo info)
+ public void addRecord(final RecordInfo info)
{
records.add(info);
}
- public void updateRecord(RecordInfo info)
+ public void updateRecord(final RecordInfo info)
{
records.add(info);
}
- public void deleteRecord(long id)
+ public void deleteRecord(final long id)
{
recordsToDelete.add(id);
}
@@ -903,10 +1407,7 @@
dataFiles.clear();
- this.compactor = new JournalCompactor(fileFactory,
- this,
- this.records.keySet(),
- dataFilesToProcess.get(0).getFileID());
+ compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
@@ -916,7 +1417,7 @@
// We will calculate the new records during compacting, what will take the position the records will take
// after compacting
- this.records.clear();
+ records.clear();
}
finally
{
@@ -961,7 +1462,7 @@
try
{
// Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
- this.compactor = null;
+ compactor = null;
newDatafiles = localCompactor.getNewDataFiles();
@@ -996,7 +1497,7 @@
{
trace("Merging pending transaction " + newTransaction + " after compacting to the journal");
}
- JournalTransaction liveTransaction = this.transactions.get(newTransaction.getId());
+ JournalTransaction liveTransaction = transactions.get(newTransaction.getId());
if (liveTransaction == null)
{
log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() +
@@ -1038,56 +1539,6 @@
}
- protected void deleteControlFile(SequentialFile controlFile) throws Exception
- {
- controlFile.delete();
- }
-
- /** being protected as testcases can override this method */
- protected void renameFiles(List<JournalFile> oldFiles, List<JournalFile> newFiles) throws Exception
- {
- for (JournalFile file : oldFiles)
- {
- dataFiles.remove(file);
- freeFiles.add(reinitializeFile(file));
- }
-
- for (JournalFile file : newFiles)
- {
- String newName = file.getFile().getFileName();
- newName = newName.substring(0, newName.lastIndexOf(".cmp"));
- file.getFile().renameTo(newName);
- }
-
- }
-
- /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
- protected void onCompactDone()
- {
- }
-
- /**
- * @throws Exception
- */
- protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
- {
- return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
- }
-
- private static boolean isInvalidSize(int fileSize, int bufferPos, int size)
- {
- if (size < 0)
- {
- return true;
- }
- else
- {
- final int position = bufferPos + size;
- return position > fileSize || position < 0;
-
- }
- }
-
/**
* <p>Load data accordingly to the record layouts</p>
*
@@ -1149,7 +1600,7 @@
int resultLastPost = readJournalFile(fileFactory, file, new JournalReaderCallback()
{
- public void onReadAddRecord(RecordInfo info) throws Exception
+ public void onReadAddRecord(final RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1162,7 +1613,7 @@
records.put(info.id, new JournalRecord(file, info.data.length + SIZE_ADD_RECORD));
}
- public void onReadUpdateRecord(RecordInfo info) throws Exception
+ public void onReadUpdateRecord(final RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1184,7 +1635,7 @@
}
}
- public void onReadDeleteRecord(long recordID) throws Exception
+ public void onReadDeleteRecord(final long recordID) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1202,12 +1653,12 @@
}
}
- public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
onReadAddRecordTX(transactionID, info);
}
- public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
@@ -1240,7 +1691,7 @@
tnp.addPositive(file, info.id, info.data.length + SIZE_ADD_RECORD_TX);
}
- public void onReadDeleteRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1273,7 +1724,7 @@
}
- public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1318,7 +1769,7 @@
}
}
- public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1379,7 +1830,7 @@
}
- public void onReadRollbackRecord(long transactionID) throws Exception
+ public void onReadRollbackRecord(final long transactionID) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1409,7 +1860,7 @@
}
}
- public void markAsDataFile(JournalFile file)
+ public void markAsDataFile(final JournalFile file)
{
if (trace && LOAD_TRACE)
{
@@ -1516,67 +1967,42 @@
return maxID;
}
- /**
- * @return
- * @throws Exception
- */
- private void checkControlFile() throws Exception
+ protected void deleteControlFile(final SequentialFile controlFile) throws Exception
{
- ArrayList<String> dataFiles = new ArrayList<String>();
- ArrayList<String> newFiles = new ArrayList<String>();
+ controlFile.delete();
+ }
- SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles);
- if (controlFile != null)
+ /** being protected as testcases can override this method */
+ protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
+ {
+ for (JournalFile file : oldFiles)
{
- log.info("Journal Compactor was interrupted during renaming phase, renaming files");
-
- for (String dataFile : dataFiles)
- {
- SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
- log.info("Removing old compacted file" + file.getFileName());
- if (file.exists())
- {
- file.delete();
- }
- }
-
- for (String newFile : newFiles)
- {
- SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
- log.info("Renaming file " + file.getFileName() + " as an part of the data files");
- if (file.exists())
- {
- final String originalName = file.getFileName();
- final String newName = originalName.substring(0, originalName.lastIndexOf(".cmp"));
- file.renameTo(newName);
- }
- }
-
- controlFile.delete();
+ dataFiles.remove(file);
+ freeFiles.add(reinitializeFile(file));
}
- List<String> leftFiles = fileFactory.listFiles(this.getFileExtension() + ".cmp");
-
- if (leftFiles.size() > 0)
+ for (JournalFile file : newFiles)
{
- log.warn("Compacted files were left unnatended on journal directory, deleting invalid files now");
-
- for (String fileToDelete : leftFiles)
- {
- log.warn("Deleting unnatended file " + fileToDelete);
- SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1);
- file.delete();
- }
+ String newName = file.getFile().getFileName();
+ newName = newName.substring(0, newName.lastIndexOf(".cmp"));
+ file.getFile().renameTo(newName);
}
- return;
}
- public int getAlignment() throws Exception
+ /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
+ protected void onCompactDone()
{
- return fileFactory.getAlignment();
}
+ /**
+ * @throws Exception
+ */
+ protected SequentialFile createControlFile(final List<JournalFile> files, final List<JournalFile> newFiles) throws Exception
+ {
+ return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
+ }
+
// TestableJournal implementation
// --------------------------------------------------------------
@@ -1744,7 +2170,7 @@
totalLiveSize +
", margin to start compacting = " +
compactMargin);
-
+
compactorWait.up();
// We can't use the executor for the compacting... or we would lock files opening and creation (besides other
@@ -1885,7 +2311,7 @@
{
log.warn("Couldn't stop journal executor after 60 seconds");
}
-
+
while (!compactorWait.waitCompletion(60000))
{
log.warn("Waiting the compactor to finish its operations");
@@ -1965,329 +2391,6 @@
return jf;
}
- public static int readJournalFile(SequentialFileFactory fileFactory, JournalFile file, JournalReaderCallback reader) throws Exception
- {
-
- file.getFile().open(1);
- ByteBuffer wholeFileBuffer = null;
- try
- {
-
- wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
-
- final int journalFileSize = file.getFile().read(wholeFileBuffer);
-
- if (journalFileSize != file.getFile().size())
- {
- throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
- }
-
- wholeFileBuffer.position(0);
-
- // First long is the ordering timestamp, we just jump its position
- wholeFileBuffer.position(SIZE_HEADER);
-
- int lastDataPos = SIZE_HEADER;
-
- while (wholeFileBuffer.hasRemaining())
- {
- final int pos = wholeFileBuffer.position();
-
- byte recordType = wholeFileBuffer.get();
-
- if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
- {
- // I - We scan for any valid record on the file. If a hole
- // happened on the middle of the file we keep looking until all
- // the possibilities are gone
- continue;
- }
-
- if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
- {
- reader.markAsDataFile(file);
-
- wholeFileBuffer.position(pos + 1);
- // II - Ignore this record, lets keep looking
- continue;
- }
-
- // III - Every record has the file-id.
- // This is what supports us from not re-filling the whole file
- int readFileId = wholeFileBuffer.getInt();
-
- long transactionID = 0;
-
- if (isTransaction(recordType))
- {
- if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
- {
- wholeFileBuffer.position(pos + 1);
- reader.markAsDataFile(file);
- continue;
- }
-
- transactionID = wholeFileBuffer.getLong();
- }
-
- long recordID = 0;
-
- // If prepare or commit
- if (!isCompleteTransaction(recordType))
- {
- if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
- {
- wholeFileBuffer.position(pos + 1);
- reader.markAsDataFile(file);
- continue;
- }
-
- recordID = wholeFileBuffer.getLong();
- }
-
- // We use the size of the record to validate the health of the
- // record.
- // (V) We verify the size of the record
-
- // The variable record portion used on Updates and Appends
- int variableSize = 0;
-
- // Used to hold extra data on transaction prepares
- int preparedTransactionExtraDataSize = 0;
-
- byte userRecordType = 0;
-
- byte record[] = null;
-
- if (isContainsBody(recordType))
- {
- if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
- {
- wholeFileBuffer.position(pos + 1);
- reader.markAsDataFile(file);
- continue;
- }
-
- variableSize = wholeFileBuffer.getInt();
-
- if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
- {
- wholeFileBuffer.position(pos + 1);
- continue;
- }
-
- if (recordType != DELETE_RECORD_TX)
- {
- userRecordType = wholeFileBuffer.get();
- }
-
- record = new byte[variableSize];
-
- wholeFileBuffer.get(record);
- }
-
- // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
- // currentFile
- int transactionCheckNumberOfRecords = 0;
-
- if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
- {
- if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
- {
- wholeFileBuffer.position(pos + 1);
- continue;
- }
-
- transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
-
- if (recordType == PREPARE_RECORD)
- {
- // Add the variable size required for preparedTransactions
- preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
- }
- variableSize = 0;
- }
-
- int recordSize = getRecordSize(recordType);
-
- // VI - this is completing V, We will validate the size at the end
- // of the record,
- // But we avoid buffer overflows by damaged data
- if (isInvalidSize(journalFileSize, pos, recordSize + variableSize + preparedTransactionExtraDataSize))
- {
- // Avoid a buffer overflow caused by damaged data... continue
- // scanning for more pendingTransactions...
- trace("Record at position " + pos +
- " recordType = " +
- recordType +
- " file:" +
- file.getFile().getFileName() +
- " recordSize: " +
- recordSize +
- " variableSize: " +
- variableSize +
- " preparedTransactionExtraDataSize: " +
- preparedTransactionExtraDataSize +
- " is corrupted and it is being ignored (II)");
- // If a file has damaged pendingTransactions, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile(file);
- wholeFileBuffer.position(pos + 1);
-
- continue;
- }
-
- int oldPos = wholeFileBuffer.position();
-
- wholeFileBuffer.position(pos + variableSize +
- recordSize +
- preparedTransactionExtraDataSize -
- DataConstants.SIZE_INT);
-
- int checkSize = wholeFileBuffer.getInt();
-
- // VII - The checkSize at the end has to match with the size
- // informed at the beggining.
- // This is like testing a hash for the record. (We could replace the
- // checkSize by some sort of calculated hash)
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- trace("Record at position " + pos +
- " recordType = " +
- recordType +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored (III)");
-
- // If a file has damaged pendingTransactions, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile(file);
-
- wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);
-
- continue;
- }
-
- // This record is from a previous file-usage. The file was
- // reused and we need to ignore this record
- if (readFileId != file.getFileID())
- {
- // If a file has damaged pendingTransactions, we make it a dataFile, and the
- // next reclaiming will fix it
- reader.markAsDataFile(file);
-
- continue;
- }
-
- wholeFileBuffer.position(oldPos);
-
- // At this point everything is checked. So we relax and just load
- // the data now.
-
- switch (recordType)
- {
- case ADD_RECORD:
- {
- reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
- break;
- }
-
- case UPDATE_RECORD:
- {
- reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
- break;
- }
-
- case DELETE_RECORD:
- {
- reader.onReadDeleteRecord(recordID);
- break;
- }
-
- case ADD_RECORD_TX:
- {
- reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
- break;
- }
-
- case UPDATE_RECORD_TX:
- {
- reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
- break;
- }
-
- case DELETE_RECORD_TX:
- {
- reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
- break;
- }
-
- case PREPARE_RECORD:
- {
-
- byte extraData[] = new byte[preparedTransactionExtraDataSize];
-
- wholeFileBuffer.get(extraData);
-
- reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
-
- break;
- }
- case COMMIT_RECORD:
- {
-
- reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
- break;
- }
- case ROLLBACK_RECORD:
- {
- reader.onReadRollbackRecord(transactionID);
- break;
- }
- default:
- {
- throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " +
- recordType);
- }
- }
-
- checkSize = wholeFileBuffer.getInt();
-
- // This is a sanity check about the loading code itself.
- // If this checkSize doesn't match, it means the reading method is
- // not doing what it was supposed to do
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
- ", pos = " +
- pos);
- }
-
- lastDataPos = wholeFileBuffer.position();
-
- }
-
- return lastDataPos;
- }
-
- finally
- {
- if (wholeFileBuffer != null)
- {
- fileFactory.releaseBuffer(wholeFileBuffer);
- }
-
- try
- {
- file.getFile().close();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
/**
* <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
@@ -2307,180 +2410,6 @@
return journalTransaction.getCounter(currentFile) == numberOfRecords;
}
- /**
- * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
- * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
- * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
- * <p>The element-summary will then have</p>
- * <p>FileID1, 10</p>
- * <p>FileID2, 10</p>
- * <p>FileID3, 10</p>
- *
- * <br>
- * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
- * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
- * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
- * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
- *
- * @param recordType
- * @param txID
- * @param tx
- * @param transactionData
- * @return
- * @throws Exception
- */
- public static void writeTransaction(final int fileID,
- final byte recordType,
- final long txID,
- final JournalTransaction tx,
- final EncodingSupport transactionData,
- final int size,
- final int numberOfRecords,
- final ChannelBuffer bb) throws Exception
- {
- bb.writeByte(recordType);
- bb.writeInt(fileID); // skip ID part
- bb.writeLong(txID);
- bb.writeInt(numberOfRecords); // skip number of pendingTransactions part
-
- if (transactionData != null)
- {
- bb.writeInt(transactionData.getEncodeSize());
- }
-
- if (transactionData != null)
- {
- transactionData.encode(bb);
- }
-
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeUpdateRecordTX(final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
- {
- bb.writeByte(UPDATE_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeUpdateRecord(final int fileId,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
- {
- bb.writeByte(UPDATE_RECORD);
- bb.writeInt(fileId);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeAddRecord(final int fileId,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
- {
- bb.writeByte(ADD_RECORD);
- bb.writeInt(fileId);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param record
- * @param size
- * @param bb
- */
- public static void writeDeleteRecordTransactional(final int fileID,
- final long txID,
- final long id,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
- {
- bb.writeByte(DELETE_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record != null ? record.getEncodeSize() : 0);
- if (record != null)
- {
- record.encode(bb);
- }
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @param recordLength
- * @param size
- * @param bb
- */
- public static void writeAddRecordTX(final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- int size,
- ChannelBuffer bb)
- {
- bb.writeByte(ADD_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
private static boolean isTransaction(final byte recordType)
{
return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX ||
@@ -2860,7 +2789,7 @@
* @return
* @throws Exception
*/
- JournalFile getFile(boolean keepOpened, boolean multiAIO, boolean fill) throws Exception
+ JournalFile getFile(final boolean keepOpened, final boolean multiAIO, final boolean fill) throws Exception
{
JournalFile nextOpenedFile = null;
try
@@ -2925,7 +2854,7 @@
return tx;
}
- private IOCallback getSyncCallback(boolean sync)
+ private IOCallback getSyncCallback(final boolean sync)
{
if (fileFactory.isSupportsCallbacks())
{
@@ -2944,8 +2873,78 @@
}
}
- public ChannelBuffer newBuffer(final int size)
+ /**
+ * @return
+ * @throws Exception
+ */
+ private void checkControlFile() throws Exception
{
+ ArrayList<String> dataFiles = new ArrayList<String>();
+ ArrayList<String> newFiles = new ArrayList<String>();
+
+ SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles);
+ if (controlFile != null)
+ {
+ log.info("Journal Compactor was interrupted during renaming phase, renaming files");
+
+ for (String dataFile : dataFiles)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
+ log.info("Removing old compacted file" + file.getFileName());
+ if (file.exists())
+ {
+ file.delete();
+ }
+ }
+
+ for (String newFile : newFiles)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
+ log.info("Renaming file " + file.getFileName() + " as an part of the data files");
+ if (file.exists())
+ {
+ final String originalName = file.getFileName();
+ final String newName = originalName.substring(0, originalName.lastIndexOf(".cmp"));
+ file.renameTo(newName);
+ }
+ }
+
+ controlFile.delete();
+ }
+
+ List<String> leftFiles = fileFactory.listFiles(getFileExtension() + ".cmp");
+
+ if (leftFiles.size() > 0)
+ {
+ log.warn("Compacted files were left unnatended on journal directory, deleting invalid files now");
+
+ for (String fileToDelete : leftFiles)
+ {
+ log.warn("Deleting unnatended file " + fileToDelete);
+ SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1);
+ file.delete();
+ }
+ }
+
+ return;
+ }
+
+ private static boolean isInvalidSize(final int fileSize, final int bufferPos, final int size)
+ {
+ if (size < 0)
+ {
+ return true;
+ }
+ else
+ {
+ final int position = bufferPos + size;
+ return position > fileSize || position < 0;
+
+ }
+ }
+
+ private ChannelBuffer newBuffer(final int size)
+ {
return ChannelBuffers.buffer(size);
}
@@ -3031,11 +3030,11 @@
static NullEncoding instance = new NullEncoding();
- public void decode(MessagingBuffer buffer)
+ public void decode(final MessagingBuffer buffer)
{
}
- public void encode(MessagingBuffer buffer)
+ public void encode(final MessagingBuffer buffer)
{
}
More information about the jboss-cvs-commits
mailing list