[hornetq-commits] JBoss hornetq SVN: r10374 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/journal/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Mar 26 17:27:29 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-26 17:27:28 -0400 (Sat, 26 Mar 2011)
New Revision: 10374

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/TestableJournal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
https://issues.jboss.org/browse/JBPAPP-6198 - fixing compactor tests

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/TestableJournal.java	2011-03-26 01:03:41 UTC (rev 10373)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/TestableJournal.java	2011-03-26 21:27:28 UTC (rev 10374)
@@ -53,7 +53,7 @@
 
    boolean isAutoReclaim();
 
-   void compact() throws Exception;
+   void testCompact() throws Exception;
    
    JournalFile getCurrentFile();
    

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-03-26 01:03:41 UTC (rev 10373)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-03-26 21:27:28 UTC (rev 10374)
@@ -79,7 +79,9 @@
 
    private final int userVersion;
 
-   private Executor filesExecutor;
+   private Executor openFilesExecutor;
+   
+   private Executor closeFilesExecutor;
 
    // Static --------------------------------------------------------
 
@@ -104,9 +106,10 @@
 
    // Public --------------------------------------------------------
 
-   public void setExecutor(final Executor executor)
+   public void setExecutor(final Executor fileExecutor, final Executor closeExecutor)
    {
-      filesExecutor = executor;
+      this.openFilesExecutor = fileExecutor;
+      this.closeFilesExecutor = closeExecutor;
    }
 
    public void clear()
@@ -358,13 +361,13 @@
          }
       };
 
-      if (filesExecutor == null)
+      if (openFilesExecutor == null)
       {
          run.run();
       }
       else
       {
-         filesExecutor.execute(run);
+         openFilesExecutor.execute(run);
       }
 
       JournalFile nextFile = null;
@@ -417,13 +420,15 @@
          }
       };
 
-      if (filesExecutor == null)
+      // We can't close files while the compactor is running
+      // as we may be closing files that are being read by the compactor
+      if (closeFilesExecutor == null)
       {
          run.run();
       }
       else
       {
-         filesExecutor.execute(run);
+         closeFilesExecutor.execute(run);
       }
 
    }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-03-26 01:03:41 UTC (rev 10373)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-03-26 21:27:28 UTC (rev 10374)
@@ -32,6 +32,7 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -1551,13 +1552,67 @@
 
       return info;
    }
+   
+   
+   public void testCompact() throws Exception
+   {
+      final AtomicInteger errors = new AtomicInteger(0);
+      
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      compactorRunning.set(true);
+      
+      // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+      // operations (that will use the executor)
+      compactorExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
 
+            try
+            {
+               JournalImpl.this.compact();
+            }
+            catch (Throwable e)
+            {
+               errors.incrementAndGet();
+               JournalImpl.log.error(e.getMessage(), e);
+               e.printStackTrace();
+            }
+            finally
+            {
+               latch.countDown();
+            }
+         }
+      });
+      
+      try
+      {
+         if (!latch.await(60, TimeUnit.SECONDS))
+         {
+            throw new RuntimeException("Didn't finish compact timely");
+         }
+         
+         if (errors.get() > 0)
+         {
+            throw new RuntimeException("Error during testCompact, look at the logs");
+         }
+      }
+      finally
+      {
+         compactorRunning.set(false);
+      }
+   }
+
    /**
     * 
     *  Note: This method can't be called from the main executor, as it will invoke other methods depending on it.
     *  
+    *  Note: only synchronized methods on journal are methods responsible for the life-cycle such as stop, start
+    *        records will still come as this is being executed
+    *  
     */
-   public synchronized void compact() throws Exception
+   protected synchronized void compact() throws Exception
    {
 
       if (compactor != null)
@@ -2489,7 +2544,7 @@
          }
       });
 
-      filesRepository.setExecutor(filesExecutor);
+      filesRepository.setExecutor(filesExecutor, compactorExecutor);
 
       fileFactory.start();
 
@@ -2521,7 +2576,7 @@
 
          filesExecutor.shutdown();
 
-         filesRepository.setExecutor(null);
+         filesRepository.setExecutor(null, null);
 
          if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
          {
@@ -2892,7 +2947,7 @@
             callback = null;
          }
 
