[jboss-cvs] JBoss Messaging SVN: r7507 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 1 01:44:19 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-07-01 01:44:19 -0400 (Wed, 01 Jul 2009)
New Revision: 7507

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
Computing live size of records (1st commit)

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java	2009-07-01 05:44:19 UTC (rev 7507)
@@ -305,10 +305,10 @@
     * @param id
     * @param usedFile
     */
-   public void addCommandUpdate(final long id, final JournalFile usedFile)
+   public void addCommandUpdate(final long id, final JournalFile usedFile, final int size)
    {
       log.info("Adding update command");
-      pendingCommands.add(new UpdateCompactCommand(id, usedFile));
+      pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
    }
 
    public boolean lookupRecord(final long id)
@@ -383,7 +383,7 @@
                                     size,
                                     writingChannel);
 
-         newRecords.put(info.id, new JournalRecord(currentFile));
+         newRecords.put(info.id, new JournalRecord(currentFile, size));
       }
    }
 
@@ -397,7 +397,7 @@
 
          checkSize(size);
 
-         newTransaction.addPositive(currentFile, info.id);
+         newTransaction.addPositive(currentFile, info.id, size);
 
          JournalImpl.writeAddRecordTX(fileID,
                                       transactionID,
@@ -512,7 +512,7 @@
          }
          else
          {
-            newRecord.addUpdateFile(currentFile);
+            newRecord.addUpdateFile(currentFile, size);
          }
 
          JournalImpl.writeUpdateRecord(fileID,
@@ -543,7 +543,7 @@
                                          size,
                                          writingChannel);
 
