[jboss-cvs] JBoss Messaging SVN: r7373 - in trunk: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 16 19:58:22 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-16 19:58:22 -0400 (Tue, 16 Jun 2009)
New Revision: 7373

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
JBMESSAGING-1504 - Simple refactoring only at this point. I need to be able two read files in two different scenarios (loading and compacting). This commit is just refactoring the load part into a reading method (and loading should be using the readJournalFile method then).
This shouldn't make any difference on behaviour.

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-16 23:38:44 UTC (rev 7372)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-16 23:58:22 UTC (rev 7373)
@@ -45,6 +45,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.jboss.messaging.core.buffers.ChannelBuffer;
@@ -279,7 +280,7 @@
       bb.writeByte(recordType);
       record.encode(bb);
       bb.writeInt(size);
-      
+
       IOCallback callback = getSyncCallback(sync);
 
       lock.acquire();
@@ -293,7 +294,7 @@
       {
          lock.release();
       }
-      
+
       if (callback != null)
       {
          callback.waitCompletion();
@@ -331,9 +332,8 @@
       record.encode(bb);
       bb.writeInt(size);
 
-      
       IOCallback callback = getSyncCallback(sync);
-      
+
       lock.acquire();
       try
       {
@@ -345,7 +345,7 @@
       {
          lock.release();
       }
-      
+
       if (callback != null)
       {
          callback.waitCompletion();
@@ -374,7 +374,7 @@
       bb.writeInt(-1); // skip ID part
       bb.writeLong(id);
       bb.writeInt(size);
-      
+
       IOCallback callback = getSyncCallback(sync);
 
       lock.acquire();
@@ -388,14 +388,17 @@
       {
          lock.release();
       }
-      
+
       if (callback != null)
       {
          callback.waitCompletion();
       }
    }
 
-   public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record,
+   public void appendAddRecordTransactional(final long txID,
+                                            final long id,
+                                            final byte recordType,
+                                            final byte[] record,
                                             final boolean sync) throws Exception
    {
       appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
@@ -491,13 +494,14 @@
       }
    }
 
-   public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record,
-                                               final boolean sync) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record, final boolean sync) throws Exception
    {
       appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record), sync);
    }
 
-   public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record,
+   public void appendDeleteRecordTransactional(final long txID,
+                                               final long id,
+                                               final EncodingSupport record,
                                                final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
@@ -535,8 +539,7 @@
       }
    }
 
-   public void appendDeleteRecordTransactional(final long txID, final long id,
-                                               final boolean sync) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id, final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -819,490 +822,300 @@
 
       fileFactory.controlBuffersLifeCycle(false);
 
-      Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+      final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
 
-      List<JournalFile> orderedFiles = orderFiles();
+      final List<JournalFile> orderedFiles = orderFiles();
 
       int lastDataPos = SIZE_HEADER;
 
       long maxID = -1;
 
