[jboss-cvs] JBoss Messaging SVN: r7475 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 25 20:02:15 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-25 20:02:15 -0400 (Thu, 25 Jun 2009)
New Revision: 7475

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
tweaks

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-06-26 00:02:15 UTC (rev 7475)
@@ -35,6 +35,10 @@
  */
 public interface JournalFile
 {
+   
+   /** Used during compacting (clearing counters) */
+   void clearCounts();
+   
    int getNegCount(JournalFile file);
 
    void incNegCount(JournalFile file);
@@ -45,12 +49,6 @@
 
    void decPosCount();
    
-   void incPendingTransaction();
-   
-   void decPendingTransaction();
-   
-   int getPendingTransactions();
-
    void setCanReclaim(boolean canDelete);
 
    boolean isCanReclaim();

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-06-26 00:02:15 UTC (rev 7475)
@@ -49,15 +49,13 @@
    private final int orderingID;
 
    private long offset;
-
-   private final AtomicInteger pendingTransactions = new AtomicInteger(0);
    
    private final AtomicInteger posCount = new AtomicInteger(0);
 
    private boolean canReclaim;
 
    private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
-
+   
    public JournalFileImpl(final SequentialFile file, final int fileID, final int orderingID)
    {
       this.file = file;
@@ -67,6 +65,12 @@
       this.orderingID = orderingID;
    }
 
+   public void clearCounts()
+   {
+      negCounts.clear();
+      posCount.set(0);
+   }
+
    public int getPosCount()
    {
       return posCount.intValue();
@@ -111,22 +115,6 @@
       posCount.decrementAndGet();
    }
    
-   public void incPendingTransaction()
-   {
-      pendingTransactions.incrementAndGet();
-   }
-   
-   public void decPendingTransaction()
-   {
-      pendingTransactions.decrementAndGet();
-   }
-   
-   public int getPendingTransactions()
-   {
-      return pendingTransactions.get();
-   }
-
-
    public void extendOffset(final int delta)
    {
       offset += delta;
@@ -197,4 +185,5 @@
       return count;
    }
 
+
 }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-26 00:02:15 UTC (rev 7475)
@@ -817,6 +817,7 @@
          // We need to guarantee that the journal is frozen for this short time
          // We don't freeze the journal as we compact, only for the short time where we replace records
          compactingLock.writeLock().lock();
+         currentFile.clearCounts();
          try
          {
             autoReclaim = false;
@@ -828,16 +829,8 @@
 
             for (JournalFile file : dataFiles)
             {
-               if (file.getPendingTransactions() == 0)
-               {
-                  trace("Adding " + file + " to compact list");
-                  dataFilesToProcess.add(file);
-               }
-               else
-               {
-                  trace(file + " will not be compacted as it has pending transactions");
-                  break;
-               }
+               file.clearCounts();
+               dataFilesToProcess.add(file);
             }
 
             this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
@@ -877,8 +870,7 @@
 
    class Compactor implements JournalReader
    {
-      
-      
+
       JournalFile currentFile;
 
       SequentialFile sequentialFile;
@@ -888,7 +880,7 @@
       ChannelBuffer channelWrapper;
 
       int nextOrderingID;
-      
+
       final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
 
       final Map<Long, JournalRecord> recordsSnapshot;
@@ -928,7 +920,7 @@
          if (channelWrapper != null)
          {
             sequentialFile.position(0);
-            sequentialFile.write(channelWrapper, true);
+            sequentialFile.write(channelWrapper.toByteBuffer(), true);
             sequentialFile.close();
             newDataFiles.add(currentFile);
          }
@@ -954,17 +946,14 @@
          fileID = nextOrderingID++;
          System.out.println("Next OrderingID = " + nextOrderingID);
 
-         
          channelWrapper.writeInt(fileID);
          channelWrapper.writeInt(fileID);
-         
-         
-         
-         for (int i = 0 ; i < 1000; i++)
+
+         for (int i = 0; i < 1000; i++)
          {
             channelWrapper.writeByte(UnitTestCase.getSamplebyte(i));
          }
-         
+
       }
 
       public void addRecord(RecordInfo info) throws Exception
@@ -1106,12 +1095,11 @@
             checkSize(size);
 
             JournalRecord newRecord = newRecords.get(info.id);
-            
+
             if (newRecord == null)
             {
                log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
             }
-            
 
             writeUpdateRecord(fileID,
                               info.id,
@@ -1119,9 +1107,9 @@
                               new ByteArrayEncoding(info.data),
                               size,
                               channelWrapper);
-            
+
             newRecord.addUpdateFile(currentFile);
-            
+
          }
       }
 
