[hornetq-commits] JBoss hornetq SVN: r9552 - in trunk: src/main/org/hornetq/core/journal/impl and 2 other directories.

do-not-reply at jboss.org
Mon Aug 16 19:52:29 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-16 19:52:29 -0400 (Mon, 16 Aug 2010)
New Revision: 9552

Modified:
   trunk/src/main/org/hornetq/core/journal/TestableJournal.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - another fix

Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java	2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java	2010-08-16 23:52:29 UTC (rev 9552)
@@ -49,6 +49,8 @@
 
    void forceMoveNextFile() throws Exception;
 
+   void forceMoveNextFile(boolean synchronous) throws Exception;
+
    void setAutoReclaim(boolean autoReclaim);
 
    boolean isAutoReclaim();

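For readers following the interface change: the new overload lets test code say whether the forced move to the next journal file should take the synchronous path. A minimal usage sketch, assuming a TestableJournal reference named journal (the helper class and variable names are illustrative, not part of this commit):

import org.hornetq.core.journal.TestableJournal;

public final class ForceMoveExample
{
   /** Illustrative helper: moves the journal to the next file, optionally on the synchronous path. */
   public static void roll(final TestableJournal journal, final boolean synchronous) throws Exception
   {
      if (synchronous)
      {
         // The existing no-argument form; after this commit it delegates to forceMoveNextFile(true).
         journal.forceMoveNextFile();
      }
      else
      {
         // The new overload: pass false to take the non-synchronous path. The stress test below
         // uses this from inside compacting callbacks, where the synchronous path could block.
         journal.forceMoveNextFile(false);
      }
   }
}
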
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-16 23:52:29 UTC (rev 9552)
@@ -23,8 +23,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +33,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -105,7 +104,7 @@
    // Journal
    private static final void trace(final String message)
    {
-      log.trace(message);
+      log.info(message);
    }
 
    // The sizes of primitive types
@@ -191,7 +190,7 @@
 
    private final LinkedBlockingDeque<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
 
-   private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+   private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
 
    private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
 
@@ -651,11 +650,13 @@
             // checkSize by some sort of calculated hash)
             if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
             {
-              JournalImpl.trace("Record at position " + pos +
+               JournalImpl.trace("Record at position " + pos +
                                  " recordType = " +
                                  recordType +
-                                 " possible transactionID = " + transactionID + 
-                                 " possible recordID = " + recordID +
+                                 " possible transactionID = " +
+                                 transactionID +
+                                 " possible recordID = " +
+                                 recordID +
                                  " file:" +
                                  file.getFile().getFileName() +
                                  " is corrupted and it is being ignored (III)");
@@ -763,7 +764,7 @@
       catch (Throwable e)
       {
          log.warn(e.getMessage(), e);
-         throw new Exception (e.getMessage(), e);
+         throw new Exception(e.getMessage(), e);
       }
       finally
       {
@@ -828,7 +829,11 @@
       {
          if (JournalImpl.LOAD_TRACE)
          {
-            JournalImpl.trace("appendAddRecord id = " + id + ", recordType = " + recordType + " compacting = " + (compactor != null));
+            JournalImpl.trace("appendAddRecord id = " + id +
+                              ", recordType = " +
+                              recordType +
+                              " compacting = " +
+                              (compactor != null));
          }
 
          JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
@@ -969,12 +974,11 @@
       }
 
       compactingLock.readLock().lock();
-
       try
       {
 
          JournalRecord record = null;
-         
+
          if (compactor == null)
          {
             record = records.remove(id);
@@ -1049,12 +1053,13 @@
       {
          if (JournalImpl.LOAD_TRACE)
          {
-             JournalImpl.trace("appendAddRecordTransactional txID " + txID +
+            JournalImpl.trace("appendAddRecordTransactional txID " + txID +
                               ", id = " +
                               id +
                               ", recordType = " +
                               recordType +
-                              ", compacting " + (this.compactor != null));
+                              ", compacting " +
+                              (this.compactor != null));
          }
 
          JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
@@ -1107,7 +1112,9 @@
                               ", id = " +
                               id +
                               ", recordType = " +
-                              recordType + ", compacting = " + (compactor != null));
+                              recordType +
+                              ", compacting = " +
+                              (compactor != null));
          }
 
          JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
@@ -1462,7 +1469,7 @@
 
          private void checkDeleteSize()
          {
-            // HORNETQ-482 - Flush deletes only if memory is critical 
+            // HORNETQ-482 - Flush deletes only if memory is critical
             if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
             {
                log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
@@ -1480,7 +1487,7 @@
                }
 
                recordsToDelete.clear();
-               
+
                log.debug("flush delete done");
             }
          }
@@ -1568,6 +1575,8 @@
                return;
             }
 
+            onCompactLock();
+
             setAutoReclaim(false);
 
             // We need to move to the next file, as we need a clear start for negatives and positives counts
@@ -1612,7 +1621,15 @@
          // well
          for (final JournalFile file : dataFilesToProcess)
          {
-            JournalImpl.readJournalFile(fileFactory, file, compactor);
+            try
+            {
+               JournalImpl.readJournalFile(fileFactory, file, compactor);
+            }
+            catch (Throwable e)
+            {
+               log.warn("Error on reading compacting for " + file);
+               throw new Exception("Error on reading compacting for " + file, e);
+            }
          }
 
          compactor.flush();
@@ -1634,6 +1651,8 @@
             // Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
             compactor = null;
 
+            onCompactLock();
+
             newDatafiles = localCompactor.getNewDataFiles();
 
             // Restore newRecords created during compacting
@@ -1664,7 +1683,7 @@
             {
                newTransaction.replaceRecordProvider(this);
             }
-            
+
             localCompactor.replayPendingCommands();
 
             // Merge transactions back after compacting
@@ -2300,13 +2319,11 @@
          return;
       }
 
-      compactingLock.readLock().lock();
-
       try
       {
          JournalCleaner cleaner = null;
          ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
-         lockAppend.lock();
+         compactingLock.writeLock().lock();
 
          try
          {
@@ -2345,11 +2362,21 @@
          }
          finally
          {
-            lockAppend.unlock();
+            compactingLock.writeLock().unlock();
          }
 
-         JournalImpl.readJournalFile(fileFactory, file, cleaner);
+         compactingLock.readLock().lock();
 
+         try
+         {
+            JournalImpl.readJournalFile(fileFactory, file, cleaner);
+         }
+         catch (Throwable e)
+         {
+            log.warn("Error reading cleanup on " + file, e);
+            throw new Exception("Error reading cleanup on " + file, e);
+         }
+
          cleaner.flush();
 
          // pointcut for tests
@@ -2379,6 +2406,11 @@
          controlFile.delete();
 
          final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
+ 
+         if (trace)
+         {
+            trace("Adding free file back from cleanup " + retJournalfile);
+         }
 
          filesExecutor.execute(new Runnable()
          {
@@ -2616,14 +2648,20 @@
    // In some tests we need to force the journal to move to a next file
    public void forceMoveNextFile() throws Exception
    {
+      forceMoveNextFile(true);
+   }
+
+   // In some tests we need to force the journal to move to a next file
+   public void forceMoveNextFile(final boolean synchronous) throws Exception
+   {
       compactingLock.readLock().lock();
       try
       {
          lockAppend.lock();
          try
          {
-            moveNextFile(true);
-            if (autoReclaim)
+            moveNextFile(synchronous);
+            if (autoReclaim && !synchronous)
             {
                checkReclaimStatus();
             }
@@ -2660,10 +2698,24 @@
          throw new IllegalStateException("Journal is not stopped");
       }
 
-      filesExecutor = Executors.newSingleThreadExecutor();
+      filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
+      {
 
-      compactorExecutor = Executors.newCachedThreadPool();
+         public Thread newThread(Runnable r)
+         {
+            return new Thread(r, "JournalImpl::FilesExecutor");
+         }
+      });
 
+      compactorExecutor = Executors.newCachedThreadPool(new ThreadFactory()
+      {
+
+         public Thread newThread(Runnable r)
+         {
+            return new Thread(r, "JournalImpl::CompactorExecutor");
+         }
+      });
+
       fileFactory.start();
 
       state = JournalImpl.STATE_STARTED;
@@ -2818,6 +2870,12 @@
    {
    }
 
+   /** This is an interception point for testcases, when the compacted files are written; it is called
+    *  as soon as the compactor gets a writeLock */
+   protected void onCompactLock() throws Exception
+   {
+   }
+
    /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
    protected void onCompactDone()
    {
@@ -2843,6 +2901,11 @@
       {
          // Re-initialise it
 
+         if (trace)
+         {
+            trace("Adding free file " + file);
+         }
+
          JournalFile jf = reinitializeFile(file);
 
          if (renameTmp)
@@ -3182,6 +3245,11 @@
 
       sequentialFile.close();
 
+      if (JournalImpl.trace)
+      {
+         JournalImpl.trace("Renaming file " + tmpFileName + " as " + fileName);
+      }
+
       sequentialFile.renameTo(fileName);
 
       if (keepOpened)
@@ -3289,7 +3357,7 @@
          filesExecutor.execute(run);
       }
 
-      if (autoReclaim && !synchronous)
+      if (!synchronous)
       {
          scheduleReclaim();
       }
@@ -3320,26 +3388,28 @@
       {
          return;
       }
-
-      filesExecutor.execute(new Runnable()
+ 
+      if (autoReclaim && !compactorRunning.get())
       {
-         public void run()
+         filesExecutor.execute(new Runnable()
          {
-            try
+            public void run()
             {
-               drainClosedFiles();
-
-               if (!checkReclaimStatus())
+               try
                {
-                  checkCompact();
+                  drainClosedFiles();
+                  if (!checkReclaimStatus())
+                  {
+                     checkCompact();
+                  }
                }
+               catch (Exception e)
+               {
+                  JournalImpl.log.error(e.getMessage(), e);
+               }
             }
-            catch (Exception e)
-            {
-               JournalImpl.log.error(e.getMessage(), e);
-            }
-         }
-      });
+         });
+      }
    }
 
    /** 
@@ -3368,25 +3438,21 @@
                        final boolean tmpCompactExtension) throws Exception
    {
       JournalFile nextOpenedFile = null;
-      try
+
+      nextOpenedFile = freeFiles.poll();
+
+      if (nextOpenedFile == null)
       {
-         nextOpenedFile = freeFiles.remove();
+         nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+      }
+      else
+      {
          if (tmpCompactExtension)
          {
             SequentialFile sequentialFile = nextOpenedFile.getFile();
             sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
          }
-      }
-      catch (NoSuchElementException ignored)
-      {
-      }
 
-      if (nextOpenedFile == null)
-      {
-         nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
-      }
-      else
-      {
          if (keepOpened)
          {
             openFile(nextOpenedFile, multiAIO);

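Two of the JournalImpl changes above are general-purpose idioms: the executors now receive named threads, and freeFiles.poll() replaces remove() so that an empty queue yields null instead of throwing NoSuchElementException, which removes the try/catch in getFile(). A standalone sketch of the thread-naming idiom (only the "JournalImpl::FilesExecutor" name comes from the commit; the surrounding class is illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public final class NamedExecutorExample
{
   public static void main(final String[] args)
   {
      // Naming the worker thread makes it easy to identify in a thread dump when the
      // journal appears stuck, which is presumably the motivation for the change in start().
      ExecutorService filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
      {
         public Thread newThread(final Runnable r)
         {
            return new Thread(r, "JournalImpl::FilesExecutor");
         }
      });

      filesExecutor.execute(new Runnable()
      {
         public void run()
         {
            // Prints "JournalImpl::FilesExecutor"
            System.out.println(Thread.currentThread().getName());
         }
      });

      filesExecutor.shutdown();
   }
}
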
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-16 23:52:29 UTC (rev 9552)
@@ -121,6 +121,13 @@
                                 "hq",
                                 maxAIO)
       {
+         protected void onCompactLock() throws Exception
+         {
+            System.out.println("OnCompactLock");
+            journal.forceMoveNextFile(false);
+            System.out.println("OnCompactLock done");
+         }
+      
          protected void onCompactStart() throws Exception
          {
             testExecutor.execute(new Runnable()
@@ -134,7 +141,7 @@
                      {
                         long id = idGen.generateID();
                         journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
-                        journal.forceMoveNextFile();
+                        journal.forceMoveNextFile(false);
                         journal.appendDeleteRecord(id, id == 20);
                      }
                      System.out.println("OnCompactStart leave");
@@ -176,7 +183,7 @@
 
    protected long getTotalTimeMilliseconds()
    {
-      return TimeUnit.MINUTES.toMillis(10);
+      return TimeUnit.MINUTES.toMillis(1);
    }
 
    public void testAppend() throws Exception

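The stress test drives the new code by overriding the protected no-op hooks that JournalImpl exposes (onCompactStart, onCompactDone, and the new onCompactLock) to inject work at exact points of the compacting cycle. A minimal sketch of that hook pattern with hypothetical class names; only the hook idea itself mirrors the commit:

// Hypothetical classes illustrating the test-hook pattern used by JournalImpl.
class Compactor
{
   public final void compact() throws Exception
   {
      acquireWriteLock();
      onCompactLock();   // interception point: empty in production, overridden by tests
      doCompact();
      onCompactDone();   // interception point: empty in production, overridden by tests
   }

   /** Called as soon as the compactor holds its lock; production code does nothing here. */
   protected void onCompactLock() throws Exception
   {
   }

   protected void onCompactDone()
   {
   }

   private void acquireWriteLock()
   {
   }

   private void doCompact()
   {
   }
}

class RacingCompactor extends Compactor
{
   @Override
   protected void onCompactLock() throws Exception
   {
      // A stress test can trigger a concurrent operation exactly here, the way the test
      // above forces the journal onto a new file while the compactor holds its lock.
   }
}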