-      for (JournalFile file : orderedFiles)
+      for (final JournalFile file : orderedFiles)
       {
-         ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
+         trace("Loading file " + file.getFile().getFileName());
 
-         file.getFile().open(1);
+         final AtomicBoolean hasData = new AtomicBoolean(false);
 
-         int bytesRead = file.getFile().read(wholeFileBuffer);
-
-         if (bytesRead != fileSize)
+         int resultLastPost = readJournalFile(file, new JournalReader()
          {
-            // FIXME - We should extract everything we can from this file
-            // and then we shouldn't ever reuse this file on reclaiming (instead
-            // reclaim on different size files would aways throw the file away)
-            // rather than throw ISE!
-            // We don't want to leave the user with an unusable system
-            throw new IllegalStateException("File is wrong size " + bytesRead +
-                                            " expected " +
-                                            fileSize +
-                                            " : " +
-                                            file.getFile().getFileName());
-         }
 
-         wholeFileBuffer.position(0);
+            public void addRecord(RecordInfo info) throws Exception
+            {
+               if (trace)
+               {
+                  trace("AddRecord: " + info);
+               }
+               hasData.set(true);
 
-         // First long is the ordering timestamp, we just jump its position
-         wholeFileBuffer.position(SIZE_HEADER);
+               loadManager.addRecord(info);
 
-         boolean hasData = false;
-
-         while (wholeFileBuffer.hasRemaining())
-         {
-            final int pos = wholeFileBuffer.position();
-
-            byte recordType = wholeFileBuffer.get();
-
-            if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
-            {
-               // I - We scan for any valid record on the file. If a hole
-               // happened on the middle of the file we keep looking until all
-               // the possibilities are gone
-               continue;
+               posFilesMap.put(info.id, new PosFiles(file));
+               
+               PosFiles file = posFilesMap.get(info.id);
             }
 
-            if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+            public void updateRecord(RecordInfo info) throws Exception
             {
-               hasData = true;
-               wholeFileBuffer.position(pos + 1);
-               // II - Ignore this record, lets keep looking
-               continue;
-            }
-
-            // III - Every record has the file-id.
-            // This is what supports us from not re-filling the whole file
-            int readFileId = wholeFileBuffer.getInt();
-
-            long transactionID = 0;
-
-            if (isTransaction(recordType))
-            {
-               if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+               if (trace)
                {
-                  wholeFileBuffer.position(pos + 1);
-                  hasData = true;
-                  continue;
+                  trace("UpdateRecord: " + info);
                }
+               hasData.set(true);
 
-               transactionID = wholeFileBuffer.getLong();
-            }
+               loadManager.updateRecord(info);
 
-            long recordID = 0;
+               PosFiles posFiles = posFilesMap.get(info.id);
 
-            if (!isCompleteTransaction(recordType))
-            {
-               if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+               if (posFiles != null)
                {
-                  wholeFileBuffer.position(pos + 1);
-                  hasData = true;
-                  continue;
+                  // It's legal for this to be null. The file(s) with the may
+                  // have been deleted
+                  // just leaving some updates in this file
+
+                  posFiles.addUpdateFile(file);
                }
-
-               recordID = wholeFileBuffer.getLong();
-
-               maxID = Math.max(maxID, recordID);
             }
 
-            // We use the size of the record to validate the health of the
-            // record.
-            // (V) We verify the size of the record
-
-            // The variable record portion used on Updates and Appends
-            int variableSize = 0;
-
-            // Used to hold extra data on transaction prepares
-            int preparedTransactionExtraDataSize = 0;
-
-            byte userRecordType = 0;
-
-            byte record[] = null;
-
-            if (isContainsBody(recordType))
+            public void deleteRecord(long recordID) throws Exception
             {
-               if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+               if (trace)
                {
-                  wholeFileBuffer.position(pos + 1);
-                  hasData = true;
-                  continue;
+                  trace("DeleteRecord: " + recordID);
                }
+               hasData.set(true);
 
-               variableSize = wholeFileBuffer.getInt();
+               loadManager.deleteRecord(recordID);
 
-               if (isInvalidSize(wholeFileBuffer.position(), variableSize))
-               {
-                  wholeFileBuffer.position(pos + 1);
-                  continue;
-               }
+               PosFiles posFiles = posFilesMap.remove(recordID);
 
-               if (recordType != DELETE_RECORD_TX)
+               if (posFiles != null)
                {
-                  userRecordType = wholeFileBuffer.get();
+                  posFiles.addDelete(file);
                }
-
-               record = new byte[variableSize];
-
-               wholeFileBuffer.get(record);
             }
 
-            if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+            public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
             {
-               if (recordType == PREPARE_RECORD)
-               {
-                  // Add the variable size required for preparedTransactions
-                  preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
-               }
-               // Both commit and record contain the recordSummary, and this is
-               // used to calculate the record-size on both record-types
-               variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
+               addRecordTX(transactionID, info);
             }
 
-            int recordSize = getRecordSize(recordType);
-
-            // VI - this is completing V, We will validate the size at the end
-            // of the record,
-            // But we avoid buffer overflows by damaged data
-            if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+            public void addRecordTX(long transactionID, RecordInfo info) throws Exception
             {
-               // Avoid a buffer overflow caused by damaged data... continue
-               // scanning for more records...
-               log.debug("Record at position " + pos +
-                        " recordType = " + recordType +
-                        " file:" + file.getFile().getFileName() +
-                        " recordSize: " + recordSize + 
-                        " variableSize: " + variableSize +
-                        " preparedTransactionExtraDataSize: " + preparedTransactionExtraDataSize + 
-                        " is corrupted and it is being ignored (II)");
-               // If a file has damaged records, we make it a dataFile, and the
-               // next reclaiming will fix it
-               hasData = true;
-               wholeFileBuffer.position(pos + 1);
 
-               continue;
-            }
+               if (trace)
+               {
+                  trace((info.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + info + ", txid = " + transactionID);
+               }
 
-            int oldPos = wholeFileBuffer.position();
+               hasData.set(true);
 
-            wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+               TransactionHolder tx = transactions.get(transactionID);
 
-            int checkSize = wholeFileBuffer.getInt();
+               if (tx == null)
+               {
+                  tx = new TransactionHolder(transactionID);
 
-            // VII - The checkSize at the end has to match with the size
-            // informed at the beggining.
-            // This is like testing a hash for the record. (We could replace the
-            // checkSize by some sort of calculated hash)
-            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
-            {
-               log.debug("Record at position " + pos +
-                        " recordType = " +
-                        recordType +
-                        " file:" +
-                        file.getFile().getFileName() +
-                        " is corrupted and it is being ignored (III)");
+                  transactions.put(transactionID, tx);
+               }
 
-               // If a file has damaged records, we make it a dataFile, and the
-               // next reclaiming will fix it
-               hasData = true;
+               tx.recordInfos.add(info);
 
-               wholeFileBuffer.position(pos + SIZE_BYTE);
+               JournalTransaction tnp = transactionInfos.get(transactionID);
 
-               continue;
-            }
+               if (tnp == null)
+               {
+                  tnp = new JournalTransaction();
 
-            // This record is from a previous file-usage. The file was
-            // reused and we need to ignore this record
-            if (readFileId != file.getOrderingID())
-            {
-               // If a file has damaged records, we make it a dataFile, and the
-               // next reclaiming will fix it
-               hasData = true;
+                  transactionInfos.put(transactionID, tnp);
+               }
 
-               continue;
+               tnp.addPositive(file, info.id);
             }
 
-            wholeFileBuffer.position(oldPos);
-
-            // At this point everything is checked. So we relax and just load
-            // the data now.
-
-            switch (recordType)
+            public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
             {
-               case ADD_RECORD:
+               if (trace)
                {
-                  loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+                  trace("DeleteRecordTX: " + transactionID + " info = " + info);
+               }
 
-                  posFilesMap.put(recordID, new PosFiles(file));
+               hasData.set(true);
 
-                  hasData = true;
+               TransactionHolder tx = transactions.get(transactionID);
 
-                  break;
-               }
-               case UPDATE_RECORD:
+               if (tx == null)
                {
-                  loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+                  tx = new TransactionHolder(transactionID);
 
-                  hasData = true;
+                  transactions.put(transactionID, tx);
+               }
 
-                  PosFiles posFiles = posFilesMap.get(recordID);
+               tx.recordsToDelete.add(info);
 
-                  if (posFiles != null)
-                  {
-                     // It's legal for this to be null. The file(s) with the may
-                     // have been deleted
-                     // just leaving some updates in this file
+               JournalTransaction tnp = transactionInfos.get(transactionID);
 
-                     posFiles.addUpdateFile(file);
-                  }
+               if (tnp == null)
+               {
+                  tnp = new JournalTransaction();
 
-                  break;
+                  transactionInfos.put(transactionID, tnp);
                }
-               case DELETE_RECORD:
-               {
-                  loadManager.deleteRecord(recordID);
 
-                  hasData = true;
+               tnp.addNegative(file, info.id);
 
-                  PosFiles posFiles = posFilesMap.remove(recordID);
+            }
 
-                  if (posFiles != null)
-                  {
-                     posFiles.addDelete(file);
-                  }
-
-                  break;
+            public void prepareRecord(long transactionID, byte[] extraData, Pair<Integer, Integer>[] summary) throws Exception
+            {
+               if (trace)
+               {
+                  trace("prepareRecordTX: txid = " + transactionID);
                }
-               case ADD_RECORD_TX:
-               case UPDATE_RECORD_TX:
-               {
-                  TransactionHolder tx = transactions.get(transactionID);
 
-                  if (tx == null)
-                  {
-                     tx = new TransactionHolder(transactionID);
+               hasData.set(true);
 
-                     transactions.put(transactionID, tx);
-                  }
+               TransactionHolder tx = transactions.get(transactionID);
 
-                  tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType == UPDATE_RECORD_TX));
+               if (tx == null)
+               {
+                  // The user could choose to prepare empty transactions
+                  tx = new TransactionHolder(transactionID);
 
-                  JournalTransaction tnp = transactionInfos.get(transactionID);
+                  transactions.put(transactionID, tx);
+               }
 
-                  if (tnp == null)
-                  {
-                     tnp = new JournalTransaction();
+               tx.prepared = true;
 
-                     transactionInfos.put(transactionID, tnp);
-                  }
+               tx.extraData = extraData;
 
-                  tnp.addPositive(file, recordID);
+               JournalTransaction journalTransaction = transactionInfos.get(transactionID);
 
-                  hasData = true;
+               if (journalTransaction == null)
+               {
+                  journalTransaction = new JournalTransaction();
 
-                  break;
+                  transactionInfos.put(transactionID, journalTransaction);
                }
-               case DELETE_RECORD_TX:
-               {
-                  TransactionHolder tx = transactions.get(transactionID);
 
-                  if (tx == null)
-                  {
-                     tx = new TransactionHolder(transactionID);
+               boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, summary);
 
-                     transactions.put(transactionID, tx);
-                  }
-
-                  tx.recordsToDelete.add(new RecordInfo(recordID, (byte)0, record, true));
-
-                  JournalTransaction tnp = transactionInfos.get(transactionID);
-
-                  if (tnp == null)
-                  {
-                     tnp = new JournalTransaction();
-
-                     transactionInfos.put(transactionID, tnp);
-                  }
-
-                  tnp.addNegative(file, recordID);
-
-                  hasData = true;
-
-                  break;
+               if (healthy)
+               {
+                  journalTransaction.prepare(file);
                }
-               case PREPARE_RECORD:
+               else
                {
-                  TransactionHolder tx = transactions.get(transactionID);
+                  log.warn("Prepared transaction " + transactionID + " wasn't considered completed, it will be ignored");
+                  tx.invalid = true;
+               }
+            }
 
-                  if (tx == null)
-                  {
-                     // The user could choose to prepare empty transactions
-                     tx = new TransactionHolder(transactionID);
+            public void commitRecord(long transactionID, Pair<Integer, Integer>[] summary) throws Exception
+            {
+               if (trace)
+               {
+                  trace("commitRecord: txid = " + transactionID);
+               }
 
-                     transactions.put(transactionID, tx);
-                  }
+               TransactionHolder tx = transactions.remove(transactionID);
 
-                  byte extraData[] = new byte[preparedTransactionExtraDataSize];
+               // The commit could be alone on its own journal-file and the
+               // whole transaction body was reclaimed but not the
+               // commit-record
+               // So it is completely legal to not find a transaction at this
+               // point
+               // If we can't find it, we assume the TX was reclaimed and we
+               // ignore this
+               if (tx != null)
+               {
+                  JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
 
-                  wholeFileBuffer.get(extraData);
-
-                  // Pair <FileID, NumberOfElements>
-                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize,
-                                                                                              wholeFileBuffer);
-
-                  tx.prepared = true;
-
-                  tx.extraData = extraData;
-
-                  JournalTransaction journalTransaction = transactionInfos.get(transactionID);
-
                   if (journalTransaction == null)
                   {
-                     journalTransaction = new JournalTransaction();
-
-                     transactionInfos.put(transactionID, journalTransaction);
+                     throw new IllegalStateException("Cannot find tx " + transactionID);
                   }
 
-                  boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
+                  boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, summary);
 
                   if (healthy)
                   {
-                     journalTransaction.prepare(file);
-                  }
-                  else
-                  {
-                     log.warn("Prepared transaction " + transactionID +
-                              " wasn't considered completed, it will be ignored");
-                     tx.invalid = true;
-                  }
-
-                  hasData = true;
-
-                  break;
-               }
-               case COMMIT_RECORD:
-               {
-                  TransactionHolder tx = transactions.remove(transactionID);
-
-                  // We need to read it even if transaction was not found, or
-                  // the reading checks would fail
-                  // Pair <OrderId, NumberOfElements>
-                  Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize,
-                                                                                              wholeFileBuffer);
-
-                  // The commit could be alone on its own journal-file and the
-                  // whole transaction body was reclaimed but not the
-                  // commit-record
-                  // So it is completely legal to not find a transaction at this
-                  // point
-                  // If we can't find it, we assume the TX was reclaimed and we
-                  // ignore this
-                  if (tx != null)
-                  {
-                     JournalTransaction journalTransaction = transactionInfos.remove(transactionID);
-
-                     if (journalTransaction == null)
+                     for (RecordInfo txRecord : tx.recordInfos)
                      {
-                        throw new IllegalStateException("Cannot find tx " + transactionID);
-                     }
-
-                     boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
-
-                     if (healthy)
-                     {
-                        for (RecordInfo txRecord : tx.recordInfos)
+                        if (txRecord.isUpdate)
                         {
-                           if (txRecord.isUpdate)
-                           {
-                              loadManager.updateRecord(txRecord);
-                           }
-                           else
-                           {
-                              loadManager.addRecord(txRecord);
-                           }
+                           loadManager.updateRecord(txRecord);
                         }
-
-                        for (RecordInfo deleteValue : tx.recordsToDelete)
+                        else
                         {
-                           loadManager.deleteRecord(deleteValue.id);
+                           loadManager.addRecord(txRecord);
                         }
+                     }
 
-                        journalTransaction.commit(file);
-                     }
-                     else
+                     for (RecordInfo deleteValue : tx.recordsToDelete)
                      {
-                        log.warn("Transaction " + transactionID +
-                                 " is missing elements so the transaction is being ignored");
-
-                        journalTransaction.forget();
+                        loadManager.deleteRecord(deleteValue.id);
                      }
 
-                     hasData = true;
+                     journalTransaction.commit(file);
                   }
+                  else
+                  {
+                     log.warn("Transaction " + transactionID +
+                              " is missing elements so the transaction is being ignored");
 
-                  break;
+                     journalTransaction.forget();
+                  }
+
+                  hasData.set(true);
                }
-               case ROLLBACK_RECORD:
+
+            }
+
+            public void rollbackRecord(long transactionID) throws Exception
+            {
+               if (trace)
                {
-                  TransactionHolder tx = transactions.remove(transactionID);
+                  trace("rollbackRecord: txid = " + transactionID);
+               }
 
-                  // The rollback could be alone on its own journal-file and the
-                  // whole transaction body was reclaimed but the commit-record
-                  // So it is completely legal to not find a transaction at this
-                  // point
-                  if (tx != null)
-                  {
-                     JournalTransaction tnp = transactionInfos.remove(transactionID);
+               TransactionHolder tx = transactions.remove(transactionID);
 
-                     if (tnp == null)
-                     {
-                        throw new IllegalStateException("Cannot find tx " + transactionID);
-                     }
+               // The rollback could be alone on its own journal-file and the
+               // whole transaction body was reclaimed but the commit-record
+               // So it is completely legal to not find a transaction at this
+               // point
+               if (tx != null)
+               {
+                  JournalTransaction tnp = transactionInfos.remove(transactionID);
 
-                     // There is no need to validate summaries/holes on
-                     // Rollbacks.. We will ignore the data anyway.
-                     tnp.rollback(file);
-
-                     hasData = true;
+                  if (tnp == null)
+                  {
+                     throw new IllegalStateException("Cannot find tx " + transactionID);
                   }
 
-                  break;
+                  // There is no need to validate summaries/holes on
+                  // Rollbacks.. We will ignore the data anyway.
+                  tnp.rollback(file);
+
+                  hasData.set(true);
                }
-               default:
+            }
+
+            public void markAsDataFile(JournalFile file)
+            {
+               if (trace)
                {
-                  throw new IllegalStateException("Journal " + file.getFile().getFileName() +
-                                                  " is corrupt, invalid record type " +
-                                                  recordType);
+                  trace("Marking " + file + " as data file");
                }
-            }
 
-            checkSize = wholeFileBuffer.getInt();
-
-            // This is a sanity check about the loading code itself.
-            // If this checkSize doesn't match, it means the reading method is
-            // not doing what it was supposed to do
-            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
-            {
-               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
-                                               ", pos = " +
-                                               pos);
+               hasData.set(true);
             }
 
