[jboss-cvs] JBoss Messaging SVN: r7448 - branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 24 00:48:19 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-24 00:48:19 -0400 (Wed, 24 Jun 2009)
New Revision: 7448
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
tweak
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 23:51:36 UTC (rev 7447)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-24 04:48:19 UTC (rev 7448)
@@ -1014,7 +1014,7 @@
}
}
- public void commitRecord(long transactionID, Pair<Integer, Integer>[] summary) throws Exception
+ public void commitRecord(long transactionID, int numberOfRecords) throws Exception
{
}
@@ -1030,7 +1030,7 @@
{
}
- public void prepareRecord(long transactionID, byte[] extraData, Pair<Integer, Integer>[] summary) throws Exception
+ public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
{
}
@@ -1253,7 +1253,7 @@
}
- public void prepareRecord(long transactionID, byte[] extraData, Pair<Integer, Integer>[] summary) throws Exception
+ public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
{
if (trace)
{
@@ -1285,7 +1285,7 @@
transactionInfos.put(transactionID, journalTransaction);
}
- boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, summary);
+ boolean healthy = checkTransactionHealth(currentFile, journalTransaction, orderedFiles, numberOfRecords);
if (healthy)
{
@@ -1298,7 +1298,7 @@
}
}
- public void commitRecord(long transactionID, Pair<Integer, Integer>[] summary) throws Exception
+ public void commitRecord(long transactionID, int numberOfRecords) throws Exception
{
if (trace)
{
@@ -1323,7 +1323,10 @@
throw new IllegalStateException("Cannot find tx " + transactionID);
}
- boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, summary);
+ boolean healthy = checkTransactionHealth(currentFile,
+ journalTransaction,
+ orderedFiles,
+ numberOfRecords);
if (healthy)
{
@@ -1874,6 +1877,7 @@
long recordID = 0;
+ // If prepare or commit
if (!isCompleteTransaction(recordType))
{
if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
@@ -1934,9 +1938,7 @@
// Add the variable size required for preparedTransactions
preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
}
- // Both commit and record contain the recordSummary, and this is
- // used to calculate the record-size on both record-types
- variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
+ variableSize = 0;
}
int recordSize = getRecordSize(recordType);
@@ -2053,25 +2055,22 @@
case PREPARE_RECORD:
{
+ int numberOfRecords = wholeFileBuffer.getInt();
+
byte extraData[] = new byte[preparedTransactionExtraDataSize];
wholeFileBuffer.get(extraData);
- // Pair <FileID, NumberOfElements>
- Pair<Integer, Integer>[] summary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
+ reader.prepareRecord(transactionID, extraData, numberOfRecords);
- reader.prepareRecord(transactionID, extraData, summary);
-
break;
}
case COMMIT_RECORD:
{
- // We need to read it even if transaction was not found, or
- // the reading checks would fail
- // Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] summary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
- reader.commitRecord(transactionID, summary);
+ int numberOfRecords = wholeFileBuffer.getInt();
+
+ reader.commitRecord(transactionID, numberOfRecords);
break;
}
case ROLLBACK_RECORD:
@@ -2111,26 +2110,6 @@
}
- /** It will read the elements-summary back from the commit/prepare transaction
- * Pair<FileID, Counter> */
- @SuppressWarnings("unchecked")
- // See comment on the method body
- private Pair<Integer, Integer>[] readTransactionalElementsSummary(final int variableSize, final ByteBuffer bb)
- {
- int numberOfFiles = variableSize / (SIZE_INT * 2);
-
- // This line aways show an annoying compilation-warning, the
- // SupressWarning is to avoid a warning about this cast
- Pair<Integer, Integer> values[] = new Pair[numberOfFiles];
-
- for (int i = 0; i < numberOfFiles; i++)
- {
- values[i] = new Pair<Integer, Integer>(bb.getInt(), bb.getInt());
- }
-
- return values;
- }
-
/**
* <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
@@ -2145,9 +2124,10 @@
* @param recordedSummary
* @return
*/
- private boolean checkTransactionHealth(final JournalTransaction journalTransaction,
+ private boolean checkTransactionHealth(final JournalFile currentFile,
+ final JournalTransaction journalTransaction,
final List<JournalFile> orderedFiles,
- final Pair<Integer, Integer>[] recordedSummary)
+ final int numberOfRecords)
{
boolean healthy = true;
@@ -2156,42 +2136,11 @@
// FileID, NumberOfElements
Map<Integer, AtomicInteger> currentSummary = journalTransaction.getElementsSummary();
- // (II) We compare the recorded summary on the commit, against to the
- // reality on the files
- for (Pair<Integer, Integer> ref : recordedSummary)
- {
- AtomicInteger counter = currentSummary.get(ref.a);
+ AtomicInteger value = currentSummary.get(currentFile.getFileID());
- if (counter == null)
- {
- for (JournalFile lookupFile : orderedFiles)
- {
- if (lookupFile.getFileID() == ref.a)
- {
- // (III) oops, we were expecting at least one record on this
- // file.
- // The file still exists and no records were found.
- // That means the transaction crashed before complete,
- // so this transaction is broken and needs to be ignored.
- // This is probably a hole caused by a crash during commit.
- healthy = false;
- break;
- }
- }
- }
- else
- {
- // (IV) Missing a record... Transaction was not completed as stated.
- // we will ignore the whole transaction
- // This is probably a hole caused by a crash during commit/prepare.
- if (counter.get() != ref.b)
- {
- healthy = false;
- break;
- }
- }
- }
- return healthy;
+ Integer intValue = value == null ? 0 : value.intValue();
+
+ return intValue == numberOfRecords;
}
/**
@@ -2850,9 +2799,9 @@
public void fillNumberOfRecords(JournalFile currentFile, MessagingBuffer bb)
{
bb.writerIndex(SIZE_BYTE + SIZE_INT + SIZE_LONG);
-
+
bb.writeInt(getCounter(currentFile).intValue());
-
+
}
/** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
@@ -3222,13 +3171,13 @@
* @param extraData
* @param summaryData
*/
- void prepareRecord(long transactionID, byte[] extraData, Pair<Integer, Integer>[] summary) throws Exception;
+ void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
/**
* @param transactionID
* @param summaryData
*/
- void commitRecord(long transactionID, Pair<Integer, Integer>[] summary) throws Exception;
+ void commitRecord(long transactionID, int numberOfRecords) throws Exception;
/**
* @param transactionID
More information about the jboss-cvs-commits mailing list