@@ -1131,7 +1119,7 @@
          if (pendingTransactions.get(transactionID) != null)
          {
             JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-            
+
             int size = SIZE_UPDATE_RECORD_TX + info.data.length;
 
             checkSize(size);
@@ -1143,13 +1131,12 @@
                                 new ByteArrayEncoding(info.data),
                                 size,
                                 channelWrapper);
-            
-            
+
             newTransaction.addPositive(currentFile, info.id);
          }
          else
          {
-            
+
             updateRecord(info);
          }
       }
@@ -1919,24 +1906,16 @@
 
    public int readJournalFile(JournalFile file, JournalReader reader) throws Exception
    {
-      ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
-
+      
       file.getFile().open(1);
 
-      int bytesRead = file.getFile().read(wholeFileBuffer);
+      ByteBuffer wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
 
-      if (bytesRead != fileSize)
+      int bytesRead = file.getFile().read(wholeFileBuffer);
+      
+      if (bytesRead != file.getFile().size())
       {
-         // FIXME - We should extract everything we can from this file
-         // and then we shouldn't ever reuse this file on reclaiming (instead
-         // reclaim on different size files would aways throw the file away)
-         // rather than throw ISE!
-         // We don't want to leave the user with an unusable system
-         throw new IllegalStateException("File is wrong size " + bytesRead +
-                                         " expected " +
-                                         fileSize +
-                                         " : " +
-                                         file.getFile().getFileName());
+         throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
       }
 
       wholeFileBuffer.position(0);
@@ -2227,9 +2206,6 @@
    /**
     * <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
     * <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
-    * <p> We record a summary about the pendingTransactions on the journal file on COMMIT and PREPARE. 
-    *     When we load the pendingTransactions we build a new summary and we check the original summary to the current summary.
-    *     This method is basically verifying if the entire transaction is being loaded </p> 
     *     
     * <p>Look at the javadoc on {@link JournalImpl#appendCommitRecord(long)} about how the transaction-summary is recorded</p> 
     *     
@@ -2243,8 +2219,6 @@
                                           final List<JournalFile> orderedFiles,
                                           final int numberOfRecords)
    {
-      // (I) First we get the summary of what we really have on the files now:
-
       return journalTransaction.getCounter(currentFile) == numberOfRecords;
    }
 
@@ -2338,7 +2312,7 @@
                                   ChannelBuffer bb)
    {
       bb.writeByte(UPDATE_RECORD);
-      bb.writeInt(fileId); // skip ID part
+      bb.writeInt(fileId);
       bb.writeLong(id);
       bb.writeInt(record.getEncodeSize());
       bb.writeByte(recordType);
@@ -3181,7 +3155,6 @@
          for (JournalFile jf : pendingFiles)
          {
             file.incNegCount(jf);
-            jf.decPendingTransaction();
          }
       }
 
@@ -3224,7 +3197,6 @@
          for (JournalFile jf : pendingFiles)
          {
             file.incNegCount(jf);
-            jf.decPendingTransaction();
          }
       }
 
@@ -3247,7 +3219,6 @@
          for (JournalFile jf : pendingFiles)
          {
             jf.decPosCount();
-            jf.decPendingTransaction();
          }
 
       }
@@ -3267,8 +3238,6 @@
             // prevents any transactional operations
             // being deleted before a commit or rollback is written
             file.incPosCount();
-
-            file.incPendingTransaction();
          }
       }
    }

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-26 00:02:15 UTC (rev 7475)
@@ -3145,7 +3145,7 @@
       }
 
       journal.forceMoveNextFile();
-
+      
       System.out.println("Number of Files: " + journal.getDataFilesCount());
 
       System.out.println("Before compact ****************************");

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-06-26 00:02:15 UTC (rev 7475)
@@ -893,5 +893,12 @@
       {
          return 0;
       }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#clearCounts()
+       */
+      public void clearCounts()
+      {
+      }
    }
 }




More information about the jboss-cvs-commits mailing list