-            lastDataPos = wholeFileBuffer.position();
-         }
+         });
 
-         fileFactory.releaseBuffer(wholeFileBuffer);
-
-         file.getFile().close();
-
-         if (hasData)
+         if (hasData.get())
          {
+            lastDataPos = resultLastPost;
             dataFiles.add(file);
          }
          else
@@ -1354,7 +1167,7 @@
 
          openFile(currentFile);
       }
-      
+
       fileFactory.activate(currentFile.getFile());
 
       pushOpenedFile();
@@ -1463,7 +1276,7 @@
    /** Method for use on testcases.
     *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
    public void debugWait() throws Exception
-   {         
+   {
       for (TransactionCallback callback : transactionCallbacks.values())
       {
          callback.waitCompletion();
@@ -1583,7 +1396,7 @@
          lock.release();
       }
    }
-   
+
    public void perfBlast(final int pages) throws Exception
    {
       new PerfBlast(pages).start();
@@ -1605,7 +1418,7 @@
       }
 
       filesExecutor = Executors.newSingleThreadExecutor();
-      
+
       fileFactory.start();
 
       state = STATE_STARTED;
@@ -1680,13 +1493,13 @@
       SequentialFile sf = file.getFile();
 
       sf.open(1);
-      
+
       sf.position(0);
 
       ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
-      
+
       bb.putInt(newOrderingID);
-      
+
       bb.rewind();
 
       sf.write(bb, true);
@@ -1700,6 +1513,317 @@
       return jf;
    }
 
+   
+   private int readJournalFile(JournalFile file, JournalReader reader) throws Exception
+   {
+      ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
+
+      file.getFile().open(1);
+
+      int bytesRead = file.getFile().read(wholeFileBuffer);
+
+      if (bytesRead != fileSize)
+      {
+         // FIXME - We should extract everything we can from this file
+         // and then we shouldn't ever reuse this file on reclaiming (instead
+         // reclaim on different size files would aways throw the file away)
+         // rather than throw ISE!
+         // We don't want to leave the user with an unusable system
+         throw new IllegalStateException("File is wrong size " + bytesRead +
+                                         " expected " +
+                                         fileSize +
+                                         " : " +
+                                         file.getFile().getFileName());
+      }
+
+      wholeFileBuffer.position(0);
+
+      // First long is the ordering timestamp, we just jump its position
+      wholeFileBuffer.position(SIZE_HEADER);
+
+      int lastDataPos = SIZE_HEADER;
+
+      while (wholeFileBuffer.hasRemaining())
+      {
+         final int pos = wholeFileBuffer.position();
+
+         byte recordType = wholeFileBuffer.get();
+
+         if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+         {
+            // I - We scan for any valid record on the file. If a hole
+            // happened on the middle of the file we keep looking until all
+            // the possibilities are gone
+            continue;
+         }
+
+         if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+         {
+            reader.markAsDataFile(file);
+
+            wholeFileBuffer.position(pos + 1);
+            // II - Ignore this record, lets keep looking
+            continue;
+         }
+
+         // III - Every record has the file-id.
+         // This is what supports us from not re-filling the whole file
+         int readFileId = wholeFileBuffer.getInt();
+
+         long transactionID = 0;
+
+         if (isTransaction(recordType))
+         {
+            if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+            {
+               wholeFileBuffer.position(pos + 1);
+               reader.markAsDataFile(file);
+               continue;
+            }
+
+            transactionID = wholeFileBuffer.getLong();
+         }
+
+         long recordID = 0;
+
+         if (!isCompleteTransaction(recordType))
+         {
+            if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
+            {
+               wholeFileBuffer.position(pos + 1);
+               reader.markAsDataFile(file);
+               continue;
+            }
+
+            recordID = wholeFileBuffer.getLong();
+         }
+
+         // We use the size of the record to validate the health of the
+         // record.
+         // (V) We verify the size of the record
+
+         // The variable record portion used on Updates and Appends
+         int variableSize = 0;
+
+         // Used to hold extra data on transaction prepares
+         int preparedTransactionExtraDataSize = 0;
+
+         byte userRecordType = 0;
+
+         byte record[] = null;
+
+         if (isContainsBody(recordType))
+         {
+            if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
+            {
+               wholeFileBuffer.position(pos + 1);
+               reader.markAsDataFile(file);
+               continue;
+            }
+
+            variableSize = wholeFileBuffer.getInt();
+
+            if (isInvalidSize(wholeFileBuffer.position(), variableSize))
+            {
+               wholeFileBuffer.position(pos + 1);
+               continue;
+            }
+
+            if (recordType != DELETE_RECORD_TX)
+            {
+               userRecordType = wholeFileBuffer.get();
+            }
+
+            record = new byte[variableSize];
+
+            wholeFileBuffer.get(record);
+         }
+
+         if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+         {
+            if (recordType == PREPARE_RECORD)
+            {
+               // Add the variable size required for preparedTransactions
+               preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
+            }
+            // Both commit and record contain the recordSummary, and this is
+            // used to calculate the record-size on both record-types
+            variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
+         }
+
+         int recordSize = getRecordSize(recordType);
+
+         // VI - this is completing V, We will validate the size at the end
+         // of the record,
+         // But we avoid buffer overflows by damaged data
+         if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+         {
+            // Avoid a buffer overflow caused by damaged data... continue
+            // scanning for more records...
+            trace("Record at position " + pos +
+                  " recordType = " +
+                  recordType +
+                  " file:" +
+                  file.getFile().getFileName() +
+                  " recordSize: " +
+                  recordSize +
+                  " variableSize: " +
+                  variableSize +
+                  " preparedTransactionExtraDataSize: " +
+                  preparedTransactionExtraDataSize +
+                  " is corrupted and it is being ignored (II)");
+            // If a file has damaged records, we make it a dataFile, and the
+            // next reclaiming will fix it
+            reader.markAsDataFile(file);
+            wholeFileBuffer.position(pos + 1);
+
+            continue;
+         }
+
+         int oldPos = wholeFileBuffer.position();
+
+         wholeFileBuffer.position(pos + variableSize + recordSize + preparedTransactionExtraDataSize - SIZE_INT);
+
+         int checkSize = wholeFileBuffer.getInt();
+
+         // VII - The checkSize at the end has to match with the size
+         // informed at the beggining.
+         // This is like testing a hash for the record. (We could replace the
+         // checkSize by some sort of calculated hash)
+         if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+         {
+            trace("Record at position " + pos +
+                  " recordType = " +
+                  recordType +
+                  " file:" +
+                  file.getFile().getFileName() +
+                  " is corrupted and it is being ignored (III)");
+
+            // If a file has damaged records, we make it a dataFile, and the
+            // next reclaiming will fix it
+            reader.markAsDataFile(file);
+
+            wholeFileBuffer.position(pos + SIZE_BYTE);
+
+            continue;
+         }
+
+         // This record is from a previous file-usage. The file was
+         // reused and we need to ignore this record
+         if (readFileId != file.getOrderingID())
+         {
+            // If a file has damaged records, we make it a dataFile, and the
+            // next reclaiming will fix it
+            reader.markAsDataFile(file);
+
+            continue;
+         }
+
+         wholeFileBuffer.position(oldPos);
+
+         // At this point everything is checked. So we relax and just load
+         // the data now.
+
+         switch (recordType)
+         {
+            case ADD_RECORD:
+            {
+               reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+               break;
+            }
+            
+            case UPDATE_RECORD:
+            {
+               reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+               break;
+            }
+            
+            case DELETE_RECORD:
+            {
+               reader.deleteRecord(recordID);
+               break;
+            }
+
+            case ADD_RECORD_TX:
+            {
+               reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+               break;
+            }
+            
+            case UPDATE_RECORD_TX:
+            {
+               reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+               break;
+            }
+            
+            case DELETE_RECORD_TX:
+            {
+               reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+               break;
+            }
+            
+            case PREPARE_RECORD:
+            {
+
+               byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+               wholeFileBuffer.get(extraData);
+
+               // Pair <FileID, NumberOfElements>
+               Pair<Integer, Integer>[] summary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
+
+               reader.prepareRecord(transactionID, extraData, summary);
+
+               break;
+            }
+            case COMMIT_RECORD:
+            {
+               // We need to read it even if transaction was not found, or
+               // the reading checks would fail
+               // Pair <OrderId, NumberOfElements>
+               Pair<Integer, Integer>[] summary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
+
+               reader.commitRecord(transactionID, summary);
+               break;
+            }
+            case ROLLBACK_RECORD:
+            {
+               reader.rollbackRecord(transactionID);
+               break;
+            }
+            default:
+            {
+               throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+                                               " is corrupt, invalid record type " +
+                                               recordType);
+            }
+         }
+
+         checkSize = wholeFileBuffer.getInt();
+
+         // This is a sanity check about the loading code itself.
+         // If this checkSize doesn't match, it means the reading method is
+         // not doing what it was supposed to do
+         if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+         {
+            throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
+                                            ", pos = " +
+                                            pos);
+         }
+
+         lastDataPos = wholeFileBuffer.position();
+
+      }
+
+      fileFactory.releaseBuffer(wholeFileBuffer);
+
+      file.getFile().close();
+
+      return lastDataPos;
+
+   }
+
+   
    /** It will read the elements-summary back from the commit/prepare transaction 
     *  Pair<FileID, Counter> */
    @SuppressWarnings("unchecked")
