[hornetq-commits] JBoss hornetq SVN: r9471 - in trunk: tests/src/org/hornetq/tests/soak/journal and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jul 27 15:14:33 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-07-27 15:14:32 -0400 (Tue, 27 Jul 2010)
New Revision: 9471

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
   trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - Making sure freed files during compacting won't go out of order + adding a few methods to help debug the journal.

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2010-07-27 19:14:32 UTC (rev 9471)
@@ -161,7 +161,7 @@
    {
       try
       {
-         return "JournalFileImpl: " + file.getFileName();
+         return "JournalFileImpl: (" + file.getFileName() + " id = " + this.fileID + ", recordID = " + recordID + ")";
       }
       catch (Exception e)
       {

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-07-27 19:14:32 UTC (rev 9471)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.journal.impl;
 
+import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -94,7 +95,7 @@
 
    private static final Logger log = Logger.getLogger(JournalImpl.class);
 
-   private static final boolean trace = JournalImpl.log.isTraceEnabled();
+   private static final boolean trace = log.isTraceEnabled();
 
    /** This is to be set to true at DEBUG & development only */
    private static final boolean LOAD_TRACE = false;
@@ -104,7 +105,7 @@
    // Journal
    private static final void trace(final String message)
    {
-      JournalImpl.log.trace(message);
+      log.trace(message);
    }
 
    // The sizes of primitive types
@@ -385,6 +386,142 @@
       return compactor;
    }
 
+
+   
+   /** this method is used internally only however tools may use it to maintenance. 
+    *  It won't be part of the interface as the tools should be specific to the implementation */
+   public List<JournalFile> orderFiles() throws Exception
+   {
+      List<String> fileNames = fileFactory.listFiles(fileExtension);
+
+      List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
+
+      for (String fileName : fileNames)
+      {
+         SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
+
+         file.open(1, false);
+         
+         try
+         {
+            long fileID = readFileHeader(file);
+   
+            orderedFiles.add(new JournalFileImpl(file, fileID));
+         }
+         finally
+         {
+            file.close();
+         }
+      }
+
+      // Now order them by ordering id - we can't use the file name for ordering
+      // since we can re-use dataFiles
+
+      Collections.sort(orderedFiles, new JournalFileComparator());
+
+      return orderedFiles;
+   }
+   
+   private void calculateNextfileID(List<JournalFile> files)
+   {
+      
+      for (JournalFile file : files)
+      {
+         long fileID = file.getFileID();
+         if (nextFileID.get() < fileID)
+         {
+            nextFileID.set(fileID);
+         }
+   
+         long fileNameID = getFileNameID(file.getFile().getFileName());
+   
+         // The compactor could create a fileName but use a previously assigned ID.
+         // Because of that we need to take both parts into account
+         if (nextFileID.get() < fileNameID)
+         {
+            nextFileID.set(fileNameID);
+         }
+      }
+
+
+   }
+
+   
+   /**
+    * @param fileFactory
+    * @param journal
+    * @throws Exception
+    */
+   public static void listJournalFiles(final PrintStream out, final JournalImpl journal) throws Exception
+   {
+      List<JournalFile> files = journal.orderFiles();
+      
+      SequentialFileFactory fileFactory = journal.fileFactory;
+
+      for (JournalFile file : files)
+      {
+         out.println("####### listing file " + file.getFile().getFileName() +
+                            "  sequence = " +
+                            file.getFileID());
+
+         JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+         {
+
+            public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+            {
+               out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
+            }
+
+            public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+            {
+               out.println("ReadUpdate  " + recordInfo);
+            }
+
+            public void onReadRollbackRecord(long transactionID) throws Exception
+            {
+               out.println("Rollback txID=" + transactionID);
+            }
+
+            public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+            {
+               out.println("Prepare txID=" + transactionID);
+            }
+
+            public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+            {
+               out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
+            }
+
+            public void onReadDeleteRecord(long recordID) throws Exception
+            {
+               out.println("DeleteRecord id=" + recordID);
+            }
+
+            public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+            {
+               out.println("CommitRecord txID=" + transactionID);
+            }
+
+            public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+            {
+               out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
+            }
+
+            public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+            {
+               out.println("AddRecord " + recordInfo);
+            }
+
+            public void markAsDataFile(JournalFile file)
+            {
+            }
+         });
+      }
+   }
+
+
+
+   /** this method is used internally only however tools may use it to maintenance.  */
    public static int readJournalFile(final SequentialFileFactory fileFactory,
                                      final JournalFile file,
                                      final JournalReaderCallback reader) throws Exception
