[jboss-cvs] JBoss Messaging SVN: r4718 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jul 23 19:03:13 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-23 19:03:13 -0400 (Wed, 23 Jul 2008)
New Revision: 4718

Modified:
   trunk/build-messaging.xml
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
More journal improvements and tests

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/build-messaging.xml	2008-07-23 23:03:13 UTC (rev 4718)
@@ -781,7 +781,7 @@
       </junit>
    </target>
 
-   <target name="all-tests" depends="unit-tests, integration-tests, timing-tests, performance-tests, concurrent-tests, jms-tests"/>
+   <target name="all-tests" depends="unit-tests, integration-tests, timing-tests, performance-tests, concurrent-tests, stress-tests, jms-tests"/>
 
    <target name="compile-reports">
       <mkdir dir="${test.stylesheets.dir}"/>

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-23 23:03:13 UTC (rev 4718)
@@ -63,8 +63,10 @@
 
 /**
  * 
- * A JournalImpl
+ * <p>A JournalImpl</p
  * 
+ * <p>WIKI Page: <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal"> http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal</a></p>
+ * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
@@ -183,7 +185,6 @@
     */
    
    //TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
-   // this locks access to currentFile
    private final Semaphore lock = new Semaphore(1, true);
    
    private volatile JournalFile currentFile ;
@@ -728,6 +729,9 @@
             
             if (readFileId != file.getOrderingID())
             {
+               // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+               hasData = true;
+
                bb.position(pos + 1);
                //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
                continue;
@@ -802,6 +806,8 @@
             if (checkSize != variableSize + recordSize)
             {
                log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
+               // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+               hasData = true;
                bb.position(pos + SIZE_BYTE);
                continue;
             }
@@ -992,7 +998,7 @@
                      else
                      {
                         log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
-                        journalTransaction.rollback(file);
+                        journalTransaction.forget();
                      }
                      
                      hasData = true;         
@@ -1062,10 +1068,13 @@
       //FIXME - size() involves a scan
       int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
       
-      for (int i = 0; i < filesToCreate; i++)
+      if (filesToCreate > 0)
       {
-         // Keeping all files opened can be very costly (mainly on AIO)
-         freeFiles.add(createFile(false));
+         for (int i = 0; i < filesToCreate; i++)
+         {
+            // Keeping all files opened can be very costly (mainly on AIO)
+            freeFiles.add(createFile(false));
+         }
       }
       
       //The current file is the last one
@@ -1167,12 +1176,19 @@
          builder.append("FreeFile:" + file + "\n");
       }
       
-      builder.append("CurrentFile:" + currentFile+ " posCounter = " + currentFile.getPosCount() + "\n");
-      
-      if (currentFile instanceof JournalFileImpl)
+      if (currentFile != null)
       {
-         builder.append(((JournalFileImpl)currentFile).debug());
+         builder.append("CurrentFile:" + currentFile+ " posCounter = " + currentFile.getPosCount() + "\n");
+         
+         if (currentFile instanceof JournalFileImpl)
+         {
+            builder.append(((JournalFileImpl)currentFile).debug());
+         }
       }
+      else
+      {
+         builder.append("CurrentFile: No current file at this point!");
+      }
       
       builder.append("#Opened Files:" + this.openedFiles.size());
       
@@ -1229,26 +1245,8 @@
             {
                //Re-initialise it
                
-               int newOrderingID = generateOrderingID();
+               JournalFile jf = reinitializeFile(file);
                
-               SequentialFile sf = file.getFile();
-               
-               sf.open();
-               
-               ByteBuffer bb = fileFactory.newBuffer(SIZE_INT); 
-               
-               bb.putInt(newOrderingID);
-               
-               int bytesWritten = sf.write(bb, true);
-               
-               JournalFile jf = new JournalFileImpl(sf, newOrderingID);
-               
-               sf.position(bytesWritten);
-               
-               jf.setOffset(bytesWritten);
-               
-               sf.close();
-               
                freeFiles.add(jf);  
             }
             else
@@ -1262,7 +1260,7 @@
          }
       }
    }