@@ -1806,9 +1930,9 @@
     * @throws Exception
     */
    private ChannelBuffer writeTransaction(final byte recordType,
-                                       final long txID,
-                                       final JournalTransaction tx,
-                                       final EncodingSupport transactionData) throws Exception
+                                          final long txID,
+                                          final JournalTransaction tx,
+                                          final EncodingSupport transactionData) throws Exception
    {
       int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() *
                  SIZE_INT *
@@ -1957,7 +2081,6 @@
 
       return orderedFiles;
    }
-            
 
    /** 
     * Note: You should aways guarantee locking the semaphore lock.
@@ -1987,7 +2110,7 @@
             currentFile.getFile().unlockBuffer();
             moveNextFile();
             currentFile.getFile().lockBuffer();
-            
+
             // The same check needs to be done at the new file also
             if (!currentFile.getFile().fits(size))
             {
@@ -2082,7 +2205,7 @@
       closeFile(currentFile);
 
       currentFile = enqueueOpenFile();
-      
+
       fileFactory.activate(currentFile.getFile());
    }
 
@@ -2210,8 +2333,7 @@
 
       return tx;
    }
-   
-   
+
    private IOCallback getSyncCallback(boolean sync)
    {
       if (fileFactory.isSupportsCallbacks())
@@ -2351,6 +2473,26 @@
             }
          }
       }
+
+      public String toString()
+      {
+         StringBuffer buffer = new StringBuffer();
+         buffer.append("PosFiles(add=" + addFile.getFile().getFileName());
+
+         if (updateFiles != null)
+         {
+
+            for (JournalFile update : updateFiles)
+            {
+               buffer.append(", update=" + update.getFile().getFileName());
+            }
+
+         }
+
+         buffer.append(")");
+
+         return buffer.toString();
+      }
    }
 
    private class JournalTransaction
@@ -2564,25 +2706,25 @@
    private class PerfBlast extends Thread
    {
       private final int pages;
-      
+
       private PerfBlast(final int pages)
       {
-         this.pages = pages;         
+         this.pages = pages;
       }
-      
+
       public void run()
-      {             
+      {
          try
-         {            
+         {
             lock.acquire();
-         
+
             MessagingBuffer bb = newBuffer(128 * 1024);
-            
+
             for (int i = 0; i < pages; i++)
             {
                appendRecord(bb, false, null);
             }
-            
+
             lock.release();
          }
          catch (Exception e)
@@ -2591,5 +2733,62 @@
          }
       }
    }
-   
+
+   private static interface JournalReader
+   {
+      void addRecord(RecordInfo info) throws Exception;
+
+      /**
+       * @param recordInfo
+       * @throws Exception 
+       */
+      void updateRecord(RecordInfo recordInfo) throws Exception;
+
+      /**
+       * @param recordID
+       */
+      void deleteRecord(long recordID) throws Exception;
+
+      /**
+       * @param transactionID
+       * @param recordInfo
+       * @throws Exception 
+       */
+      void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * @param transactionID
+       * @param recordInfo
+       * @throws Exception 
+       */
+      void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * @param transactionID
+       * @param recordInfo
+       */
+      void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+      /**
+       * @param transactionID
+       * @param extraData
+       * @param summaryData
+       */
+      void prepareRecord(long transactionID, byte[] extraData, Pair<Integer, Integer>[] summary) throws Exception;
+
+      /**
+       * @param transactionID
+       * @param summaryData
+       */
+      void commitRecord(long transactionID, Pair<Integer, Integer>[] summary) throws Exception;
+
+      /**
+       * @param transactionID
+       */
+      void rollbackRecord(long transactionID) throws Exception;
+
+      public void markAsDataFile(JournalFile file);
+
+   }
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-16 23:38:44 UTC (rev 7372)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-16 23:58:22 UTC (rev 7373)
@@ -2944,30 +2944,30 @@
       createJournal();
       startJournal();
       load();