@@ -703,7 +840,11 @@
 
          return lastDataPos;
       }
-
+      catch (Throwable e)
+      {
+         log.warn(e.getMessage(), e);
+         throw new Exception (e.getMessage(), e);
+      }
       finally
       {
          if (wholeFileBuffer != null)
@@ -1476,7 +1617,10 @@
 
       try
       {
-         JournalImpl.trace("Starting compacting operation on journal");
+         if (trace)
+         {
+            JournalImpl.trace("Starting compacting operation on journal");
+         }
 
          // We need to guarantee that the journal is frozen for this short time
          // We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1579,7 +1723,10 @@
                dataFiles.addFirst(fileToAdd);
             }
 
-            JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+            if (trace)
+            {
+               JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+            }
 
             // Replay pending commands (including updates, deletes and commits)
 
@@ -1616,7 +1763,10 @@
          renameFiles(dataFilesToProcess, newDatafiles);
          deleteControlFile(controlFile);
 
-         JournalImpl.trace("Finished compacting on journal");
+         if (trace)
+         {
+            JournalImpl.trace("Finished compacting on journal");
+         }
 
       }
       finally
@@ -1698,6 +1848,8 @@
       final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
 
       final List<JournalFile> orderedFiles = orderFiles();
+      
+      calculateNextfileID(orderedFiles);
 
       int lastDataPos = JournalImpl.SIZE_HEADER;
 
@@ -2646,10 +2798,26 @@
    /** being protected as testcases can override this method */
    protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
    {
-      for (JournalFile file : oldFiles)
+      
+      // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong order
+      filesExecutor.execute(new Runnable()
       {
-         addFreeFile(file);
-      }
+         public void run()
+         {
+            for (JournalFile file : oldFiles)
+            {
+               try
+               {
+                  addFreeFile(file);
+               }
+               catch (Exception e)
+               {
+                  log.warn("Error reinitializing file "  + file, e);
+               }
+            }
+         }
+      });
+      
 
       for (JournalFile file : newFiles)
       {
@@ -2795,52 +2963,6 @@
       return recordSize;
    }
 