-         if (sync)
+         if (sync && !compactorRunning.get())
          {
             // In an edge case the transaction could still have pending data from previous files.
             // This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
@@ -2956,7 +3011,7 @@
 
       if (autoReclaim && !compactorRunning.get())
       {
-         filesExecutor.execute(new Runnable()
+         compactorExecutor.execute(new Runnable()
          {
             public void run()
             {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2011-03-26 01:03:41 UTC (rev 10373)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2011-03-26 21:27:28 UTC (rev 10374)
@@ -174,10 +174,14 @@
       return read(bytes, null);
    }
 
-   public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
+   public synchronized int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
    {
       try
       {
+         if (channel == null)
+         {
+            throw new Exception("File " + this.getFileName() + " has a null channel");
+         }
          int bytesRead = channel.read(bytes);
 
          if (callback != null)

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2011-03-26 01:03:41 UTC (rev 10373)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2011-03-26 21:27:28 UTC (rev 10374)
@@ -131,6 +131,19 @@
       Assert.assertFalse(iterNewFiles.hasNext());
 
    }
+   
+//   public void testRepeat() throws Exception
+//   {
+//      int i = 0 ;
+//      
+//      while (true)
+//      {
+//         System.out.println("#test (" + (i++) + ")");
+//         testCrashRenamingFiles();
+//         tearDown();
+//         setUp();
+//      }
+//   }
 
    public void testCrashRenamingFiles() throws Exception
    {
@@ -229,7 +242,7 @@
          journal.appendDeleteRecord(i, true);
       }
 
-      journal.compact();
+      journal.testCompact();
 
       journal.stop();
 
@@ -277,7 +290,7 @@
 
       finishCompact();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
 
@@ -316,7 +329,7 @@
 
       finishCompact();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
 
@@ -347,7 +360,7 @@
 
       finishCompact();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
 
@@ -367,6 +380,8 @@
 
       startJournal();
 
+      journal.setAutoReclaim(false);
+
       load();
 
       add(1);
@@ -375,7 +390,7 @@
 
       rollback(2);
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
 
@@ -408,7 +423,7 @@
 
       journal.forceMoveNextFile();
 
-      journal.compact();
+      journal.testCompact();
 
       add(10);
 
@@ -437,9 +452,9 @@
 
       updateTx(2, 1);
 
-      journal.compact();
+      journal.testCompact();
 
-      journal.compact();
+      journal.testCompact();
 
       commit(2);
 
@@ -610,7 +625,7 @@
          {
             try
             {
-               journal.compact();
+               journal.testCompact();
             }
             catch (Exception e)
             {
@@ -769,7 +784,7 @@
 
       if (createControlFile && deleteControlFile && renameFilesAfterCompacting)
       {
-         journal.compact();
+         journal.testCompact();
       }
 
       stopJournal();
@@ -822,7 +837,7 @@
       add(newRecord);
       update(newRecord);
 
-      journal.compact();
+      journal.testCompact();
 
       System.out.println("Debug after compact\n" + journal.debug());
 
@@ -874,7 +889,7 @@
 
       journal.forceMoveNextFile();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -915,7 +930,7 @@
 
       finishCompact();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -966,7 +981,7 @@
       add(newRecord);
       update(newRecord);
 
-      journal.compact();
+      journal.testCompact();
 
       System.out.println("Debug after compact\n" + journal.debug());
 
@@ -1025,7 +1040,7 @@
 
       journal.forceMoveNextFile();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1117,7 +1132,7 @@
 
       finishCompact();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1201,11 +1216,11 @@
       System.out.println(journal.debug());
       System.out.println("*****************************************");
 
-      journal.compact();
+      journal.testCompact();
 
       add(idGenerator.generateID());
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1326,11 +1341,11 @@
       journal.forceMoveNextFile();
 
       // This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
-      journal.compact();
+      journal.testCompact();
 
       journal.checkReclaimStatus();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1384,7 +1399,7 @@
 
       ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2, this.fileSize, "/tmp/out4.dmp");
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1432,7 +1447,7 @@
 
       journal.checkReclaimStatus();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1492,7 +1507,7 @@
 
       journal.checkReclaimStatus();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1552,7 +1567,7 @@
 
       journal.checkReclaimStatus();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1619,7 +1634,7 @@
 
       journal.forceMoveNextFile();
 
-      journal.compact();
+      journal.testCompact();
 
       stopJournal();
       createJournal();
@@ -1829,7 +1844,7 @@
                {
                   Thread.sleep(500);
                   System.out.println("Compacting");
-                  ((JournalImpl)storage.getMessageJournal()).compact();
+                  ((JournalImpl)storage.getMessageJournal()).testCompact();
                   ((JournalImpl)storage.getMessageJournal()).checkReclaimStatus();
                }
             }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java	2011-03-26 01:03:41 UTC (rev 10373)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java	2011-03-26 21:27:28 UTC (rev 10374)
@@ -200,7 +200,7 @@
       }
       if (secondCompactAt == currentOperation)
       {
-         journal.compact();
+         journal.testCompact();
       }
 
       currentOperation++;
@@ -236,7 +236,7 @@
          {
             try
             {
-               journal.compact();
+               journal.testCompact();
             }
             catch (Exception e)
             {

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2011-03-26 01:03:41 UTC (rev 10373)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2011-03-26 21:27:28 UTC (rev 10374)
@@ -185,7 +185,7 @@
          {
             try
             {
-               journal.compact();
+               journal.testCompact();
             }
             catch (Throwable e)
             {



More information about the hornetq-commits mailing list