+      
+      int transactionID = 0;
 
       for (int i = 0; i < 100; i++)
       {
          add(i);
          if (i % 10 == 0 && i > 0)
          {
-            System.out.println("new file at " + i);
             journal.forceMoveNextFile();
          }
          update(i);
 
       }
 
-      for (int i = 0; i < 100; i++)
+      for (int i = 100; i < 200; i++)
       {
 
-         addTx(i, i + 100);
+         addTx(transactionID, i);
          updateTx(i + 100);
          if (i % 10 == 0 && i > 0)
          {
-            System.out.println("new file at " + i);
             journal.forceMoveNextFile();
          }
-         commit(i);
+         commit(transactionID++);
          update(i);
       }
 
@@ -2986,22 +2986,44 @@
 
       journal.forceMoveNextFile();
 
-      for (int i = 0; i < 200; i++)
+      for (int i = 0; i < 100; i++)
       {
          delete(i);
       }
 
+      for (int i = 100; i < 200; i++)
+      {
+         updateTx(transactionID, i);
+      }
+
       journal.forceMoveNextFile();
-
+      
+      commit(transactionID++);
+      
+      for (int i = 100; i < 200; i++)
+      {
+         updateTx(transactionID, i);
+         deleteTx(transactionID, i);
+      }
+      
+      commit(transactionID++);
+      
+ 
       System.out.println("Before reclaim ****************************");
       System.out.println(journal.debug());
       System.out.println("*****************************************");
 
-      journal.checkAndReclaimFiles();
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
 
       System.out.println("After reclaim ****************************");
       System.out.println(journal.debug());
       System.out.println("*****************************************");
+      
+      journal.forceMoveNextFile();
+      journal.checkAndReclaimFiles();
 
       assertEquals(0, journal.getDataFilesCount());
 




More information about the jboss-cvs-commits mailing list