-         newTransaction.addPositive(currentFile, info.id);
+         newTransaction.addPositive(currentFile, info.id, size);
       }
       else
       {
@@ -626,21 +626,24 @@
 
    private class UpdateCompactCommand extends CompactCommand
    {
-      long id;
+      private long id;
 
-      JournalFile usedFile;
+      private JournalFile usedFile;
+      
+      private final int size;
 
-      public UpdateCompactCommand(final long id, final JournalFile usedFile)
+      public UpdateCompactCommand(final long id, final JournalFile usedFile, final int size)
       {
          this.id = id;
          this.usedFile = usedFile;
+         this.size = size;
       }
 
       @Override
       void execute() throws Exception
       {
          JournalRecord updateRecord = journal.getRecords().get(id);
-         updateRecord.addUpdateFile(usedFile);
+         updateRecord.addUpdateFile(usedFile, size);
       }
    }
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-07-01 05:44:19 UTC (rev 7507)
@@ -49,6 +49,12 @@
 
    void decPosCount();
    
+   void addSize(int bytes);
+   
+   void decSize(int bytes);
+   
+   int getLiveSize();
+   
    void setCanReclaim(boolean canDelete);
 
    boolean isCanReclaim();

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-07-01 05:44:19 UTC (rev 7507)
@@ -51,6 +51,8 @@
    private long offset;
    
    private final AtomicInteger posCount = new AtomicInteger(0);
+   
+   private final AtomicInteger liveBytes = new AtomicInteger(0);
 
    private boolean canReclaim;
 
@@ -69,6 +71,7 @@
    {
       negCounts.clear();
       posCount.set(0);
+      liveBytes.set(0);
    }
 
    public int getPosCount()
@@ -185,5 +188,29 @@
       return count;
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.impl.JournalFile#addSize(int)
+    */
+   public void addSize(int bytes)
+   {
+      liveBytes.addAndGet(bytes);
+   }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.impl.JournalFile#decSize(int)
+    */
+   public void decSize(int bytes)
+   {
+      liveBytes.addAndGet(-bytes);
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.impl.JournalFile#getSize()
+    */
+   public int getLiveSize()
+   {
+      return liveBytes.get();
+   }
+
+
 }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-01 05:44:19 UTC (rev 7507)
@@ -62,6 +62,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
 
 /**
@@ -322,7 +323,7 @@
          {
             JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
 
-            records.put(id, new JournalRecord(usedFile));
+            records.put(id, new JournalRecord(usedFile, record.getEncodeSize()));
          }
          finally
          {
@@ -386,11 +387,11 @@
             // compacting is done
             if (posFiles == null)
             {
-               compactor.addCommandUpdate(id, usedFile);
+               compactor.addCommandUpdate(id, usedFile, size);
             }
             else
             {
-               posFiles.addUpdateFile(usedFile);
+               posFiles.addUpdateFile(usedFile, record.getEncodeSize());
             }
          }
          finally
@@ -511,7 +512,7 @@
          {
             JournalFile usedFile = appendRecord(bb, false, false, tx, null);
 
-            tx.addPositive(usedFile, id);
+            tx.addPositive(usedFile, id, size);
          }
          finally
          {
@@ -560,7 +561,7 @@
          {
             JournalFile usedFile = appendRecord(bb, false, false, tx, null);
 
-            tx.addPositive(usedFile, id);
+            tx.addPositive(usedFile, id, size);
          }
          finally
          {
@@ -1137,7 +1138,7 @@
 
                loadManager.addRecord(info);
 
-               records.put(info.id, new JournalRecord(file));
+               records.put(info.id, new JournalRecord(file, info.data.length + SIZE_ADD_RECORD));
             }
 
             public void onReadUpdateRecord(RecordInfo info) throws Exception
@@ -1158,7 +1159,7 @@
                   // have been deleted
                   // just leaving some updates in this file
 
-                  posFiles.addUpdateFile(file);
+                  posFiles.addUpdateFile(file, info.data.length + SIZE_UPDATE_RECORD);
                }
             }
 
@@ -1215,7 +1216,7 @@
                   transactions.put(transactionID, tnp);
                }
 
-               tnp.addPositive(file, info.id);
+               tnp.addPositive(file, info.id, info.data.length + SIZE_ADD_RECORD_TX);
             }
 
             public void onReadDeleteRecordTX(long transactionID, RecordInfo info) throws Exception
@@ -1582,6 +1583,8 @@
                         file.getPosCount() +
                         " reclaimStatus = " +
                         file.isCanReclaim() +
+                        " live size = " +
+                        file.getLiveSize() + 
                         "\n");
          if (file instanceof JournalFileImpl)
          {
@@ -2847,37 +2850,46 @@
    public static class JournalRecord
    {
       private final JournalFile addFile;
+      private final int size;
 
-      private List<JournalFile> updateFiles;
+      private List<Pair<JournalFile, Integer>> updateFiles;
 
-      JournalRecord(final JournalFile addFile)
+      JournalRecord(final JournalFile addFile, final int size)
       {
          this.addFile = addFile;
+         
+         this.size = size;
 
          addFile.incPosCount();
+         
+         addFile.addSize(size);
       }
 
-      void addUpdateFile(final JournalFile updateFile)
+      void addUpdateFile(final JournalFile updateFile, final int size)
       {
          if (updateFiles == null)
          {
-            updateFiles = new ArrayList<JournalFile>();
+            updateFiles = new ArrayList<Pair<JournalFile, Integer>>();
          }
 
-         updateFiles.add(updateFile);
+         updateFiles.add(new Pair<JournalFile, Integer>(updateFile, size));
 
          updateFile.incPosCount();
+         
+         updateFile.addSize(size);
       }
 
       void delete(final JournalFile file)
       {
          file.incNegCount(addFile);
+         addFile.decSize(size);
 
          if (updateFiles != null)
          {
-            for (JournalFile updFile : updateFiles)
+            for (Pair<JournalFile, Integer> updFile : updateFiles)
             {
-               file.incNegCount(updFile);
+               file.incNegCount(updFile.a);
+               file.decSize(updFile.b);
             }
          }
       }
@@ -2890,9 +2902,9 @@
          if (updateFiles != null)
          {
 
-            for (JournalFile update : updateFiles)
+            for (Pair<JournalFile, Integer> update : updateFiles)
             {
-               buffer.append(", update=" + update.getFile().getFileName());
+               buffer.append(", update=" + update.a.getFile().getFileName());
             }
 
          }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java	2009-07-01 05:44:19 UTC (rev 7507)
@@ -47,9 +47,9 @@
 
    private final JournalImpl journal;
 
-   private List<Pair<JournalFile, Long>> pos;
+   private List<JournalUpdate> pos;
 
-   private List<Pair<JournalFile, Long>> neg;
+   private List<JournalUpdate> neg;
 
    private final long id;
 
@@ -101,9 +101,9 @@
       {
          int i = 0;
          long[] ids = new long[pos.size()];
-         for (Pair<JournalFile, Long> el : pos)
+         for (JournalUpdate el : pos)
          {
-            ids[i++] = el.b;
+            ids[i++] = el.getId();
          }
          return ids;
       }
@@ -125,7 +125,7 @@
       {
          if (pos == null)
          {
-            pos = new ArrayList<Pair<JournalFile, Long>>();
+            pos = new ArrayList<JournalUpdate>();
          }
 
          pos.addAll(other.pos);
@@ -135,7 +135,7 @@
       {
          if (neg == null)
          {
-            neg = new ArrayList<Pair<JournalFile, Long>>();
+            neg = new ArrayList<JournalUpdate>();
          }
 
          neg.addAll(other.neg);
@@ -259,7 +259,7 @@
       return currentCallback;
    }
 
-   public void addPositive(final JournalFile file, final long id)
+   public void addPositive(final JournalFile file, final long id, final int size)
    {
       incCounter(file);
 
@@ -267,10 +267,10 @@
 
       if (pos == null)
       {
-         pos = new ArrayList<Pair<JournalFile, Long>>();
+         pos = new ArrayList<JournalUpdate>();
       }
 
-      pos.add(new Pair<JournalFile, Long>(file, id));
+      pos.add(new JournalUpdate(file, id, size));
    }
 
    public void addNegative(final JournalFile file, final long id)
@@ -281,10 +281,10 @@
 
       if (neg == null)
       {
-         neg = new ArrayList<Pair<JournalFile, Long>>();
+         neg = new ArrayList<JournalUpdate>();
       }
 
-      neg.add(new Pair<JournalFile, Long>(file, id));
+      neg.add(new JournalUpdate(file, id, 0));
    }
 
    /** 
@@ -303,47 +303,46 @@
 
          if (pos != null)
          {
-            for (Pair<JournalFile, Long> trUpdate : pos)
+            for (JournalUpdate trUpdate : pos)
             {
-               JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
+               JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.id);
 
-               if (compactor != null && compactor.lookupRecord(trUpdate.b))
+               if (compactor != null && compactor.lookupRecord(trUpdate.id))
                {
                   // This is a case where the transaction was opened after compacting was started,
                   // but the commit arrived while compacting was working
                   // We need to cache the counter update, so compacting will take the correct files when it is done
-                  compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
+                  compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size);
                }
-               else
-               if (posFiles == null)
+               else if (posFiles == null)
                {
-                  posFiles = new JournalImpl.JournalRecord(trUpdate.a);
+                  posFiles = new JournalImpl.JournalRecord(trUpdate.file, trUpdate.size);
 
-                  journal.getRecords().put(trUpdate.b, posFiles);
+                  journal.getRecords().put(trUpdate.id, posFiles);
                }
                else
                {
-                  posFiles.addUpdateFile(trUpdate.a);
+                  posFiles.addUpdateFile(trUpdate.file, trUpdate.size);
                }
             }
          }
 
          if (neg != null)
          {
-            for (Pair<JournalFile, Long> trDelete : neg)
+            for (JournalUpdate trDelete : neg)
             {
-               JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
-               
+               JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
+
                if (posFiles != null)
                {
-                  posFiles.delete(trDelete.a);
+                  posFiles.delete(trDelete.file);
                }
-               else if (compactor != null && compactor.lookupRecord(trDelete.b))
+               else if (compactor != null && compactor.lookupRecord(trDelete.id))
                {
                   // This is a case where the transaction was opened after compacting was started,
                   // but the commit arrived while compacting was working
                   // We need to cache the counter update, so compacting will take the correct files when it is done
-                  compactor.addCommandDelete(trDelete.b, trDelete.a);
+                  compactor.addCommandDelete(trDelete.id, trDelete.file);
                }
             }
          }
@@ -465,4 +464,52 @@
          file.incPosCount();
       }
    }
+
+   static class JournalUpdate
+   {
+      JournalFile file;
+
+      long id;
+
+      int size;
+      
+      
+      /**
+       * @param file
+       * @param id
+       * @param size
+       */
+      public JournalUpdate(JournalFile file, long id, int size)
+      {
+         super();
+         this.file = file;
+         this.id = id;
+         this.size = size;
+      }
+
+      /**
+       * @return the file
+       */
+      public JournalFile getFile()
+      {
+         return file;
+      }
+
+      /**
+       * @return the id
+       */
+      public long getId()
+      {
+         return id;
+      }
+
+      /**
+       * @return the size
+       */
+      public int getSize()
+      {
+         return size;
+      }
+
+   }
 }

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-07-01 05:44:19 UTC (rev 7507)
@@ -3023,15 +3023,29 @@
       {
          delete(i);
       }
