[jboss-cvs] JBoss Messaging SVN: r7443 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 23 12:46:34 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-23 12:46:34 -0400 (Tue, 23 Jun 2009)
New Revision: 7443

Modified:
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
tweaks on the temp branch

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-06-23 11:07:33 UTC (rev 7442)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-06-23 16:46:34 UTC (rev 7443)
@@ -57,6 +57,8 @@
 
    long getOffset();
 
+   int getFileID();
+   
    int getOrderingID();
 
    SequentialFile getFile();

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-06-23 11:07:33 UTC (rev 7442)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-06-23 16:46:34 UTC (rev 7443)
@@ -44,6 +44,8 @@
 
    private final SequentialFile file;
 
+   private final int fileID;
+   
    private final int orderingID;
 
    private long offset;
@@ -56,10 +58,12 @@
 
    private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
 
-   public JournalFileImpl(final SequentialFile file, final int orderingID)
+   public JournalFileImpl(final SequentialFile file, final int fileID, final int orderingID)
    {
       this.file = file;
 
+      this.fileID = fileID;
+      
       this.orderingID = orderingID;
    }
 
@@ -133,6 +137,11 @@
       return offset;
    }
 
+   public int getFileID()
+   {
+      return fileID;
+   }
+   
    public int getOrderingID()
    {
       return orderingID;

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-23 11:07:33 UTC (rev 7442)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-23 16:46:34 UTC (rev 7443)
@@ -934,56 +934,83 @@
       }
 
       final ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot = recordsSnapshotList;
+      
+      Compactor compactor = new Compactor(recordsSnapshot, dataFilesToProcess.get(0).getFileID());
 
       for (final JournalFile file : dataFilesToProcess)
       {
-         readJournalFile(file, new CompactorJournalReader(recordsSnapshot));
+         readJournalFile(file, compactor);
 
-         writeLockCompact.lock();
-         try
-         {
-            // Restore relationshipMap
-            // Deal with updates and deletes that happened during the compacting
+      }
 
-         }
-         finally
-         {
-            writeLockCompact.unlock();
-         }
+      writeLockCompact.lock();
+      try
+      {
+         // Restore relationshipMap
+         // Deal with updates and deletes that happened during the compacting
 
       }
+      finally
+      {
+         writeLockCompact.unlock();
+      }
 
    }
 
