[jboss-cvs] JBoss Messaging SVN: r7518 - trunk/src/main/org/jboss/messaging/core/journal/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jul 2 19:00:15 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-07-02 19:00:14 -0400 (Thu, 02 Jul 2009)
New Revision: 7518

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
Just formatting.. no code changes at this commit.

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-02 22:02:47 UTC (rev 7517)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-02 23:00:14 UTC (rev 7518)
@@ -207,7 +207,7 @@
 
    // This will be set only while the JournalCompactor is being executed
    private volatile JournalCompactor compactor;
-   
+
    // Latch used to wait compactor finish, to make sure we won't stop the journal with the compactor running
    private final VariableLatch compactorWait = new VariableLatch();
 
@@ -298,6 +298,180 @@
    // Public methods (used by package members such as JournalCompactor) (these methods are not part of the JournalImpl
    // interface)
 
+   /**
+    * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
+    * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file. 
+    *    (What could happen if there are too many pendingTransactions, or if a user event delayed pendingTransactions from arriving in time at a single file).</p>
+    * <p>The element-summary will then have</p>
+    * <p>FileID1, 10</p>
+    * <p>FileID2, 10</p>
+    * <p>FileID3, 10</p>
+    * 
+    * <br>
+    * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
+    * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
+    * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed. 
+    *     That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
+    * 
+    * @param recordType
+    * @param txID
+    * @param tx
+    * @param transactionData
+    * @param bb
+    * @throws Exception
+    */
+   public static void writeTransaction(final int fileID,
+                                       final byte recordType,
+                                       final long txID,
+                                       final JournalTransaction tx,
+                                       final EncodingSupport transactionData,
+                                       final int size,
+                                       final int numberOfRecords,
+                                       final ChannelBuffer bb) throws Exception
+   {
+      bb.writeByte(recordType);
+      bb.writeInt(fileID); // skip ID part
+      bb.writeLong(txID);
+      bb.writeInt(numberOfRecords); // skip number of pendingTransactions part
+
+      if (transactionData != null)
+      {
+         bb.writeInt(transactionData.getEncodeSize());
+      }
+
+      if (transactionData != null)
+      {
+         transactionData.encode(bb);
+      }
+
+      bb.writeInt(size);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param recordType
+    * @param record
+    * @param size
+    * @param bb
+    */
+   public static void writeUpdateRecordTX(final int fileID,
+                                          final long txID,
+                                          final long id,
+                                          final byte recordType,
+                                          final EncodingSupport record,
+                                          final int size,
+                                          final ChannelBuffer bb)
+   {
+      bb.writeByte(UPDATE_RECORD_TX);
+      bb.writeInt(fileID);
+      bb.writeLong(txID);
+      bb.writeLong(id);
+      bb.writeInt(record.getEncodeSize());
+      bb.writeByte(recordType);
+      record.encode(bb);
+      bb.writeInt(size);
+   }
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param size
+    * @param bb
+    */
+   public static void writeUpdateRecord(final int fileId,
+                                        final long id,
+                                        final byte recordType,
+                                        final EncodingSupport record,
+                                        final int size,
+                                        final ChannelBuffer bb)
+   {
+      bb.writeByte(UPDATE_RECORD);
+      bb.writeInt(fileId);
+      bb.writeLong(id);
+      bb.writeInt(record.getEncodeSize());
+      bb.writeByte(recordType);
+      record.encode(bb);
+      bb.writeInt(size);
+   }
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param size
+    * @param bb
+    */
+   public static void writeAddRecord(final int fileId,
+                                     final long id,
+                                     final byte recordType,
+                                     final EncodingSupport record,
+                                     final int size,
+                                     final ChannelBuffer bb)
+   {
+      bb.writeByte(ADD_RECORD);
+      bb.writeInt(fileId);
+      bb.writeLong(id);
+      bb.writeInt(record.getEncodeSize());
+      bb.writeByte(recordType);
+      record.encode(bb);
+      bb.writeInt(size);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param record
+    * @param size
+    * @param bb
+    */
+   public static void writeDeleteRecordTransactional(final int fileID,
+                                                     final long txID,
+                                                     final long id,
+                                                     final EncodingSupport record,
+                                                     final int size,
+                                                     final ChannelBuffer bb)
+   {
+      bb.writeByte(DELETE_RECORD_TX);
+      bb.writeInt(fileID);
+      bb.writeLong(txID);
+      bb.writeLong(id);
+      bb.writeInt(record != null ? record.getEncodeSize() : 0);
+      if (record != null)
+      {
+         record.encode(bb);
+      }
+      bb.writeInt(size);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param recordType
+    * @param record
+    * @param recordLength
+    * @param size
+    * @param bb
+    */
+   public static void writeAddRecordTX(final int fileID,
+                                       final long txID,
+                                       final long id,
+                                       final byte recordType,
+                                       final EncodingSupport record,
+                                       final int size,
+                                       final ChannelBuffer bb)
+   {
+      bb.writeByte(ADD_RECORD_TX);
+      bb.writeInt(fileID);
+      bb.writeLong(txID);
+      bb.writeLong(id);
+      bb.writeInt(record.getEncodeSize());
+      bb.writeByte(recordType);
+      record.encode(bb);
+      bb.writeInt(size);
+   }
+
    public Map<Long, JournalRecord> getRecords()
    {
       return records;
@@ -310,9 +484,334 @@
 
    public JournalCompactor getCompactor()
    {
-      return this.compactor;
+      return compactor;
    }
 
+   public static int readJournalFile(final SequentialFileFactory fileFactory,
+                                     final JournalFile file,
+                                     final JournalReaderCallback reader) throws Exception
+   {
+
+      file.getFile().open(1);
+      ByteBuffer wholeFileBuffer = null;
+      try
+      {
+
+         wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
+
+         final int journalFileSize = file.getFile().read(wholeFileBuffer);
+
+         if (journalFileSize != file.getFile().size())
+         {
+            throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
+         }
+
+         wholeFileBuffer.position(0);
+
+         // First long is the ordering timestamp, we just jump its position
+         wholeFileBuffer.position(SIZE_HEADER);
+
+         int lastDataPos = SIZE_HEADER;
+
+         while (wholeFileBuffer.hasRemaining())
+         {
+            final int pos = wholeFileBuffer.position();
+
+            byte recordType = wholeFileBuffer.get();
+
+            if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+            {
+               // I - We scan for any valid record on the file. If a hole
+               // happened in the middle of the file, we keep looking until all
+               // the possibilities are gone
+               continue;
+            }
+
+            if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+            {
+               reader.markAsDataFile(file);
+
+               wholeFileBuffer.position(pos + 1);
+               // II - Ignore this record, let's keep looking
+               continue;
+            }
+
+            // III - Every record has the file-id.
+            // This is what allows us to reuse a file without first clearing out its old contents
+            int readFileId = wholeFileBuffer.getInt();
+
+            long transactionID = 0;
+
+            if (isTransaction(recordType))
+            {
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
+
+               transactionID = wholeFileBuffer.getLong();
+            }
+
+            long recordID = 0;
+
+            // If prepare or commit
+            if (!isCompleteTransaction(recordType))
+            {
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
+
+               recordID = wholeFileBuffer.getLong();
+            }
+
+            // We use the size of the record to validate the health of the
+            // record.
+            // (V) We verify the size of the record
+
+            // The variable record portion used on Updates and Appends
+            int variableSize = 0;
+
+            // Used to hold extra data on transaction prepares
+            int preparedTransactionExtraDataSize = 0;
+
+            byte userRecordType = 0;
+
+            byte record[] = null;
+
+            if (isContainsBody(recordType))
+            {
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  reader.markAsDataFile(file);
+                  continue;
+               }
+
+               variableSize = wholeFileBuffer.getInt();
+
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  continue;
+               }
+
+               if (recordType != DELETE_RECORD_TX)
+               {
+                  userRecordType = wholeFileBuffer.get();
+               }
+
+               record = new byte[variableSize];
+
+               wholeFileBuffer.get(record);
+            }
+
+            // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
+            // currentFile
+            int transactionCheckNumberOfRecords = 0;
+
+            if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
+            {
+               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
+               {
+                  wholeFileBuffer.position(pos + 1);
+                  continue;
+               }
+
+               transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
+
+               if (recordType == PREPARE_RECORD)
+               {
+                  // Add the variable size required for preparedTransactions
+                  preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
+               }
+               variableSize = 0;
+            }
+
+            int recordSize = getRecordSize(recordType);
+
+            // VI - this is completing V, We will validate the size at the end
+            // of the record,
+            // But we avoid buffer overflows by damaged data
+            if (isInvalidSize(journalFileSize, pos, recordSize + variableSize + preparedTransactionExtraDataSize))
+            {
+               // Avoid a buffer overflow caused by damaged data... continue
+               // scanning for more pendingTransactions...
+               trace("Record at position " + pos +
+                     " recordType = " +
+                     recordType +
+                     " file:" +
+                     file.getFile().getFileName() +
+                     " recordSize: " +
+                     recordSize +
+                     " variableSize: " +
+                     variableSize +
+                     " preparedTransactionExtraDataSize: " +
+                     preparedTransactionExtraDataSize +
+                     " is corrupted and it is being ignored (II)");
+               // If a file has damaged pendingTransactions, we make it a dataFile, and the
+               // next reclaiming will fix it
+               reader.markAsDataFile(file);
+               wholeFileBuffer.position(pos + 1);
+
+               continue;
+            }
+
+            int oldPos = wholeFileBuffer.position();
+
+            wholeFileBuffer.position(pos + variableSize +
+                                     recordSize +
+                                     preparedTransactionExtraDataSize -
+                                     DataConstants.SIZE_INT);
+
+            int checkSize = wholeFileBuffer.getInt();
+
+            // VII - The checkSize at the end has to match with the size
+            // informed at the beginning.
+            // This is like testing a hash for the record. (We could replace the
+            // checkSize by some sort of calculated hash)
+            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+            {
+               trace("Record at position " + pos +
+                     " recordType = " +
+                     recordType +
+                     " file:" +
+                     file.getFile().getFileName() +
+                     " is corrupted and it is being ignored (III)");
+
+               // If a file has damaged pendingTransactions, we make it a dataFile, and the
+               // next reclaiming will fix it
+               reader.markAsDataFile(file);
+
+               wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);
+
+               continue;
+            }
+
+            // This record is from a previous file-usage. The file was
+            // reused and we need to ignore this record
+            if (readFileId != file.getFileID())
+            {
+               // If a file has damaged pendingTransactions, we make it a dataFile, and the
+               // next reclaiming will fix it
+               reader.markAsDataFile(file);
+
+               continue;
+            }
+
+            wholeFileBuffer.position(oldPos);
+
+            // At this point everything is checked. So we relax and just load
+            // the data now.
+
+            switch (recordType)
+            {
+               case ADD_RECORD:
+               {
+                  reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
+                  break;
+               }
+
+               case UPDATE_RECORD:
+               {
+                  reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
+                  break;
+               }
+
+               case DELETE_RECORD:
+               {
+                  reader.onReadDeleteRecord(recordID);
+                  break;
+               }
+
+               case ADD_RECORD_TX:
+               {
+                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+                  break;
+               }
+
+               case UPDATE_RECORD_TX:
+               {
+                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+                  break;
+               }
+
+               case DELETE_RECORD_TX:
+               {
+                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+                  break;
+               }
+
+               case PREPARE_RECORD:
+               {
+
+                  byte extraData[] = new byte[preparedTransactionExtraDataSize];
+
+                  wholeFileBuffer.get(extraData);
+
+                  reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+
+                  break;
+               }
+               case COMMIT_RECORD:
+               {
+
+                  reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
+                  break;
+               }
+               case ROLLBACK_RECORD:
+               {
+                  reader.onReadRollbackRecord(transactionID);
+                  break;
+               }
+               default:
+               {
+                  throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+                                                  " is corrupt, invalid record type " +
+                                                  recordType);
+               }
+            }
+
+            checkSize = wholeFileBuffer.getInt();
+
+            // This is a sanity check about the loading code itself.
+            // If this checkSize doesn't match, it means the reading method is
+            // not doing what it was supposed to do
+            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
+            {
+               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
+                                               ", pos = " +
+                                               pos);
+            }
+
+            lastDataPos = wholeFileBuffer.position();
+
+         }
+
+         return lastDataPos;
+      }
+
+      finally
+      {
+         if (wholeFileBuffer != null)
+         {
+            fileFactory.releaseBuffer(wholeFileBuffer);
+         }
+
+         try
+         {
+            file.getFile().close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    // Journal implementation
    // ----------------------------------------------------------------
 
@@ -825,6 +1324,11 @@
 
    }
 
+   public int getAlignment() throws Exception
+   {
+      return fileFactory.getAlignment();
+   }
+
    /**
     * @see JournalImpl#load(LoaderCallback)
     */
@@ -836,22 +1340,22 @@
 
       long maxID = load(new LoaderCallback()
       {
-         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+         public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
          {
             preparedTransactions.add(preparedTransaction);
          }
 
-         public void addRecord(RecordInfo info)
+         public void addRecord(final RecordInfo info)
          {
             records.add(info);
          }
 
-         public void updateRecord(RecordInfo info)
+         public void updateRecord(final RecordInfo info)
          {
             records.add(info);
          }
 
-         public void deleteRecord(long id)
+         public void deleteRecord(final long id)
          {
             recordsToDelete.add(id);
          }
@@ -903,10 +1407,7 @@
 
             dataFiles.clear();
 
-            this.compactor = new JournalCompactor(fileFactory,
-                                                  this,
-                                                  this.records.keySet(),
-                                                  dataFilesToProcess.get(0).getFileID());
+            compactor = new JournalCompactor(fileFactory, this, records.keySet(), dataFilesToProcess.get(0).getFileID());
 
             for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
             {
@@ -916,7 +1417,7 @@
 
             // We will calculate the new records during compacting, what will take the position the records will take
             // after compacting
-            this.records.clear();
+            records.clear();
          }
          finally
          {
@@ -961,7 +1462,7 @@
          try
          {
             // Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
-            this.compactor = null;
+            compactor = null;
 
             newDatafiles = localCompactor.getNewDataFiles();
 
@@ -996,7 +1497,7 @@
                {
                   trace("Merging pending transaction " + newTransaction + " after compacting to the journal");
                }
-               JournalTransaction liveTransaction = this.transactions.get(newTransaction.getId());
+               JournalTransaction liveTransaction = transactions.get(newTransaction.getId());
                if (liveTransaction == null)
                {
                   log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() +
@@ -1038,56 +1539,6 @@
 
    }
 
-   protected void deleteControlFile(SequentialFile controlFile) throws Exception
-   {
-      controlFile.delete();
-   }
-
-   /** being protected as testcases can override this method */
-   protected void renameFiles(List<JournalFile> oldFiles, List<JournalFile> newFiles) throws Exception
-   {
-      for (JournalFile file : oldFiles)
-      {
-         dataFiles.remove(file);
-         freeFiles.add(reinitializeFile(file));
-      }
-
-      for (JournalFile file : newFiles)
-      {
-         String newName = file.getFile().getFileName();
-         newName = newName.substring(0, newName.lastIndexOf(".cmp"));
-         file.getFile().renameTo(newName);
-      }
-
-   }
-
-   /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
-   protected void onCompactDone()
-   {
-   }
-
-   /**
-    * @throws Exception
-    */
-   protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
-   {
-      return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
-   }
-
-   private static boolean isInvalidSize(int fileSize, int bufferPos, int size)
-   {
-      if (size < 0)
-      {
-         return true;
-      }
-      else
-      {
-         final int position = bufferPos + size;
-         return position > fileSize || position < 0;
-
-      }
-   }
-
    /** 
     * <p>Load data accordingly to the record layouts</p>
     * 
@@ -1149,7 +1600,7 @@
          int resultLastPost = readJournalFile(fileFactory, file, new JournalReaderCallback()
          {
 
-            public void onReadAddRecord(RecordInfo info) throws Exception
+            public void onReadAddRecord(final RecordInfo info) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1162,7 +1613,7 @@
                records.put(info.id, new JournalRecord(file, info.data.length + SIZE_ADD_RECORD));
             }
 
-            public void onReadUpdateRecord(RecordInfo info) throws Exception
+            public void onReadUpdateRecord(final RecordInfo info) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1184,7 +1635,7 @@
                }
             }
 
-            public void onReadDeleteRecord(long recordID) throws Exception
+            public void onReadDeleteRecord(final long recordID) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1202,12 +1653,12 @@
                }
             }
 
-            public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception
+            public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
             {
                onReadAddRecordTX(transactionID, info);
             }
 
-            public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception
+            public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
             {
 
                if (trace && LOAD_TRACE)
@@ -1240,7 +1691,7 @@
                tnp.addPositive(file, info.id, info.data.length + SIZE_ADD_RECORD_TX);
             }
 
-            public void onReadDeleteRecordTX(long transactionID, RecordInfo info) throws Exception
+            public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1273,7 +1724,7 @@
 
             }
 
-            public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+            public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1318,7 +1769,7 @@
                }
             }
 
-            public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+            public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1379,7 +1830,7 @@
 
             }
 
-            public void onReadRollbackRecord(long transactionID) throws Exception
+            public void onReadRollbackRecord(final long transactionID) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
@@ -1409,7 +1860,7 @@
                }
             }
 
-            public void markAsDataFile(JournalFile file)
+            public void markAsDataFile(final JournalFile file)
             {
                if (trace && LOAD_TRACE)
                {
@@ -1516,67 +1967,42 @@
       return maxID;
    }
 
-   /**
-    * @return
-    * @throws Exception
-    */
-   private void checkControlFile() throws Exception
+   protected void deleteControlFile(final SequentialFile controlFile) throws Exception
    {
-      ArrayList<String> dataFiles = new ArrayList<String>();
-      ArrayList<String> newFiles = new ArrayList<String>();
+      controlFile.delete();
+   }
 
-      SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles);
-      if (controlFile != null)
+   /** being protected as testcases can override this method */
+   protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
+   {
+      for (JournalFile file : oldFiles)
       {
-         log.info("Journal Compactor was interrupted during renaming phase, renaming files");
-
-         for (String dataFile : dataFiles)
-         {
-            SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
-            log.info("Removing old compacted file" + file.getFileName());
-            if (file.exists())
-            {
-               file.delete();
-            }
-         }
-
-         for (String newFile : newFiles)
-         {
-            SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
-            log.info("Renaming file " + file.getFileName() + " as an part of the data files");
-            if (file.exists())
-            {
-               final String originalName = file.getFileName();
-               final String newName = originalName.substring(0, originalName.lastIndexOf(".cmp"));
-               file.renameTo(newName);
-            }
-         }
-
-         controlFile.delete();
+         dataFiles.remove(file);
+         freeFiles.add(reinitializeFile(file));
       }
 
-      List<String> leftFiles = fileFactory.listFiles(this.getFileExtension() + ".cmp");
-
-      if (leftFiles.size() > 0)
+      for (JournalFile file : newFiles)
       {
-         log.warn("Compacted files were left unnatended on journal directory, deleting invalid files now");
-
-         for (String fileToDelete : leftFiles)
-         {
-            log.warn("Deleting unnatended file " + fileToDelete);
-            SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1);
-            file.delete();
-         }
+         String newName = file.getFile().getFileName();
+         newName = newName.substring(0, newName.lastIndexOf(".cmp"));
+         file.getFile().renameTo(newName);
       }
 
-      return;
    }
 
-   public int getAlignment() throws Exception
+   /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
+   protected void onCompactDone()
    {
-      return fileFactory.getAlignment();
    }
 
+   /**
+    * @throws Exception
+    */
+   protected SequentialFile createControlFile(final List<JournalFile> files, final List<JournalFile> newFiles) throws Exception
+   {
+      return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
+   }
+
    // TestableJournal implementation
    // --------------------------------------------------------------
 
@@ -1744,7 +2170,7 @@
                   totalLiveSize +
                   ", margin to start compacting = " +
                   compactMargin);
-         
+
          compactorWait.up();
 
          // We can't use the executor for the compacting... or we would lock files opening and creation (besides other
@@ -1885,7 +2311,7 @@
          {
             log.warn("Couldn't stop journal executor after 60 seconds");
          }
-         
+
          while (!compactorWait.waitCompletion(60000))
          {
             log.warn("Waiting the compactor to finish its operations");
@@ -1965,329 +2391,6 @@
       return jf;
    }
 
-   public static int readJournalFile(SequentialFileFactory fileFactory, JournalFile file, JournalReaderCallback reader) throws Exception
-   {
-
-      file.getFile().open(1);
-      ByteBuffer wholeFileBuffer = null;
-      try
-      {
-
-         wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
-
-         final int journalFileSize = file.getFile().read(wholeFileBuffer);
-
-         if (journalFileSize != file.getFile().size())
-         {
-            throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
-         }
-
-         wholeFileBuffer.position(0);
-
-         // First long is the ordering timestamp, we just jump its position
-         wholeFileBuffer.position(SIZE_HEADER);
-
-         int lastDataPos = SIZE_HEADER;
-
-         while (wholeFileBuffer.hasRemaining())
-         {
-            final int pos = wholeFileBuffer.position();
-
-            byte recordType = wholeFileBuffer.get();
-
-            if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
-            {
-               // I - We scan for any valid record on the file. If a hole
-               // happened in the middle of the file we keep looking until all
-               // the possibilities are gone
-               continue;
-            }
-
-            if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
-            {
-               reader.markAsDataFile(file);
-
-               wholeFileBuffer.position(pos + 1);
-               // II - Ignore this record, lets keep looking
-               continue;
-            }
-
-            // III - Every record has the file-id.
-            // This is what supports us from not re-filling the whole file
-            int readFileId = wholeFileBuffer.getInt();
-
-            long transactionID = 0;
-
-            if (isTransaction(recordType))
-            {
-               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
-               {
-                  wholeFileBuffer.position(pos + 1);
-                  reader.markAsDataFile(file);
-                  continue;
-               }
-
-               transactionID = wholeFileBuffer.getLong();
-            }
-
-            long recordID = 0;
-
-            // If prepare or commit
-            if (!isCompleteTransaction(recordType))
-            {
-               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_LONG))
-               {
-                  wholeFileBuffer.position(pos + 1);
-                  reader.markAsDataFile(file);
-                  continue;
-               }
-
-               recordID = wholeFileBuffer.getLong();
-            }
-
-            // We use the size of the record to validate the health of the
-            // record.
-            // (V) We verify the size of the record
-
-            // The variable record portion used on Updates and Appends
-            int variableSize = 0;
-
-            // Used to hold extra data on transaction prepares
-            int preparedTransactionExtraDataSize = 0;
-
-            byte userRecordType = 0;
-
-            byte record[] = null;
-
-            if (isContainsBody(recordType))
-            {
-               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
-               {
-                  wholeFileBuffer.position(pos + 1);
-                  reader.markAsDataFile(file);
-                  continue;
-               }
-
-               variableSize = wholeFileBuffer.getInt();
-
-               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), variableSize))
-               {
-                  wholeFileBuffer.position(pos + 1);
-                  continue;
-               }
-
-               if (recordType != DELETE_RECORD_TX)
-               {
-                  userRecordType = wholeFileBuffer.get();
-               }
-
-               record = new byte[variableSize];
-
-               wholeFileBuffer.get(record);
-            }
-
-            // Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
-            // currentFile
-            int transactionCheckNumberOfRecords = 0;
-
-            if (recordType == PREPARE_RECORD || recordType == COMMIT_RECORD)
-            {
-               if (isInvalidSize(journalFileSize, wholeFileBuffer.position(), DataConstants.SIZE_INT))
-               {
-                  wholeFileBuffer.position(pos + 1);
-                  continue;
-               }
-
-               transactionCheckNumberOfRecords = wholeFileBuffer.getInt();
-
-               if (recordType == PREPARE_RECORD)
-               {
-                  // Add the variable size required for preparedTransactions
-                  preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
-               }
-               variableSize = 0;
-            }
-
-            int recordSize = getRecordSize(recordType);
-
-            // VI - this is completing V, We will validate the size at the end
-            // of the record,
-            // But we avoid buffer overflows by damaged data
-            if (isInvalidSize(journalFileSize, pos, recordSize + variableSize + preparedTransactionExtraDataSize))
-            {
-               // Avoid a buffer overflow caused by damaged data... continue
-               // scanning for more pendingTransactions...
-               trace("Record at position " + pos +
-                     " recordType = " +
-                     recordType +
-                     " file:" +
-                     file.getFile().getFileName() +
-                     " recordSize: " +
-                     recordSize +
-                     " variableSize: " +
-                     variableSize +
-                     " preparedTransactionExtraDataSize: " +
-                     preparedTransactionExtraDataSize +
-                     " is corrupted and it is being ignored (II)");
-               // If a file has damaged pendingTransactions, we make it a dataFile, and the
-               // next reclaiming will fix it
-               reader.markAsDataFile(file);
-               wholeFileBuffer.position(pos + 1);
-
-               continue;
-            }
-
-            int oldPos = wholeFileBuffer.position();
-
-            wholeFileBuffer.position(pos + variableSize +
-                                     recordSize +
-                                     preparedTransactionExtraDataSize -
-                                     DataConstants.SIZE_INT);
-
-            int checkSize = wholeFileBuffer.getInt();
-
-            // VII - The checkSize at the end has to match with the size
-            // informed at the beginning.
-            // This is like testing a hash for the record. (We could replace the
-            // checkSize by some sort of calculated hash)
-            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
-            {
-               trace("Record at position " + pos +
-                     " recordType = " +
-                     recordType +
-                     " file:" +
-                     file.getFile().getFileName() +
-                     " is corrupted and it is being ignored (III)");
-
-               // If a file has damaged pendingTransactions, we make it a dataFile, and the
-               // next reclaiming will fix it
-               reader.markAsDataFile(file);
-
-               wholeFileBuffer.position(pos + DataConstants.SIZE_BYTE);
-
-               continue;
-            }
-
-            // This record is from a previous file-usage. The file was
-            // reused and we need to ignore this record
-            if (readFileId != file.getFileID())
-            {
-               // If a file has damaged pendingTransactions, we make it a dataFile, and the
-               // next reclaiming will fix it
-               reader.markAsDataFile(file);
-
-               continue;
-            }
-
-            wholeFileBuffer.position(oldPos);
-
-            // At this point everything is checked. So we relax and just load
-            // the data now.
-
-            switch (recordType)
-            {
-               case ADD_RECORD:
-               {
-                  reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
-                  break;
-               }
-
-               case UPDATE_RECORD:
-               {
-                  reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
-                  break;
-               }
-
-               case DELETE_RECORD:
-               {
-                  reader.onReadDeleteRecord(recordID);
-                  break;
-               }
-
-               case ADD_RECORD_TX:
-               {
-                  reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
-                  break;
-               }
-
-               case UPDATE_RECORD_TX:
-               {
-                  reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
-                  break;
-               }
-
-               case DELETE_RECORD_TX:
-               {
-                  reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
-                  break;
-               }
-
-               case PREPARE_RECORD:
-               {
-
-                  byte extraData[] = new byte[preparedTransactionExtraDataSize];
-
-                  wholeFileBuffer.get(extraData);
-
-                  reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
-
-                  break;
-               }
-               case COMMIT_RECORD:
-               {
-
-                  reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
-                  break;
-               }
-               case ROLLBACK_RECORD:
-               {
-                  reader.onReadRollbackRecord(transactionID);
-                  break;
-               }
-               default:
-               {
-                  throw new IllegalStateException("Journal " + file.getFile().getFileName() +
-                                                  " is corrupt, invalid record type " +
-                                                  recordType);
-               }
-            }
-
-            checkSize = wholeFileBuffer.getInt();
-
-            // This is a sanity check about the loading code itself.
-            // If this checkSize doesn't match, it means the reading method is
-            // not doing what it was supposed to do
-            if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
-            {
-               throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
-                                               ", pos = " +
-                                               pos);
-            }
-
-            lastDataPos = wholeFileBuffer.position();
-
-         }
-
-         return lastDataPos;
-      }
-
-      finally
-      {
-         if (wholeFileBuffer != null)
-         {
-            fileFactory.releaseBuffer(wholeFileBuffer);
-         }
-
-         try
-         {
-            file.getFile().close();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-   }
-
    /**
     * <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
     * <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
@@ -2307,180 +2410,6 @@
       return journalTransaction.getCounter(currentFile) == numberOfRecords;
    }
 
-   /**
-    * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
-    * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file. 
-    *    (What could happen if there are too many pendingTransactions, or if a user event delayed pendingTransactions to come in time to a single file).</p>
-    * <p>The element-summary will then have</p>
-    * <p>FileID1, 10</p>
-    * <p>FileID2, 10</p>
-    * <p>FileID3, 10</p>
-    * 
-    * <br>
-    * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
-    * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
-    * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed. 
-    *     That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
-    * 
-    * @param recordType
-    * @param txID
-    * @param tx
-    * @param transactionData
-    * @return
-    * @throws Exception
-    */
-   public static void writeTransaction(final int fileID,
-                                       final byte recordType,
-                                       final long txID,
-                                       final JournalTransaction tx,
-                                       final EncodingSupport transactionData,
-                                       final int size,
-                                       final int numberOfRecords,
-                                       final ChannelBuffer bb) throws Exception
-   {
-      bb.writeByte(recordType);
-      bb.writeInt(fileID); // skip ID part
-      bb.writeLong(txID);
-      bb.writeInt(numberOfRecords); // skip number of pendingTransactions part
-
-      if (transactionData != null)
-      {
-         bb.writeInt(transactionData.getEncodeSize());
-      }
-
-      if (transactionData != null)
-      {
-         transactionData.encode(bb);
-      }
-
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param txID
-    * @param id
-    * @param recordType
-    * @param record
-    * @param size
-    * @param bb
-    */
-   public static void writeUpdateRecordTX(final int fileID,
-                                          final long txID,
-                                          final long id,
-                                          final byte recordType,
-                                          final EncodingSupport record,
-                                          int size,
-                                          ChannelBuffer bb)
-   {
-      bb.writeByte(UPDATE_RECORD_TX);
-      bb.writeInt(fileID);
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param id
-    * @param recordType
-    * @param record
-    * @param size
-    * @param bb
-    */
-   public static void writeUpdateRecord(final int fileId,
-                                        final long id,
-                                        final byte recordType,
-                                        final EncodingSupport record,
-                                        int size,
-                                        ChannelBuffer bb)
-   {
-      bb.writeByte(UPDATE_RECORD);
-      bb.writeInt(fileId);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param id
-    * @param recordType
-    * @param record
-    * @param size
-    * @param bb
-    */
-   public static void writeAddRecord(final int fileId,
-                                     final long id,
-                                     final byte recordType,
-                                     final EncodingSupport record,
-                                     int size,
-                                     ChannelBuffer bb)
-   {
-      bb.writeByte(ADD_RECORD);
-      bb.writeInt(fileId);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param txID
-    * @param id
-    * @param record
-    * @param size
-    * @param bb
-    */
-   public static void writeDeleteRecordTransactional(final int fileID,
-                                                     final long txID,
-                                                     final long id,
-                                                     final EncodingSupport record,
-                                                     int size,
-                                                     ChannelBuffer bb)
-   {
-      bb.writeByte(DELETE_RECORD_TX);
-      bb.writeInt(fileID);
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(record != null ? record.getEncodeSize() : 0);
-      if (record != null)
-      {
-         record.encode(bb);
-      }
-      bb.writeInt(size);
-   }
-
-   /**
-    * @param txID
-    * @param id
-    * @param recordType
-    * @param record
-    * @param recordLength
-    * @param size
-    * @param bb
-    */
-   public static void writeAddRecordTX(final int fileID,
-                                       final long txID,
-                                       final long id,
-                                       final byte recordType,
-                                       final EncodingSupport record,
-                                       int size,
-                                       ChannelBuffer bb)
-   {
-      bb.writeByte(ADD_RECORD_TX);
-      bb.writeInt(fileID);
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
-   }
-
    private static boolean isTransaction(final byte recordType)
    {
       return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX ||
@@ -2860,7 +2789,7 @@
     * @return
     * @throws Exception
     */
-   JournalFile getFile(boolean keepOpened, boolean multiAIO, boolean fill) throws Exception
+   JournalFile getFile(final boolean keepOpened, final boolean multiAIO, final boolean fill) throws Exception
    {
       JournalFile nextOpenedFile = null;
       try
@@ -2925,7 +2854,7 @@
       return tx;
    }
 
-   private IOCallback getSyncCallback(boolean sync)
+   private IOCallback getSyncCallback(final boolean sync)
    {
       if (fileFactory.isSupportsCallbacks())
       {
@@ -2944,8 +2873,78 @@
       }
    }
 
-   public ChannelBuffer newBuffer(final int size)
+   /**
+    * @return
+    * @throws Exception
+    */
+   private void checkControlFile() throws Exception
    {
+      ArrayList<String> dataFiles = new ArrayList<String>();
+      ArrayList<String> newFiles = new ArrayList<String>();
+
+      SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles);
+      if (controlFile != null)
+      {
+         log.info("Journal Compactor was interrupted during renaming phase, renaming files");
+
+         for (String dataFile : dataFiles)
+         {
+            SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
+            log.info("Removing old compacted file" + file.getFileName());
+            if (file.exists())
+            {
+               file.delete();
+            }
+         }
+
+         for (String newFile : newFiles)
+         {
+            SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
+            log.info("Renaming file " + file.getFileName() + " as an part of the data files");
+            if (file.exists())
+            {
+               final String originalName = file.getFileName();
+               final String newName = originalName.substring(0, originalName.lastIndexOf(".cmp"));
+               file.renameTo(newName);
+            }
+         }
+
+         controlFile.delete();
+      }
+
+      List<String> leftFiles = fileFactory.listFiles(getFileExtension() + ".cmp");
+
+      if (leftFiles.size() > 0)
+      {
+         log.warn("Compacted files were left unnatended on journal directory, deleting invalid files now");
+
+         for (String fileToDelete : leftFiles)
+         {
+            log.warn("Deleting unnatended file " + fileToDelete);
+            SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1);
+            file.delete();
+         }
+      }
+
+      return;
+   }
+
+   private static boolean isInvalidSize(final int fileSize, final int bufferPos, final int size)
+   {
+      if (size < 0)
+      {
+         return true;
+      }
+      else
+      {
+         final int position = bufferPos + size;
+         return position > fileSize || position < 0;
+
+      }
+   }
+
+   private ChannelBuffer newBuffer(final int size)
+   {
       return ChannelBuffers.buffer(size);
    }
 
@@ -3031,11 +3030,11 @@
 
       static NullEncoding instance = new NullEncoding();
 
-      public void decode(MessagingBuffer buffer)
+      public void decode(final MessagingBuffer buffer)
       {
       }
 
-      public void encode(MessagingBuffer buffer)
+      public void encode(final MessagingBuffer buffer)
       {
       }
 




More information about the jboss-cvs-commits mailing list