+      
+      System.out.println("After delete ****************************");
+      System.out.println(journal.debug());
+      System.out.println("*****************************************");
 
+
       for (int i = 100; i < 200; i++)
       {
          updateTx(transactionID, i);
       }
+      
+      System.out.println("After updatetx ****************************");
+      System.out.println(journal.debug());
+      System.out.println("*****************************************");
 
+
       journal.forceMoveNextFile();
 
       commit(transactionID++);
+      
+      System.out.println("After commit ****************************");
+      System.out.println(journal.debug());
+      System.out.println("*****************************************");
 
       for (int i = 100; i < 200; i++)
       {
@@ -3039,9 +3053,14 @@
          deleteTx(transactionID, i);
       }
 
+      System.out.println("After delete ****************************");
+      System.out.println(journal.debug());
+      System.out.println("*****************************************");
+
+
       commit(transactionID++);
 
-      System.out.println("Before reclaim ****************************");
+      System.out.println("Before reclaim/after commit ****************************");
       System.out.println(journal.debug());
       System.out.println("*****************************************");
 
@@ -3067,27 +3086,6 @@
       assertEquals(0, journal.getDataFilesCount());
    }
 
-   public void testCompactwithPendingPrepare() throws Exception
-   {
-   }
-
-   public void testCompactwithConcurrentDeletes() throws Exception
-   {
-   }
-
-   public void testCompactwithConcurrentAppend() throws Exception
-   {
-   }
-
-   public void testCompactWithPendingTransactionAndDelete() throws Exception
-   {
-   }
-
-   public void testCompactingWithPendingTransaction() throws Exception
-   {
-
-   }
-
    protected abstract int getAlignment();
 
 }

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-07-01 05:44:19 UTC (rev 7507)
@@ -900,5 +900,27 @@
       public void clearCounts()
       {
       }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#addSize(int)
+       */
+      public void addSize(int bytes)
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#decSize(int)
+       */
+      public void decSize(int bytes)
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#getSize()
+       */
+      public int getLiveSize()
+      {
+         return 0;
+      }
    }
 }




More information about the jboss-cvs-commits mailing list