[jboss-cvs] JBoss Messaging SVN: r7510 - in branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal: impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 1 14:37:58 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-07-01 14:37:58 -0400 (Wed, 01 Jul 2009)
New Revision: 7510

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
compacting calculations

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java	2009-07-01 16:12:40 UTC (rev 7509)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java	2009-07-01 18:37:58 UTC (rev 7510)
@@ -24,6 +24,7 @@
 
 import java.util.List;
 
+import org.jboss.messaging.core.journal.impl.JournalFile;
 import org.jboss.messaging.core.server.MessagingComponent;
 
 /**
@@ -41,7 +42,7 @@
    void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
    void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
-   
+
    void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
    void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
@@ -81,20 +82,29 @@
 
    void appendRollbackRecord(long txID, boolean sync) throws Exception;
 
+   // Load
 
+   long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+
+   int getAlignment() throws Exception;
+
+   void perfBlast(int pages) throws Exception;
+
+   /** This method is called automatically when a new file is opened  */
+   void checkAndReclaimFiles() throws Exception;
+
+   /** This method check for the need of compacting based on the minCompactPercentage 
+    * This method is usually called automatically when new files are opened
+   */
+   void checkCompact() throws Exception;
+
    /**
-    * Eliminate deleted records of the journal
+    * Eliminate deleted records of the journal.
     * @throws Exception 
     */
    void compact() throws Exception;
    
-
-   // Load
-
-   long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
-
-   int getAlignment() throws Exception;
    
-   void perfBlast(int pages) throws Exception;
+   JournalFile[] getDataFiles();
 
 }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2009-07-01 16:12:40 UTC (rev 7509)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2009-07-01 18:37:58 UTC (rev 7510)
@@ -34,11 +34,7 @@
  */
 public interface TestableJournal extends Journal
 {
-   void checkAndReclaimFiles() throws Exception;
-
    int getDataFilesCount();
-   
-   JournalFile[] getDataFiles();
 
    int getFreeFilesCount();
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-07-01 16:12:40 UTC (rev 7509)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-07-01 18:37:58 UTC (rev 7510)
@@ -82,8 +82,8 @@
    private final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
 
    /** Commands that happened during compacting
-    *  We can't take any counts during compactation, as we won't know in what files the records are taking place, so
-    *  we cache those updates during compacting. As soon as we are done we take the right account. */
+    *  We can't process any counts during compacting, as we won't know in what files the records are taking place, so
+    *  we cache those updates. As soon as we are done we take the right account. */
    private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
 
    /**

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-01 16:12:40 UTC (rev 7509)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-01 18:37:58 UTC (rev 7510)
@@ -88,6 +88,9 @@
    private static final int STATE_STARTED = 1;
 
    private static final int STATE_LOADED = 2;
+   
+   
+   private static final float COMPACT_MARGIN = 0.3f;
 
    // Static --------------------------------------------------------
 
@@ -844,7 +847,10 @@
       return maxID;
    }
 
-   // Note: This method can't be called from the executor, as it will invoke other methods depending on it.
+   /**
+    * 
+    *  Note: This method can't be called from the main executor, as it will invoke other methods depending on it.
+    */
    public void compact() throws Exception
    {
       if (compactor != null)
@@ -958,6 +964,7 @@
             localCompactor.replayPendingCommands();
 
             // Merge transactions back after compacting
+            // This has to be done after the replay pending commands, as we need to delete committs that happened during the compacting
 
             for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
             {
@@ -1694,13 +1701,38 @@
          compactingLock.readLock().unlock();
       }
    }
+   
+   
+   public void checkCompact() throws Exception
+   {
+      JournalFile[] dataFiles = getDataFiles();
+      
+      long totalSize = 0;
+      
+      for (JournalFile file : dataFiles)
+      {
+         totalSize += file.getLiveSize();
+      }
+      
+      long totalBytes = (long)dataFiles.length * (long)fileSize;
+      
+      long compactMargin = (long)(totalBytes * COMPACT_MARGIN);
 
+      if (totalBytes < compactMargin)
+      {
+         System.out.println("IT SHOULD COMPACT NOW!!!!!!!!!!!!!!!!!!!!"); // deleteme
+      }
+      else
+      {
+         System.out.println("Total bytes = " + totalBytes + " > compactMarging=" + compactMargin); // deleteme
+      }
+   }
+
    public int getDataFilesCount()
    {
       return dataFiles.size();
    }
    
-   /** Test only */
    public JournalFile[] getDataFiles()
    {
       return (JournalFile[]) dataFiles.toArray(new JournalFile[dataFiles.size()]);
@@ -1844,9 +1876,7 @@
 
    private void checkReclaimStatus() throws Exception
    {
-      JournalFile[] files = new JournalFile[dataFiles.size()];
-
-      reclaimer.scan(dataFiles.toArray(files));
+      reclaimer.scan(getDataFiles());
    }
 
    // Discard the old JournalFile and set it with a new ID
@@ -2725,6 +2755,20 @@
                }
             }
          });
+         filesExecutor.execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  checkCompact();
+               }
+               catch (Exception e)
+               {
+                  log.error(e.getMessage(), e);
+               }
+            }
+         });
       }
 
       JournalFile nextFile = null;




More information about the jboss-cvs-commits mailing list