[hornetq-commits] JBoss hornetq SVN: r9404 - in trunk: tests/src/org/hornetq/tests/integration/journal and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 16 00:47:08 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-07-16 00:47:08 -0400 (Fri, 16 Jul 2010)
New Revision: 9404

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
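https://jira.jboss.org/browse/HORNETQ-440 - Fix on the journal (handling of record deletes, both plain and transactional, that arrive while compacting is in progress; re-enables testCompactAddAndUpdateFollowedByADelete and adds further variants of it)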

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-07-15 18:23:17 UTC (rev 9403)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-07-16 04:47:08 UTC (rev 9404)
@@ -904,15 +904,24 @@
       try
       {
 
-         JournalRecord record = records.remove(id);
+         JournalRecord record = null;
+         
+         if (compactor == null)
+         {
+            record = records.remove(id);
 
-         if (record == null)
-         {
-            if (!(compactor != null && compactor.lookupRecord(id)))
+            if (record == null)
             {
                throw new IllegalStateException("Cannot find add info " + id);
             }
          }
+         else
+         {
+            if (!records.containsKey(id) && !compactor.lookupRecord(id))
+            {
+               throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
+            }
+         }
 
          JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
 
@@ -1464,7 +1473,7 @@
                return;
             }
 
-            autoReclaim = false;
+            setAutoReclaim(false);
 
             // We need to move to the next file, as we need a clear start for negatives and positives counts
             moveNextFile(true);

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2010-07-15 18:23:17 UTC (rev 9403)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2010-07-16 04:47:08 UTC (rev 9404)
@@ -317,18 +317,18 @@
          {
             for (JournalUpdate trDelete : neg)
             {
-               JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
-
-               if (posFiles != null)
+               if (compactor != null)
                {
-                  posFiles.delete(trDelete.file);
+                  compactor.addCommandDelete(trDelete.id, trDelete.file);
                }
-               else if (compactor != null && compactor.lookupRecord(trDelete.id))
+               else
                {
-                  // This is a case where the transaction was opened after compacting was started,
-                  // but the commit arrived while compacting was working
-                  // We need to cache the counter update, so compacting will take the correct files when it is done
-                  compactor.addCommandDelete(trDelete.id, trDelete.file);
+                  JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
+   
+                  if (posFiles != null)
+                  {
+                     posFiles.delete(trDelete.file);
+                  }
                }
             }
          }

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-07-15 18:23:17 UTC (rev 9403)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-07-16 04:47:08 UTC (rev 9404)
@@ -563,8 +563,7 @@
 
    }
    
-   // This test is under investigation... disabled for now
-   public void _testCompactAddAndUpdateFollowedByADelete() throws Exception
+   public void testCompactAddAndUpdateFollowedByADelete() throws Exception
    {
 
       setup(2, 60 * 1024, false);
@@ -601,15 +600,210 @@
 
       startJournal();
       load();
+
+      long consumerTX = idGen.generateID();
       
       long firstID = idGen.generateID();
+      
+      long appendTX = idGen.generateID();
+      
+      long addedRecord = idGen.generateID();
+      
+      addTx(consumerTX, firstID);
+      
+      Thread tCompact = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               journal.compact();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
 
+
+      tCompact.start();
+      
+
+      reusableLatchDone.waitCompletion();
+      
+      addTx(appendTX, addedRecord);
+
+      commit(appendTX);
+
+      updateTx(consumerTX, addedRecord);
+      
+      commit(consumerTX);
+      
+      delete(addedRecord);
+      
+      reusableLatchWait.down();
+      
+      tCompact.join();
+
+      journal.forceMoveNextFile();
+      
+      long newRecord = idGen.generateID();
+      add(newRecord);
+      update(newRecord);
+
+      journal.compact();
+      
+      System.out.println("Debug after compact\n" + journal.debug());
+      
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   public void testCompactAddAndUpdateFollowedByADelete2() throws Exception
+   {
+
+      setup(2, 60 * 1024, false);
+      
+      SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+      final VariableLatch reusableLatchDone = new VariableLatch();
+      reusableLatchDone.up();
+      final VariableLatch reusableLatchWait = new VariableLatch();
+      reusableLatchWait.up();
+
+      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+
+         @Override
+         public void onCompactDone()
+         {
+            reusableLatchDone.down();
+            System.out.println("Waiting on Compact");
+            try
+            {
+               reusableLatchWait.waitCompletion();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            System.out.println("Done");
+         }
+      };
+
+      journal.setAutoReclaim(false);
+
+      startJournal();
+      load();
+      
+      long firstID = idGen.generateID();
+
       long consumerTX = idGen.generateID();
       
       long appendTX = idGen.generateID();
       
       long addedRecord = idGen.generateID();
       
+      addTx(consumerTX, firstID);
+
+      
+      Thread tCompact = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               journal.compact();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+
+      tCompact.start();
+
+      reusableLatchDone.waitCompletion();
+      
+      addTx(appendTX, addedRecord);
+      commit(appendTX);
+      updateTx(consumerTX, addedRecord);
+      commit(consumerTX);
+      
+      long deleteTXID = idGen.generateID();
+      
+      deleteTx(deleteTXID, addedRecord);
+
+      commit(deleteTXID);
+    
+      reusableLatchWait.down();
+      
+      tCompact.join();
+
+      journal.forceMoveNextFile();
+      
+      journal.compact();
+      
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+   public void testCompactAddAndUpdateFollowedByADelete3() throws Exception
+   {
+
+      setup(2, 60 * 1024, false);
+      
+      SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+      final VariableLatch reusableLatchDone = new VariableLatch();
+      reusableLatchDone.up();
+      final VariableLatch reusableLatchWait = new VariableLatch();
+      reusableLatchWait.up();
+
+      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+
+         @Override
+         public void onCompactDone()
+         {
+            reusableLatchDone.down();
+            System.out.println("Waiting on Compact");
+            try
+            {
+               reusableLatchWait.waitCompletion();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            System.out.println("Done");
+         }
+      };
+
+      journal.setAutoReclaim(false);
+
+      startJournal();
+      load();
+      
+      long firstID = idGen.generateID();
+
+      long consumerTX = idGen.generateID();
+      
+      long addedRecord = idGen.generateID();
+      
       add(firstID);
 
       updateTx(consumerTX, firstID);
@@ -637,11 +831,102 @@
 
       reusableLatchDone.waitCompletion();
       
+      addTx(consumerTX, addedRecord);
+      commit(consumerTX);
+      delete(addedRecord);
+      
+      reusableLatchWait.down();
+      
+      tCompact.join();
+
+      journal.compact();
+      
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+
+   public void testCompactAddAndUpdateFollowedByADelete4() throws Exception
+   {
+
+      setup(2, 60 * 1024, false);
+      
+      SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+      final VariableLatch reusableLatchDone = new VariableLatch();
+      reusableLatchDone.up();
+      final VariableLatch reusableLatchWait = new VariableLatch();
+      reusableLatchWait.up();
+
+      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+
+         @Override
+         public void onCompactDone()
+         {
+            reusableLatchDone.down();
+            System.out.println("Waiting on Compact");
+            try
+            {
+               reusableLatchWait.waitCompletion();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            System.out.println("Done");
+         }
+      };
+
+      journal.setAutoReclaim(false);
+
+      startJournal();
+      load();
+
+      long consumerTX = idGen.generateID();
+      
+      long firstID = idGen.generateID();
+      
+      long appendTX = idGen.generateID();
+      
+      long addedRecord = idGen.generateID();
+      
+      Thread tCompact = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               journal.compact();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+
+      tCompact.start();
+      
+
+      reusableLatchDone.waitCompletion();
+      
+      addTx(consumerTX, firstID);
+      
       addTx(appendTX, addedRecord);
-      updateTx(appendTX, addedRecord);
+
       commit(appendTX);
+
       updateTx(consumerTX, addedRecord);
+      
       commit(consumerTX);
+      
       delete(addedRecord);
       
       reusableLatchWait.down();
@@ -656,6 +941,8 @@
 
       journal.compact();
       
+      System.out.println("Debug after compact\n" + journal.debug());
+      
       stopJournal();
       createJournal();
       startJournal();



More information about the hornetq-commits mailing list