[jboss-cvs] JBoss Messaging SVN: r7373 - in trunk: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 16 19:58:22 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-16 19:58:22 -0400 (Tue, 16 Jun 2009)
New Revision: 7373
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
JBMESSAGING-1504 - Simple refactoring only at this point. I need to be able to read files in two different scenarios (loading and compacting). This commit is just refactoring the load part into a reading method (loading should then use the readJournalFile method).
This shouldn't make any difference in behaviour.
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-16 23:38:44 UTC (rev 7372)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-16 23:58:22 UTC (rev 7373)
@@ -45,6 +45,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.buffers.ChannelBuffer;
@@ -279,7 +280,7 @@
bb.writeByte(recordType);
record.encode(bb);
bb.writeInt(size);
-
+
IOCallback callback = getSyncCallback(sync);
lock.acquire();
@@ -293,7 +294,7 @@
{
lock.release();
}
-
+
if (callback != null)
{
callback.waitCompletion();
@@ -331,9 +332,8 @@
record.encode(bb);
bb.writeInt(size);
-
IOCallback callback = getSyncCallback(sync);
-
+
lock.acquire();
try
{
@@ -345,7 +345,7 @@
{
lock.release();
}
-
+
if (callback != null)
{
callback.waitCompletion();
@@ -374,7 +374,7 @@
bb.writeInt(-1); // skip ID part
bb.writeLong(id);
bb.writeInt(size);
-
+
IOCallback callback = getSyncCallback(sync);
lock.acquire();
@@ -388,14 +388,17 @@
{
lock.release();
}
-
+
if (callback != null)
{
callback.waitCompletion();
}
}
- public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record,
+ public void appendAddRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final byte[] record,
final boolean sync) throws Exception
{
appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
@@ -491,13 +494,14 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record,
- final boolean sync) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record, final boolean sync) throws Exception
{
appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record), sync);
}
- public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record,
+ public void appendDeleteRecordTransactional(final long txID,
+ final long id,
+ final EncodingSupport record,
final boolean sync) throws Exception
{
if (state != STATE_LOADED)
@@ -535,8 +539,7 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id,
- final boolean sync) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -819,490 +822,300 @@
fileFactory.controlBuffersLifeCycle(false);
- Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+ final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
- List<JournalFile> orderedFiles = orderFiles();
+ final List<JournalFile> orderedFiles = orderFiles();
int lastDataPos = SIZE_HEADER;
long maxID = -1;
- for (JournalFile file : orderedFiles)
+ for (final JournalFile file : orderedFiles)
{
- ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
+ trace("Loading file " + file.getFile().getFileName());
- file.getFile().open(1);
+ final AtomicBoolean hasData = new AtomicBoolean(false);
- int bytesRead = file.getFile().read(wholeFileBuffer);
-
- if (bytesRead != fileSize)
+ int resultLastPost = readJournalFile(file, new JournalReader()
{
- // FIXME - We should extract everything we can from this file
- // and then we shouldn't ever reuse this file on reclaiming (instead
- // reclaim on different size files would aways throw the file away)
- // rather than throw ISE!
- // We don't want to leave the user with an unusable system
- throw new IllegalStateException("File is wrong size " + bytesRead +
- " expected " +
- fileSize +
- " : " +
- file.getFile().getFileName());
- }
- wholeFileBuffer.position(0);
+ public void addRecord(RecordInfo info) throws Exception
+ {
+ if (trace)
+ {
+ trace("AddRecord: " + info);
+ }
+ hasData.set(true);
- // First long is the ordering timestamp, we just jump its position
- wholeFileBuffer.position(SIZE_HEADER);
+ loadManager.addRecord(info);
- boolean hasData = false;
-
- while (wholeFileBuffer.hasRemaining())
- {
- final int pos = wholeFileBuffer.position();
-
- byte recordType = wholeFileBuffer.get();
-
- if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
- {
- // I - We scan for any valid record on the file. If a hole
- // happened on the middle of the file we keep looking until all
- // the possibilities are gone
- continue;
+ posFilesMap.put(info.id, new PosFiles(file));
+
+ PosFiles file = posFilesMap.get(info.id);
}
- if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+ public void updateRecord(RecordInfo info) throws Exception
{
- hasData = true;
- wholeFileBuffer.position(pos + 1);
- // II - Ignore this record, lets keep looking
- continue;
- }
-
- // III - Every record has the file-id.
- // This is what supports us from not re-filling the whole file
- int readFileId = wholeFileBuffer.getInt();
-
- long transactionID = 0;
-
- if (isTransaction(recordType))
- {
- if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+ if (trace)
{
- wholeFileBuffer.position(pos + 1);
- hasData = true;
- continue;
+ trace("UpdateRecord: " + info);
}
+ hasData.set(true);
- transactionID = wholeFileBuffer.getLong();
- }
+ loadManager.updateRecord(info);
- long recordID = 0;
+ PosFiles posFiles = posFilesMap.get(info.id);
- if (!isCompleteTransaction(recordType))
- {
- if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+ if (posFiles != null)
{
- wholeFileBuffer.position(pos + 1);
- hasData = true;
- continue;
+ // It's legal for this to be null. The file(s) with the may
+ // have been deleted
+ // just leaving some updates in this file
+
+ posFiles.addUpdateFile(file);
}
-
- recordID = wholeFileBuffer.getLong();
-
- maxID = Math.max(maxID, recordID);
}
- // We use the size of the record to validate the health of the
- // record.
- // (V) We verify the size of the record
-
- // The variable record portion used on Updates and Appends
- int variableSize = 0;
-
- // Used to hold extra data on transaction prepares
- int preparedTransactionExtraDataSize = 0;
-
- byte userRecordType = 0;
-
- byte record[] = null;
-
- if (isContainsBody(recordType))
+ public void deleteRecord(long recordID) throws Exception
{
- if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+ if (trace)
{
- wholeFileBuffer.position(pos + 1);
- hasData = true;
- continue;
+ trace("DeleteRecord: " + recordID);
}
+ hasData.set(true);
- variableSize = wholeFileBuffer.getInt();
+ loadManager.deleteRecord(recordID);
- if (isInvalidSize(wholeFileBuffer.position(), variableSize))
- {
- wholeFileBuffer.position(pos + 1);
- continue;
- }
+ PosFiles posFiles = posFilesMap.remove(recordID);
- if (recordType != DELETE_RECORD_TX)
+ if (posFiles != null)
{
- userRecordType = wholeFileBuffer.get();
+ posFiles.addDelete(file);
}
-
- record = new byte[variableSize];
-
- wholeFileBuffer.get(record);
}
- if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+ public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (recordType == PREPARE_RECORD)
- {
- // Add the variable size required for preparedTransactions
- preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
- }
- // Both commit and record contain the recordSummary, and this is
- // used to calculate the record-size on both record-types
- variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
+ addRecordTX(transactionID, info);
}
- int recordSize = getRecordSize(recordType);
-
- // VI - this is completing V, We will validate the size at the end
- // of the record,
- // But we avoid buffer overflows by damaged data
- if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+ public void addRecordTX(long transactionID, RecordInfo info) throws Exception
{
- // Avoid a buffer overflow caused by damaged data... continue
- // scanning for more records...
- log.debug("Record at position " + pos +
- " recordType = " + recordType +
- " file:" + file.getFile().getFileName() +
- " recordSize: " + recordSize +
- " variableSize: " + variableSize +
- " preparedTransactionExtraDataSize: " + preparedTransactionExtraDataSize +
- " is corrupted and it is being ignored (II)");
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- hasData = true;
- wholeFileBuffer.position(pos + 1);
- continue;
- }
+ if (trace)
+ {
+ trace((info.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + info + ", txid = " + transactionID);
+ }
- int oldPos = wholeFileBuffer.position();
+ hasData.set(true);
- wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+ TransactionHolder tx = transactions.get(transactionID);
- int checkSize = wholeFileBuffer.getInt();
+ if (tx == null)
+ {
+ tx = new TransactionHolder(transactionID);
- // VII - The checkSize at the end has to match with the size
- // informed at the beggining.
- // This is like testing a hash for the record. (We could replace the
- // checkSize by some sort of calculated hash)
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- log.debug("Record at position " + pos +
- " recordType = " +
- recordType +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored (III)");
+ transactions.put(transactionID, tx);
+ }
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- hasData = true;
+ tx.recordInfos.add(info);
- wholeFileBuffer.position(pos + SIZE_BYTE);
+ JournalTransaction tnp = transactionInfos.get(transactionID);
- continue;
- }
+ if (tnp == null)
+ {
+ tnp = new JournalTransaction();
- // This record is from a previous file-usage. The file was
- // reused and we need to ignore this record
- if (readFileId != file.getOrderingID())
- {
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- hasData = true;
+ transactionInfos.put(transactionID, tnp);
+ }
- continue;
+ tnp.addPositive(file, info.id);
}
- wholeFileBuffer.position(oldPos);
-
- // At this point everything is checked. So we relax and just load
- // the data now.
-
- switch (recordType)
+ public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
{
- case ADD_RECORD:
+ if (trace)
{
- loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+ trace("DeleteRecordTX: " + transactionID + " info = " + info);
+ }
- posFilesMap.put(recordID, new PosFiles(file));
+ hasData.set(true);
- hasData = true;
+ TransactionHolder tx = transactions.get(transactionID);
- break;
- }
- case UPDATE_RECORD:
+ if (tx == null)
{
- loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ tx = new TransactionHolder(transactionID);
- hasData = true;
+ transactions.put(transactionID, tx);
+ }
- PosFiles posFiles = posFilesMap.get(recordID);
+ tx.recordsToDelete.add(info);
- if (posFiles != null)
- {
- // It's legal for this to be null. The file(s) with the may
- // have been deleted
- // just leaving some updates in this file
+ JournalTransaction tnp = transactionInfos.get(transactionID);
- posFiles.addUpdateFile(file);
- }
+ if (tnp == null)
+ {
+ tnp = new JournalTransaction();
- break;
+ transactionInfos.put(transactionID, tnp);
}
- case DELETE_RECORD:
- {
- loadManager.deleteRecord(recordID);
- hasData = true;
+ tnp.addNegative(file, info.id);
- PosFiles posFiles = posFilesMap.remove(recordID);
+ }
- if (posFiles != null)
- {
- posFiles.addDelete(file);
- }
-
- break;
+ public void prepareRecord(long transactionID, byte[] extraData, Pair<Integer, Integer>[] summary) throws Exception
+ {
+ if (trace)
+ {
+ trace("prepareRecordTX: txid = " + transactionID);
}
- case ADD_RECORD_TX:
- case UPDATE_RECORD_TX:
- {
- TransactionHolder tx = transactions.get(transactionID);
- if (tx == null)
- {
- tx = new TransactionHolder(transactionID);
+ hasData.set(true);
- transactions.put(transactionID, tx);
- }
+ TransactionHolder tx = transactions.get(transactionID);
- tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType == UPDATE_RECORD_TX));
+ if (tx == null)
+ {
+ // The user could choose to prepare empty transactions
+ tx = new TransactionHolder(transactionID);
- JournalTransaction tnp = transactionInfos.get(transactionID);
+ transactions.put(transactionID, tx);
+ }
- if (tnp == null)
- {
- tnp = new JournalTransaction();
+ tx.prepared = true;
- transactionInfos.put(transactionID, tnp);
- }
+ tx.extraData = extraData;
- tnp.addPositive(file, recordID);
+ JournalTransaction journalTransaction = transactionInfos.get(transactionID);
- hasData = true;
+ if (journalTransaction == null)
+ {
+ journalTransaction = new JournalTransaction();
- break;
+ transactionInfos.put(transactionID, journalTransaction);
}
- case DELETE_RECORD_TX:
- {
- TransactionHolder tx = transactions.get(transactionID);
- if (tx == null)
- {
- tx = new TransactionHolder(transactionID);
+ boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, summary);
- transactions.put(transactionID, tx);
- }
-
- tx.recordsToDelete.add(new RecordInfo(recordID, (byte)0, record, true));
-
- JournalTransaction tnp = transactionInfos.get(transactionID);
-
- if (tnp == null)
- {
- tnp = new JournalTransaction();
-
- transactionInfos.put(transactionID, tnp);
- }
-
- tnp.addNegative(file, recordID);
-
- hasData = true;
-
- break;
+ if (healthy)
+ {
+ journalTransaction.prepare(file);
}
- case PREPARE_RECORD:
+ else
{
- TransactionHolder tx = transactions.get(transactionID);
+ log.warn("Prepared transaction " + transactionID + " wasn't considered completed, it will be ignored");
+ tx.invalid = true;
+ }
+ }
- if (tx == null)
- {
- // The user could choose to prepare empty transactions
- tx = new TransactionHolder(transactionID);
+ public void commitRecord(long transactionID, Pair<Integer, Integer>[] summary) throws Exception
+ {
+ if (trace)
+ {
+ trace("commitRecord: txid = " + transactionID);
+ }
- transactions.put(transactionID, tx);
- }
+ TransactionHolder tx = transactions.remove(transactionID);
- byte extraData[] = new byte[preparedTransactionExtraDataSize];
+ // The commit could be alone on its own journal-file and the
+ // whole transaction body was reclaimed but not the
+ // commit-record
+ // So it is completely legal to not find a transaction at this
+ // point
+ // If we can't find it, we assume the TX was reclaimed and we
+ // ignore this
+ if (tx != null)
+ {
+ JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
- wholeFileBuffer.get(extraData);
-
- // Pair <FileID, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize,
- wholeFileBuffer);
-
- tx.prepared = true;
-
- tx.extraData = extraData;
-
- JournalTransaction journalTransaction = transactionInfos.get(transactionID);
-
if (journalTransaction == null)
{
- journalTransaction = new JournalTransaction();
-
- transactionInfos.put(transactionID, journalTransaction);
+ throw new IllegalStateException("Cannot find tx " + transactionID);
}
- boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
+ boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, summary);
if (healthy)
{
- journalTransaction.prepare(file);
- }
- else
- {
- log.warn("Prepared transaction " + transactionID +
- " wasn't considered completed, it will be ignored");
- tx.invalid = true;
- }
-
- hasData = true;
-
- break;
- }
- case COMMIT_RECORD:
- {
- TransactionHolder tx = transactions.remove(transactionID);
-
- // We need to read it even if transaction was not found, or
- // the reading checks would fail
- // Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize,
- wholeFileBuffer);
-
- // The commit could be alone on its own journal-file and the
- // whole transaction body was reclaimed but not the
- // commit-record
- // So it is completely legal to not find a transaction at this
- // point
- // If we can't find it, we assume the TX was reclaimed and we
- // ignore this
- if (tx != null)
- {
- JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
-
- if (journalTransaction == null)
+ for (RecordInfo txRecord : tx.recordInfos)
{
- throw new IllegalStateException("Cannot find tx " + transactionID);
- }
-
- boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
-
- if (healthy)
- {
- for (RecordInfo txRecord : tx.recordInfos)
+ if (txRecord.isUpdate)
{
- if (txRecord.isUpdate)
- {
- loadManager.updateRecord(txRecord);
- }
- else
- {
- loadManager.addRecord(txRecord);
- }
+ loadManager.updateRecord(txRecord);
}
-
- for (RecordInfo deleteValue : tx.recordsToDelete)
+ else
{
- loadManager.deleteRecord(deleteValue.id);
+ loadManager.addRecord(txRecord);
}
+ }
- journalTransaction.commit(file);
- }
- else
+ for (RecordInfo deleteValue : tx.recordsToDelete)
{
- log.warn("Transaction " + transactionID +
- " is missing elements so the transaction is being ignored");
-
- journalTransaction.forget();
+ loadManager.deleteRecord(deleteValue.id);
}
- hasData = true;
+ journalTransaction.commit(file);
}
+ else
+ {
+ log.warn("Transaction " + transactionID +
+ " is missing elements so the transaction is being ignored");
- break;
+ journalTransaction.forget();
+ }
+
+ hasData.set(true);
}
- case ROLLBACK_RECORD:
+
+ }
+
+ public void rollbackRecord(long transactionID) throws Exception
+ {
+ if (trace)
{
- TransactionHolder tx = transactions.remove(transactionID);
+ trace("rollbackRecord: txid = " + transactionID);
+ }
- // The rollback could be alone on its own journal-file and the
- // whole transaction body was reclaimed but the commit-record
- // So it is completely legal to not find a transaction at this
- // point
- if (tx != null)
- {
- JournalTransaction tnp = transactionInfos.remove(transactionID);
+ TransactionHolder tx = transactions.remove(transactionID);
- if (tnp == null)
- {
- throw new IllegalStateException("Cannot find tx " + transactionID);
- }
+ // The rollback could be alone on its own journal-file and the
+ // whole transaction body was reclaimed but the commit-record
+ // So it is completely legal to not find a transaction at this
+ // point
+ if (tx != null)
+ {
+ JournalTransaction tnp = transactionInfos.remove(transactionID);
- // There is no need to validate summaries/holes on
- // Rollbacks.. We will ignore the data anyway.
- tnp.rollback(file);
-
- hasData = true;
+ if (tnp == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + transactionID);
}
- break;
+ // There is no need to validate summaries/holes on
+ // Rollbacks.. We will ignore the data anyway.
+ tnp.rollback(file);
+
+ hasData.set(true);
}
- default:
+ }
+
+ public void markAsDataFile(JournalFile file)
+ {
+ if (trace)
{
- throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " +
- recordType);
+ trace("Marking " + file + " as data file");
}
- }
- checkSize = wholeFileBuffer.getInt();
-
- // This is a sanity check about the loading code itself.
- // If this checkSize doesn't match, it means the reading method is
- // not doing what it was supposed to do
- if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
- {
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
- ", pos = " +
- pos);
+ hasData.set(true);
}
- lastDataPos = wholeFileBuffer.position();
- }
+ });
- fileFactory.releaseBuffer(wholeFileBuffer);
-
- file.getFile().close();
-
- if (hasData)
+ if (hasData.get())
{
+ lastDataPos = resultLastPost;
dataFiles.add(file);
}
else
@@ -1354,7 +1167,7 @@
openFile(currentFile);
}
-
+
fileFactory.activate(currentFile.getFile());
pushOpenedFile();
@@ -1463,7 +1276,7 @@
/** Method for use on testcases.
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
public void debugWait() throws Exception
- {
+ {
for (TransactionCallback callback : transactionCallbacks.values())
{
callback.waitCompletion();
@@ -1583,7 +1396,7 @@
lock.release();
}
}
-
+
public void perfBlast(final int pages) throws Exception
{
new PerfBlast(pages).start();
@@ -1605,7 +1418,7 @@
}
filesExecutor = Executors.newSingleThreadExecutor();
-
+
fileFactory.start();
state = STATE_STARTED;
@@ -1680,13 +1493,13 @@
SequentialFile sf = file.getFile();
sf.open(1);
-
+
sf.position(0);
ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
-
+
bb.putInt(newOrderingID);
-
+
bb.rewind();
sf.write(bb, true);
@@ -1700,6 +1513,317 @@
return jf;
}
+
+ private int readJournalFile(JournalFile file, JournalReader reader) throws Exception
+ {
+ ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
+
+ file.getFile().open(1);
+
+ int bytesRead = file.getFile().read(wholeFileBuffer);
+
+ if (bytesRead != fileSize)
+ {
+ // FIXME - We should extract everything we can from this file
+ // and then we shouldn't ever reuse this file on reclaiming (instead
+ // reclaim on different size files would aways throw the file away)
+ // rather than throw ISE!
+ // We don't want to leave the user with an unusable system
+ throw new IllegalStateException("File is wrong size " + bytesRead +
+ " expected " +
+ fileSize +
+ " : " +
+ file.getFile().getFileName());
+ }
+
+ wholeFileBuffer.position(0);
+
+ // First long is the ordering timestamp, we just jump its position
+ wholeFileBuffer.position(SIZE_HEADER);
+
+ int lastDataPos = SIZE_HEADER;
+
+ while (wholeFileBuffer.hasRemaining())
+ {
+ final int pos = wholeFileBuffer.position();
+
+ byte recordType = wholeFileBuffer.get();
+
+ if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+ {
+ // I - We scan for any valid record on the file. If a hole
+ // happened on the middle of the file we keep looking until all
+ // the possibilities are gone
+ continue;
+ }
+
+ if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+ {
+ reader.markAsDataFile(file);
+
+ wholeFileBuffer.position(pos + 1);
+ // II - Ignore this record, lets keep looking
+ continue;
+ }
+
+ // III - Every record has the file-id.
+ // This is what supports us from not re-filling the whole file
+ int readFileId = wholeFileBuffer.getInt();
+
+ long transactionID = 0;
+
+ if (isTransaction(recordType))
+ {
+ if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
+
+ transactionID = wholeFileBuffer.getLong();
+ }
+
+ long recordID = 0;
+
+ if (!isCompleteTransaction(recordType))
+ {
+ if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
+
+ recordID = wholeFileBuffer.getLong();
+ }
+
+ // We use the size of the record to validate the health of the
+ // record.
+ // (V) We verify the size of the record
+
+ // The variable record portion used on Updates and Appends
+ int variableSize = 0;
+
+ // Used to hold extra data on transaction prepares
+ int preparedTransactionExtraDataSize = 0;
+
+ byte userRecordType = 0;
+
+ byte record[] = null;
+
+ if (isContainsBody(recordType))
+ {
+ if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+ {
+ wholeFileBuffer.position(pos + 1);
+ reader.markAsDataFile(file);
+ continue;
+ }
+
+ variableSize = wholeFileBuffer.getInt();
+
+ if (isInvalidSize(wholeFileBuffer.position(), variableSize))
+ {
+ wholeFileBuffer.position(pos + 1);
+ continue;
+ }
+
+ if (recordType != DELETE_RECORD_TX)
+ {
+ userRecordType = wholeFileBuffer.get();
+ }
+
+ record = new byte[variableSize];
+
+ wholeFileBuffer.get(record);
+ }
+
+ if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+ {
+ if (recordType == PREPARE_RECORD)
+ {
+ // Add the variable size required for preparedTransactions
+ preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
+ }
+ // Both commit and record contain the recordSummary, and this is
+ // used to calculate the record-size on both record-types
+ variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
+ }
+
+ int recordSize = getRecordSize(recordType);
+
+ // VI - this is completing V, We will validate the size at the end
+ // of the record,
+ // But we avoid buffer overflows by damaged data
+ if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+ {
+ // Avoid a buffer overflow caused by damaged data... continue
+ // scanning for more records...
+ trace("Record at position " + pos +
+ " recordType = " +
+ recordType +
+ " file:" +
+ file.getFile().getFileName() +
+ " recordSize: " +
+ recordSize +
+ " variableSize: " +
+ variableSize +
+ " preparedTransactionExtraDataSize: " +
+ preparedTransactionExtraDataSize +
+ " is corrupted and it is being ignored (II)");
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
+ wholeFileBuffer.position(pos + 1);
+
+ continue;
+ }
+
+ int oldPos = wholeFileBuffer.position();
+
+ wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+
+ int checkSize = wholeFileBuffer.getInt();
+
+ // VII - The checkSize at the end has to match with the size
+ // informed at the beggining.
+ // This is like testing a hash for the record. (We could replace the
+ // checkSize by some sort of calculated hash)
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ {
+ trace("Record at position " + pos +
+ " recordType = " +
+ recordType +
+ " file:" +
+ file.getFile().getFileName() +
+ " is corrupted and it is being ignored (III)");
+
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
+
+ wholeFileBuffer.position(pos + SIZE_BYTE);
+
+ continue;
+ }
+
+ // This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+ if (readFileId != file.getOrderingID())
+ {
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ reader.markAsDataFile(file);
+
+ continue;
+ }
+
+ wholeFileBuffer.position(oldPos);
+
+ // At this point everything is checked. So we relax and just load
+ // the data now.
+
+ switch (recordType)
+ {
+ case ADD_RECORD:
+ {
+ reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
+
+ case UPDATE_RECORD:
+ {
+ reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ break;
+ }
+
+ case DELETE_RECORD:
+ {
+ reader.deleteRecord(recordID);
+ break;
+ }
+
+ case ADD_RECORD_TX:
+ {
+ reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+ break;
+ }
+
+ case UPDATE_RECORD_TX:
+ {
+ reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+ break;
+ }
+
+ case DELETE_RECORD_TX:
+ {
+ reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+ break;
+ }
+
+ case PREPARE_RECORD:
+ {
+
+ byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+ wholeFileBuffer.get(extraData);
+
+ // Pair <FileID, NumberOfElements>
+ Pair<Integer, Integer>[] summary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
+
+ reader.prepareRecord(transactionID, extraData, summary);
+
+ break;
+ }
+ case COMMIT_RECORD:
+ {
+ // We need to read it even if transaction was not found, or
+ // the reading checks would fail
+ // Pair <OrderId, NumberOfElements>
+ Pair<Integer, Integer>[] summary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
+
+ reader.commitRecord(transactionID, summary);
+ break;
+ }
+ case ROLLBACK_RECORD:
+ {
+ reader.rollbackRecord(transactionID);
+ break;
+ }
+ default:
+ {
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " +
+ recordType);
+ }
+ }
+
+ checkSize = wholeFileBuffer.getInt();
+
+ // This is a sanity check about the loading code itself.
+ // If this checkSize doesn't match, it means the reading method is
+ // not doing what it was supposed to do
+ if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+ {
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
+ ", pos = " +
+ pos);
+ }
+
+ lastDataPos = wholeFileBuffer.position();
+
+ }
+
+ fileFactory.releaseBuffer(wholeFileBuffer);
+
+ file.getFile().close();
+
+ return lastDataPos;
+
+ }
+
+
/** It will read the elements-summary back from the commit/prepare transaction
* Pair<FileID, Counter> */
@SuppressWarnings("unchecked")
@@ -1806,9 +1930,9 @@
* @throws Exception
*/
private ChannelBuffer writeTransaction(final byte recordType,
- final long txID,
- final JournalTransaction tx,
- final EncodingSupport transactionData) throws Exception
+ final long txID,
+ final JournalTransaction tx,
+ final EncodingSupport transactionData) throws Exception
{
int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() *
SIZE_INT *
@@ -1957,7 +2081,6 @@
return orderedFiles;
}
-
/**
* Note: You should aways guarantee locking the semaphore lock.
@@ -1987,7 +2110,7 @@
currentFile.getFile().unlockBuffer();
moveNextFile();
currentFile.getFile().lockBuffer();
-
+
// The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size))
{
@@ -2082,7 +2205,7 @@
closeFile(currentFile);
currentFile = enqueueOpenFile();
-
+
fileFactory.activate(currentFile.getFile());
}
@@ -2210,8 +2333,7 @@
return tx;
}
-
-
+
private IOCallback getSyncCallback(boolean sync)
{
if (fileFactory.isSupportsCallbacks())
@@ -2351,6 +2473,26 @@
}
}
}
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("PosFiles(add=" + addFile.getFile().getFileName());
+
+ if (updateFiles != null)
+ {
+
+ for (JournalFile update : updateFiles)
+ {
+ buffer.append(", update=" + update.getFile().getFileName());
+ }
+
+ }
+
+ buffer.append(")");
+
+ return buffer.toString();
+ }
}
private class JournalTransaction
@@ -2564,25 +2706,25 @@
private class PerfBlast extends Thread
{
private final int pages;
-
+
private PerfBlast(final int pages)
{
- this.pages = pages;
+ this.pages = pages;
}
-
+
public void run()
- {
+ {
try
- {
+ {
lock.acquire();
-
+
MessagingBuffer bb = newBuffer(128 * 1024);
-
+
for (int i = 0; i < pages; i++)
{
appendRecord(bb, false, null);
}
-
+
lock.release();
}
catch (Exception e)
@@ -2591,5 +2733,62 @@
}
}
}
-
+
+ private static interface JournalReader
+ {
+ void addRecord(RecordInfo info) throws Exception;
+
+ /**
+ * @param recordInfo
+ * @throws Exception
+ */
+ void updateRecord(RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param recordID
+ */
+ void deleteRecord(long recordID) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ * @throws Exception
+ */
+ void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ * @throws Exception
+ */
+ void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ */
+ void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param extraData
+ * @param summaryData
+ */
+ void prepareRecord(long transactionID, byte[] extraData, Pair<Integer, Integer>[] summary) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param summaryData
+ */
+ void commitRecord(long transactionID, Pair<Integer, Integer>[] summary) throws Exception;
+
+ /**
+ * @param transactionID
+ */
+ void rollbackRecord(long transactionID) throws Exception;
+
+ public void markAsDataFile(JournalFile file);
+
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-16 23:38:44 UTC (rev 7372)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-16 23:58:22 UTC (rev 7373)
@@ -2944,30 +2944,30 @@
createJournal();
startJournal();
load();
+
+ int transactionID = 0;
for (int i = 0; i < 100; i++)
{
add(i);
if (i % 10 == 0 && i > 0)
{
- System.out.println("new file at " + i);
journal.forceMoveNextFile();
}
update(i);
}
- for (int i = 0; i < 100; i++)
+ for (int i = 100; i < 200; i++)
{
- addTx(i, i + 100);
+ addTx(transactionID, i);
updateTx(i + 100);
if (i % 10 == 0 && i > 0)
{
- System.out.println("new file at " + i);
journal.forceMoveNextFile();
}
- commit(i);
+ commit(transactionID++);
update(i);
}
@@ -2986,22 +2986,44 @@
journal.forceMoveNextFile();
- for (int i = 0; i < 200; i++)
+ for (int i = 0; i < 100; i++)
{
delete(i);
}
+ for (int i = 100; i < 200; i++)
+ {
+ updateTx(transactionID, i);
+ }
+
journal.forceMoveNextFile();
-
+
+ commit(transactionID++);
+
+ for (int i = 100; i < 200; i++)
+ {
+ updateTx(transactionID, i);
+ deleteTx(transactionID, i);
+ }
+
+ commit(transactionID++);
+
+
System.out.println("Before reclaim ****************************");
System.out.println(journal.debug());
System.out.println("*****************************************");
- journal.checkAndReclaimFiles();
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
System.out.println("After reclaim ****************************");
System.out.println(journal.debug());
System.out.println("*****************************************");
+
+ journal.forceMoveNextFile();
+ journal.checkAndReclaimFiles();
assertEquals(0, journal.getDataFilesCount());
More information about the jboss-cvs-commits
mailing list