-   private List<JournalFile> orderFiles() throws Exception
-   {
-      List<String> fileNames = fileFactory.listFiles(fileExtension);
-
-      List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
-
-      for (String fileName : fileNames)
-      {
-         SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
-
-         file.open(1, false);
-         
-         try
-         {
-            long fileID = readFileHeader(file);
-   
-            if (nextFileID.get() < fileID)
-            {
-               nextFileID.set(fileID);
-            }
-   
-            long fileNameID = getFileNameID(fileName);
-   
-            // The compactor could create a fileName but use a previously assigned ID.
-            // Because of that we need to take both parts into account
-            if (nextFileID.get() < fileNameID)
-            {
-               nextFileID.set(fileNameID);
-            }
-   
-            orderedFiles.add(new JournalFileImpl(file, fileID));
-         }
-         finally
-         {
-            file.close();
-         }
-      }
-
-      // Now order them by ordering id - we can't use the file name for ordering
-      // since we can re-use dataFiles
-
-      Collections.sort(orderedFiles, new JournalFileComparator());
-
-      return orderedFiles;
-   }
-
    /**
     * @param file
     * @return
@@ -3117,7 +3239,7 @@
 
       if (JournalImpl.trace)
       {
-         JournalImpl.trace("moveNextFile: " + currentFile.getFile().getFileName() + " sync: " + synchronous);
+         JournalImpl.trace("moveNextFile: " + currentFile + " sync: " + synchronous);
       }
 
       fileFactory.activateBuffer(currentFile.getFile());
@@ -3175,6 +3297,11 @@
                                  new Exception("Warning: Couldn't open a file in 60 Seconds"));
          }
       }
+      
+      if (trace)
+      {
+         JournalImpl.trace("Returning file " + nextFile);
+      }
 
       return nextFile;
    }
@@ -3212,6 +3339,11 @@
    private void pushOpenedFile() throws Exception
    {
       JournalFile nextOpenedFile = getFile(true, true, true, false);
+      
+      if (trace)
+      {
+         JournalImpl.trace("pushing openFile " + nextOpenedFile);
+      }
 
       openedFiles.offer(nextOpenedFile);
    }

Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2010-07-27 19:14:32 UTC (rev 9471)
@@ -320,6 +320,7 @@
 
             // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
             // Using bufferToFlush.put(buffer) would make several append calls for each byte
+            // We also transfer the content of this buffer to the native file's buffer
 
             bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
 
@@ -348,6 +349,7 @@
 
             pendingSync = false;
 
+            // swap the instance as the previous callback list is being used asynchronously
             callbacks = new LinkedList<IOAsyncTask>();
 
             buffer.clear();

Modified: trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java	2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java	2010-07-27 19:14:32 UTC (rev 9471)
@@ -118,7 +118,7 @@
          updaters[i].start();
       }
 
-      Thread.sleep(3600000000l);
+      Thread.sleep(TimeUnit.HOURS.toMillis(24));
 
       running = false;
 

Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java	2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java	2010-07-27 19:14:32 UTC (rev 9471)
@@ -13,19 +13,21 @@
 
 package org.hornetq.tests.util;
 
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 
 import org.hornetq.core.config.impl.FileConfiguration;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 
 /**
  * Lists the journal content for debug purposes.
  * 
  * This is just a class useful on debug during development,
- * listing journal contents (As we don't have access to SQL on Journal :-) ).
  *
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  * 
@@ -52,15 +54,19 @@
       {
          FileConfiguration fileConf = new FileConfiguration();
 
+         fileConf.setJournalDirectory("/work/projects/trunk/journal");
+
          // fileConf.setConfigurationUrl(arg[0]);
 
          fileConf.start();
 
+         SequentialFileFactory fileFactory = new AIOSequentialFileFactory(fileConf.getJournalDirectory());
+
          JournalImpl journal = new JournalImpl(fileConf.getJournalFileSize(),
-                                               fileConf.getJournalMinFiles(),
+                                               10,
                                                0,
                                                0,
-                                               new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
+                                               fileFactory,
                                                "hornetq-data",
                                                "hq",
                                                fileConf.getJournalMaxIO_NIO());
@@ -69,18 +75,31 @@
          ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
 
          journal.start();
+
+
+         PrintStream out = new PrintStream(new FileOutputStream("/tmp/file.out"));
+
+         out.println("######### Journal records per file");
+         
+         JournalImpl.listJournalFiles(out, journal);
+
          journal.load(records, prepared, null);
+         
+         out.println();
+         
+         out.println("##########################################");
+         out.println("#  T O T A L   L I S T                   #");
 
          if (prepared.size() > 0)
          {
-            System.out.println("There are " + prepared.size() + " prepared transactions on the journal");
+            out.println("There are " + prepared.size() + " prepared transactions on the journal");
          }
 
-         System.out.println("Total of " + records.size() + " committed records");
+         out.println("Total of " + records.size() + " committed records");
 
          for (RecordInfo record : records)
          {
-            System.out.println("user record: " + record);
+            out.println("user record: " + record);
          }
 
          journal.checkReclaimStatus();
@@ -89,6 +108,9 @@
 
          journal.stop();
 
+         journal.stop();
+         
+         out.close();
       }
       catch (Exception e)
       {



More information about the hornetq-commits mailing list