-   class CompactorJournalReader implements JournalReader
+   class Compactor implements JournalReader
    {
       JournalFile currentOutputFile;
+      
+      SequentialFile sequentialFile;
 
       ByteBuffer bufferWrite;
+      
+      int nextOrderingID;
 
       ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot;
 
-      public CompactorJournalReader(ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot)
+      public Compactor(ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot, int firstFileID)
       {
          this.recordsSnapshot = recordsSnapshot;
+         this.nextOrderingID = firstFileID;
       }
 
       private void checkSize(int size) throws Exception
       {
          if (bufferWrite == null)
          {
-            bufferWrite = fileFactory.newBuffer(size);
+            flushFile();
          }
+         else
+         {
+            if (bufferWrite.position() + size > bufferWrite.limit())
+            {
+               flushFile();
+            }
+         }
+      }
 
-         if (currentOutputFile == null)
+      /**
+       * @throws Exception
+       */
+      private void flushFile() throws Exception
+      {
+         if (bufferWrite != null)
          {
-            currentOutputFile = openFile(false);
+            sequentialFile.position(0);
+            sequentialFile.write(bufferWrite, true);
+            sequentialFile.close();
          }
+         
+         bufferWrite = fileFactory.newBuffer(fileSize);
+         currentOutputFile = openFile(false);
+         sequentialFile = currentOutputFile.getFile();
+         bufferWrite.putInt(currentOutputFile.getFileID());
+         bufferWrite.putInt(nextOrderingID++);
       }
 
       public void addRecord(RecordInfo info) throws Exception
       {
-
          if (recordsSnapshot.get(info.id) != null)
          {
             System.out.println("Record " + info.id + " to be out on compacted file");
@@ -1780,7 +1807,7 @@
 
       sf.write(bb, true);
 
-      JournalFile jf = new JournalFileImpl(sf, newFileID);
+      JournalFile jf = new JournalFileImpl(sf, newFileID, newFileID);
 
       sf.position(bb.limit());
 
@@ -1985,7 +2012,7 @@
 
          // This record is from a previous file-usage. The file was
          // reused and we need to ignore this record
-         if (readFileId != file.getOrderingID())
+         if (readFileId != file.getFileID())
          {
             // If a file has damaged records, we make it a dataFile, and the
             // next reclaiming will fix it
@@ -2153,7 +2180,7 @@
          {
             for (JournalFile lookupFile : orderedFiles)
             {
-               if (lookupFile.getOrderingID() == ref.a)
+               if (lookupFile.getFileID() == ref.a)
                {
                   // (III) oops, we were expecting at least one record on this
                   // file.
@@ -2321,18 +2348,20 @@
 
          file.read(bb);
 
+         int fileID = bb.getInt();
+         
          int orderingID = bb.getInt();
 
          fileFactory.releaseBuffer(bb);
 
          bb = null;
 
-         if (nextFileID.get() < orderingID)
+         if (nextFileID.get() < fileID)
          {
-            nextFileID.set(orderingID);
+            nextFileID.set(fileID);
          }
 
-         orderedFiles.add(new JournalFileImpl(file, orderingID));
+         orderedFiles.add(new JournalFileImpl(file, fileID, orderingID));
 
          file.close();
       }
@@ -2344,10 +2373,19 @@
       {
          public int compare(final JournalFile f1, final JournalFile f2)
          {
-            int id1 = f1.getOrderingID();
-            int id2 = f2.getOrderingID();
-
-            return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+            int oid1 = f1.getOrderingID();
+            int oid2 = f2.getOrderingID();
+            
+            if (oid1 == oid2)
+            {
+               int id1 = f1.getFileID();
+               int id2 = f2.getFileID();
+               return oid1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+            }
+            else
+            {
+               return oid1 < oid2 ? -1 : oid1 == oid2 ? 0 : 1;
+            }
          }
       }
 
@@ -2400,7 +2438,7 @@
 
          bb.writerIndex(SIZE_BYTE);
 
-         bb.writeInt(currentFile.getOrderingID());
+         bb.writeInt(currentFile.getFileID());
 
          if (callback != null)
          {
@@ -2459,7 +2497,7 @@
 
       sequentialFile.write(bb, true);
 
-      JournalFile info = new JournalFileImpl(sequentialFile, fileID);
+      JournalFile info = new JournalFileImpl(sequentialFile, fileID, fileID);
 
       if (!keepOpened)
       {
@@ -2975,12 +3013,12 @@
 
       private AtomicInteger getCounter(final JournalFile file)
       {
-         AtomicInteger value = numberOfElementsPerFile.get(file.getOrderingID());
+         AtomicInteger value = numberOfElementsPerFile.get(file.getFileID());
 
          if (value == null)
          {
             value = new AtomicInteger();
-            numberOfElementsPerFile.put(file.getOrderingID(), value);
+            numberOfElementsPerFile.put(file.getFileID(), value);
          }
 
          return value;

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-06-23 11:07:33 UTC (rev 7442)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-06-23 16:46:34 UTC (rev 7443)
@@ -762,7 +762,7 @@
          return 0;
       }
 
-      public int getOrderingID()
+      public int getFileID()
       {
          return 0;
       }
@@ -885,5 +885,13 @@
       public void incPendingTransaction()
       {
       }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#getOrderingID()
+       */
+      public int getOrderingID()
+      {
+         return 0;
+      }
    }
 }




More information about the jboss-cvs-commits mailing list