-   
+
    public int getDataFilesCount()
    {
       return dataFiles.size();
@@ -1404,6 +1402,31 @@
    
    // Private -----------------------------------------------------------------------------
 
+   // Discard the old JournalFile and set it with a new ID
+   private JournalFile reinitializeFile(JournalFile file) throws Exception
+   {
+      int newOrderingID = generateOrderingID();
+      
+      SequentialFile sf = file.getFile();
+      
+      sf.open();
+      
+      ByteBuffer bb = fileFactory.newBuffer(SIZE_INT); 
+      
+      bb.putInt(newOrderingID);
+      
+      int bytesWritten = sf.write(bb, true);
+      
+      JournalFile jf = new JournalFileImpl(sf, newOrderingID);
+      
+      sf.position(bytesWritten);
+      
+      jf.setOffset(bytesWritten);
+      
+      sf.close();
+      return jf;
+   }
+   
    @SuppressWarnings("unchecked")
    private Pair<Integer, Integer>[] readReferencesOnTransaction(int variableSize, ByteBuffer bb)
    {

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java	2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java	2008-07-23 23:03:13 UTC (rev 4718)
@@ -27,21 +27,23 @@
 
 /**
  * 
- * A ReclaimerTest
+ * <p>A ReclaimerTest</p>
  * 
- * The journal consists of an ordered list of journal files Fn where 0 <= n <= N
+ * <p>The journal consists of an ordered list of journal files Fn where 0 <= n <= N</p>
  * 
- * A journal file can contain either positives (pos) or negatives (neg)
+ * <p>A journal file can contain either positives (pos) or negatives (neg)</p>
  * 
- * (Positives correspond either to adds or updates, and negatives correspond to deletes).
+ * <p>(Positives correspond either to adds or updates, and negatives correspond to deletes).</p>
  * 
- * A file Fn can be deleted if, and only if the following criteria are satisified
+ * <p>A file Fn can be deleted if, and only if the following criteria are satisified</p>
  * 
- * 1) All pos in a file Fn, must have corresponding neg in any file Fm where m >= n.
+ * <p>1) All pos in a file Fn, must have corresponding neg in any file Fm where m >= n.</p>
  * 
- * 2) All pos that correspond to any neg in file Fn, must all live in any file Fm where 0 <= m <= n
- * which are also marked for deletion in the same pass of the algorithm.
+ * <p>2) All pos that correspond to any neg in file Fn, must all live in any file Fm where 0 <= m <= n
+ * which are also marked for deletion in the same pass of the algorithm.</p>
  * 
+ * <p>WIKI Page: <a href="http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming">http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming</a></p>
+ * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2008-07-23 23:03:13 UTC (rev 4718)
@@ -25,6 +25,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.jboss.messaging.core.journal.EncodingSupport;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
@@ -70,8 +71,7 @@
    public void testBasicAlignment() throws Exception
    {
       
-      FakeSequentialFileFactory factory = new FakeSequentialFileFactory(200,
-            true, false);
+      FakeSequentialFileFactory factory = new FakeSequentialFileFactory(200, true);
       
       SequentialFile file = factory.createSequentialFile("test1", 100, 10000);
 
@@ -438,8 +438,6 @@
 
       journalImpl.checkAndReclaimFiles();
       
-      System.out.println("Journal: " + journalImpl.debug());
-      
       assertEquals(2, factory.listFiles("tt").size());
       
    }
@@ -570,19 +568,274 @@
    }
    
    
-   public void testReloadInvalidVariableSize() throws Exception
+   public void testReloadInvalidCheckSizeOnTransaction() throws Exception
    {
-      // Test to be written
+      final int JOURNAL_SIZE = 20000;
+      
+      setupJournal(JOURNAL_SIZE, 100);
+      
+      assertEquals(2, factory.listFiles("tt").size());
+      
+      
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+      
+      for (int i = 0; i < 20 ; i++)
+      {
+         journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.forceMoveNextFile();
+      }
+      
+      journalImpl.forceMoveNextFile();
+      
+      journalImpl.appendCommitRecord(1l);
+      
+      SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+      
+      file.open();
+      
+      ByteBuffer buffer = ByteBuffer.allocate(100);
+      
+      // Messing up with the first record (removing the position)
+      file.position(100);
+      
+      file.read(buffer);
+
+      // jumping RecordType, FileId, TransactionID, RecordID, VariableSize, RecordType, RecordBody (that we know it is 1 )
+      buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1); 
+      
+      int posCheckSize = buffer.position();
+      
+      assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+      
+      buffer.position(posCheckSize);
+      
+      buffer.putInt(-1);
+      
+      buffer.rewind();
+      
+      // Changing the check size, so reload will ignore this record
+      file.position(100);
+
+      file.write(buffer, true);
+      
+      file.close();
+
+      setupJournal(JOURNAL_SIZE, 100);
+      
+      assertEquals(0, records.size());
+      
+      journalImpl.checkAndReclaimFiles();
+      
+      assertEquals(0, journalImpl.getDataFilesCount());
+      
+      assertEquals(2, factory.listFiles("tt").size());
+      
    }
+
+   public void testPartiallyBrokenFile() throws Exception
+   {
+      final int JOURNAL_SIZE = 20000;
+      
+      setupJournal(JOURNAL_SIZE, 100);
+      
+      assertEquals(2, factory.listFiles("tt").size());
+      
+      
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+      
+      for (int i = 0; i < 20 ; i++)
+      {
+         journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.appendAddRecordTransactional(2l, (long)i + 20l, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.forceMoveNextFile();
+      }
+      
+      
+      journalImpl.forceMoveNextFile();
+      
+      journalImpl.appendCommitRecord(1l);
+      
+      journalImpl.appendCommitRecord(2l);
+      
+      SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+      
+      file.open();
+      
+      ByteBuffer buffer = ByteBuffer.allocate(100);
+      
+      // Messing up with the first record (removing the position)
+      file.position(100);
+      
+      file.read(buffer);
+
+      // jumping RecordType, FileId, TransactionID, RecordID, VariableSize, RecordType, RecordBody (that we know it is 1 )
+      buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1); 
+      
+      int posCheckSize = buffer.position();
+      
+      assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+      
+      buffer.position(posCheckSize);
+      
+      buffer.putInt(-1);
+      
+      buffer.rewind();
+      
+      // Changing the check size, so reload will ignore this record
+      file.position(100);
+
+      file.write(buffer, true);
+      
+      file.close();
+
+      setupJournal(JOURNAL_SIZE, 100);
+      
+      assertEquals(20, records.size());
+      
+      journalImpl.checkAndReclaimFiles();
+      
+      assertEquals(20, journalImpl.getDataFilesCount());
+      
+      assertEquals(22, factory.listFiles("tt").size());
+      
+   }
+
+   public void testReduceFreeFiles() throws Exception
+   {
+      final int JOURNAL_SIZE = 20000;
+      
+      setupJournal(JOURNAL_SIZE, 100, 10);
+      
+      assertEquals(10, factory.listFiles("tt").size());
+      
+      setupJournal(JOURNAL_SIZE, 100, 2);
+      
+      assertEquals(10, factory.listFiles("tt").size());
+      
+      for (int i = 0; i < 10; i++)
+      {
+         journalImpl.appendAddRecord(i, (byte)0, new SimpleEncoding(1,(byte)0));
+         journalImpl.forceMoveNextFile();
+      }
+      
+      setupJournal(JOURNAL_SIZE, 100, 2);
+      
+      assertEquals(10, records.size());
+      
+      assertEquals(12, factory.listFiles("tt").size());
+      
+      for (int i = 0; i < 10; i++)
+      {
+         journalImpl.appendDeleteRecord(i);
+      }
+      
+      journalImpl.forceMoveNextFile();
+      
+      journalImpl.checkAndReclaimFiles();
+      
+      setupJournal(JOURNAL_SIZE, 100, 2);
+
+      assertEquals(0, records.size());
+      
+      assertEquals(2, factory.listFiles("tt").size());
+   }
+
    
    public void testReloadIncompleteTransaction() throws Exception
    {
-      // We should miss one record (hole) on the transaction
+      final int JOURNAL_SIZE = 20000;
+      
+      setupJournal(JOURNAL_SIZE, 100);
+      
+      assertEquals(2, factory.listFiles("tt").size());
+      
+      
+      assertEquals(0, records.size());
+      assertEquals(0, transactions.size());
+      
+      for (int i = 0; i < 10 ; i++)
+      {
+         journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.forceMoveNextFile();
+      }
+      
+      
+      for (int i = 10; i < 20 ; i++)
+      {
+         journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.forceMoveNextFile();
+      }
+      
+      journalImpl.forceMoveNextFile();
+      
+      journalImpl.appendCommitRecord(1l);
+      
+      SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+      
+      file.open();
+      
+      ByteBuffer buffer = ByteBuffer.allocate(100);
+      
+      // Messing up with the first record (removing the position)
+      file.position(100);
+      
+      file.read(buffer);
+
+      buffer.position(1);
+      
+      buffer.putInt(-1);
+      
+      buffer.rewind();
+      
+      // Messing up with the first record (changing the fileID, so Journal reload will think the record came from a different journal usage)
+      file.position(100);
+
+      file.write(buffer, true);
+      
+      file.close();
+
+      setupJournal(JOURNAL_SIZE, 100);
+      
+      assertEquals(0, records.size());
+      
+      journalImpl.checkAndReclaimFiles();
+      
+      assertEquals(0, journalImpl.getDataFilesCount());
+      
+      assertEquals(2, factory.listFiles("tt").size());
+      
    }
    
    public void testAsynchronousCommit() throws Exception
    {
-      // We should miss one record (hole) on the transaction
+//      final int JOURNAL_SIZE = 20000;
+//      
+//      setupJournal(JOURNAL_SIZE, 100, 5);
+//      
+//      assertEquals(2, factory.listFiles("tt").size());
+//      
+//      assertEquals(0, records.size());
+//      assertEquals(0, transactions.size());
+//      
+//      for (int i = 0; i < 10 ; i++)
+//      {
+//         journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+//         journalImpl.forceMoveNextFile();
+//      }
+//      
+//      
+//      for (int i = 10; i < 20 ; i++)
+//      {
+//         journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+//         journalImpl.forceMoveNextFile();
+//      }
+//      
+//      journalImpl.forceMoveNextFile();
+//      
+//      journalImpl.appendCommitRecord(1l);
+//      
    }
    
    public void testAsynchronousRollback() throws Exception
@@ -590,16 +843,11 @@
       // We should miss one record (hole) on the transaction
    }
    
-   public void testGarbageBetweenRecords() throws Exception
-   {
-      // We should miss one record (hole) on the transaction
-   }
-   
    public void testPrepareAloneOnSeparatedFile() throws Exception
    {
       final int JOURNAL_SIZE = 20000;
       
-      setupJournal(JOURNAL_SIZE, 100, 5);
+      setupJournal(JOURNAL_SIZE, 100);
       
       assertEquals(0, records.size());
       assertEquals(0, transactions.size());
@@ -624,7 +872,7 @@
       journalImpl.forceMoveNextFile();
       journalImpl.checkAndReclaimFiles();
 
-      setupJournal(JOURNAL_SIZE, 100, 5);
+      setupJournal(JOURNAL_SIZE, 100);
       
       assertEquals(1, records.size());
    }
@@ -633,7 +881,7 @@
    {
       final int JOURNAL_SIZE = 20000;
       
-      setupJournal(JOURNAL_SIZE, 100, 5);
+      setupJournal(JOURNAL_SIZE, 100);
       
       assertEquals(0, records.size());
       assertEquals(0, transactions.size());
@@ -662,7 +910,7 @@
       journalImpl.forceMoveNextFile();
       journalImpl.checkAndReclaimFiles();
 
-      setupJournal(JOURNAL_SIZE, 100, 5);
+      setupJournal(JOURNAL_SIZE, 100);
       
       assertEquals(40, records.size());
       
@@ -687,8 +935,6 @@
       
       journalImpl.debugWait();
       
-      //System.out.println("files = " + journalImpl.debug());
-      
       journalImpl.appendPrepareRecord(1l);
 
       assertEquals(12, factory.listFiles("tt").size());
@@ -733,6 +979,8 @@
       assertEquals(0, transactions.size());
 
       journalImpl.forceMoveNextFile();
+
+      // Reclaiming should still be able to reclaim a file if a transaction was ignored
       journalImpl.checkAndReclaimFiles();
       
       assertEquals(2, factory.listFiles("tt").size());
@@ -790,7 +1038,7 @@
       if (factory == null)
       {
          factory = new FakeSequentialFileFactory(alignment,
-               true, false);
+               true);
       }
       
       if (journalImpl != null)

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2008-07-23 23:03:13 UTC (rev 4718)
@@ -55,7 +55,7 @@
    
    private final boolean supportsCallback; 
    
-   private final boolean holdCallbacks;
+   private volatile boolean holdCallbacks;
    
    private final List<Runnable> callbacksInHold;
    
@@ -63,24 +63,16 @@
    
    // Constructors --------------------------------------------------
    
-   public FakeSequentialFileFactory(final int alignment, final boolean supportsCallback, final boolean holdCallback)
+   public FakeSequentialFileFactory(final int alignment, final boolean supportsCallback)
    {
       this.alignment = alignment;
       this.supportsCallback = supportsCallback;
-      this.holdCallbacks = holdCallback;
-      if (holdCallbacks)
-      {
-         callbacksInHold = new ArrayList<Runnable>();
-      }
-      else
-      {
-         callbacksInHold = null;
-      }
+      callbacksInHold = new ArrayList<Runnable>();
    }
 
    public FakeSequentialFileFactory()
    {
-      this(1, false, false);
+      this(1, false);
    }
 
    
@@ -151,6 +143,16 @@
       return ByteBuffer.wrap(bytes);
    }
    
+   public boolean isHoldCallbacks()
+   {
+      return holdCallbacks;
+   }
+
+   public void setHoldCallbacks(boolean holdCallbacks)
+   {
+      this.holdCallbacks = holdCallbacks;
+   }
+
    public void flushAllCallbacks()
    {
       for (Runnable action : callbacksInHold)




More information about the jboss-cvs-commits mailing list