[hornetq-commits] JBoss hornetq SVN: r9488 - in trunk: src/main/org/hornetq/core/journal/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 30 11:00:28 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-07-30 11:00:26 -0400 (Fri, 30 Jul 2010)
New Revision: 9488

Modified:
   trunk/src/main/org/hornetq/core/journal/TestableJournal.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
   trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 Changes after stress tests, making sure about the integrity of the journal

Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -54,6 +54,11 @@
    boolean isAutoReclaim();
 
    void compact() throws Exception;
+   
+   void cleanUp(final JournalFile file) throws Exception;
+   
+   JournalFile getCurrentFile();
+   
 
    /** This method is called automatically when a new file is opened.
     * @return true if it needs to re-check due to cleanup or other factors  */

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -176,20 +176,6 @@
       }
    }
 
-   /**
-    * Read files that depend on this file.
-    * Commits and rollbacks are also counted as negatives. We need to fix those also.
-    * @param dependencies
-    */
-   public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies) throws Exception
-   {
-      for (JournalFile dependency : dependencies)
-      {
-         fixDependency(originalFile, dependency);
-      }
-
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -220,32 +206,7 @@
    }
 
    // Private -------------------------------------------------------
-   private void fixDependency(final JournalFile originalFile, final JournalFile dependency) throws Exception
-   {
-      JournalReaderCallback txfix = new JournalReaderCallbackAbstract()
-      {
-         @Override
-         public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
-         {
-            if (transactionCounter.containsKey(transactionID))
-            {
-               dependency.incNegCount(originalFile);
-            }
-         }
 
-         @Override
-         public void onReadRollbackRecord(final long transactionID) throws Exception
-         {
-            if (transactionCounter.containsKey(transactionID))
-            {
-               dependency.incNegCount(originalFile);
-            }
-         }
-      };
-
-      JournalImpl.readJournalFile(fileFactory, dependency, txfix);
-   }
-
    // Inner classes -------------------------------------------------
 
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -45,6 +45,9 @@
    void decSize(int bytes);
 
    int getLiveSize();
+   
+   /** The total number of deletes this file has */
+   int getTotalNegativeToOthers();
 
    void setCanReclaim(boolean canDelete);
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -49,6 +49,9 @@
 
    private boolean needCleanup;
 
+   private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+   
+
    private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
 
    public JournalFileImpl(final SequentialFile file, final long fileID)
@@ -65,6 +68,7 @@
       negCounts.clear();
       posCount.set(0);
       liveBytes.set(0);
+      totalNegativeToOthers.set(0);
    }
 
    public int getPosCount()
@@ -94,6 +98,10 @@
 
    public void incNegCount(final JournalFile file)
    {
+      if (file != this)
+      {
+         totalNegativeToOthers.incrementAndGet();
+      }
       getOrCreateNegCount(file).incrementAndGet();
    }
 
@@ -219,5 +227,12 @@
    {
       return liveBytes.get();
    }
+   
+   public int getTotalNegativeToOthers()
+   {
+      return totalNegativeToOthers.get();
+   }
+   
 
+   
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -464,59 +464,70 @@
                             "  sequence = " +
                             file.getFileID());
 
-         JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+         listJournalFile(out, fileFactory, file);
+      }
+   }
+
+   /**
+    * @param out
+    * @param fileFactory
+    * @param file
+    * @throws Exception
+    */
+   public static void listJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+   {
+      JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+      {
+
+         public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
          {
+            out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
+         }
 
-            public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
-            {
-               out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
-            }
+         public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+         {
+            out.println("ReadUpdate  " + recordInfo);
+         }
 
-            public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
-            {
-               out.println("ReadUpdate  " + recordInfo);
-            }
+         public void onReadRollbackRecord(long transactionID) throws Exception
+         {
+            out.println("Rollback txID=" + transactionID);
+         }
 
-            public void onReadRollbackRecord(long transactionID) throws Exception
-            {
-               out.println("Rollback txID=" + transactionID);
-            }
+         public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+         {
+            out.println("Prepare txID=" + transactionID + ", numberOfRecords=" + numberOfRecords);
+         }
 
-            public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
-            {
-               out.println("Prepare txID=" + transactionID);
-            }
+         public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+         {
+            out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
+         }
 
-            public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
-            {
-               out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
-            }
+         public void onReadDeleteRecord(long recordID) throws Exception
+         {
+            out.println("DeleteRecord id=" + recordID);
+         }
 
-            public void onReadDeleteRecord(long recordID) throws Exception
-            {
-               out.println("DeleteRecord id=" + recordID);
-            }
+         public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+         {
+            out.println("CommitRecord txID=" + transactionID + ", numberOfRecords=" + numberOfRecords);
+         }
 
-            public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
-            {
-               out.println("CommitRecord txID=" + transactionID);
-            }
+         public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+         {
+            out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
+         }
 
-            public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
-            {
-               out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
-            }
+         public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+         {
+            out.println("AddRecord " + recordInfo);
+         }
 
-            public void onReadAddRecord(RecordInfo recordInfo) throws Exception
-            {
-               out.println("AddRecord " + recordInfo);
-            }
-
-            public void markAsDataFile(JournalFile file)
-            {
-            }
-         });
-      }
+         public void markAsDataFile(JournalFile file)
+         {
+         }
+      });
    }
 
 
@@ -1621,6 +1632,7 @@
          {
             JournalImpl.trace("Starting compacting operation on journal");
          }
+         JournalImpl.log.debug("Starting compacting operation on journal");
 
          // We need to guarantee that the journal is frozen for this short time
          // We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1765,7 +1777,7 @@
 
          if (trace)
          {
-            JournalImpl.trace("Finished compacting on journal");
+            JournalImpl.log.debug("Finished compacting on journal");
          }
 
       }
@@ -2305,7 +2317,7 @@
 
          if (compactMinFiles > 0)
          {
-            if (nCleanup > getMinCompact())
+            if (nCleanup > 0 && needsCompact())
             {
                for (JournalFile file : dataFiles)
                {
@@ -2357,16 +2369,9 @@
       return false;
    }
 
-   /**
-    * @return
-    */
-   private float getMinCompact()
+   // This method is public for tests
+   public synchronized void cleanUp(final JournalFile file) throws Exception
    {
-      return compactMinFiles * compactPercentage;
-   }
-
-   private synchronized void cleanUp(final JournalFile file) throws Exception
-   {
       if (state != JournalImpl.STATE_LOADED)
       {
          return;
@@ -2388,7 +2393,7 @@
                JournalImpl.trace("Cleaning up file " + file);
             }
             JournalImpl.log.debug("Cleaning up file " + file);
-
+            
             if (file.getPosCount() == 0)
             {
                // nothing to be done
@@ -2408,6 +2413,10 @@
                   jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
                }
             }
+            
+            currentFile.resetNegCount(file);
+            currentFile.incPosCount();
+            dependencies.add(currentFile);
 
             cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
          }
@@ -2420,7 +2429,10 @@
 
          cleaner.flush();
 
-         cleaner.fixDependencies(file, dependencies);
+         // pointcut for tests
+         // We need to test concurrent updates on the journal, as the compacting is being performed.
+         // Usually tests will use this to hold the compacting while other structures are being updated.
+         onCompactDone();
 
          for (JournalFile jrnfile : dependencies)
          {
@@ -2437,15 +2449,35 @@
          file.getFile().delete();
          tmpFile.renameTo(cleanedFileName);
          controlFile.delete();
+         
       }
       finally
       {
          compactingLock.readLock().unlock();
-         JournalImpl.log.debug("Clean up on file " + file + " done");
+         JournalImpl.log.info("Clean up on file " + file + " done");
       }
 
    }
+   
+   private boolean needsCompact() throws Exception
+   {
+      JournalFile[] dataFiles = getDataFiles();
 
+      long totalLiveSize = 0;
+
+      for (JournalFile file : dataFiles)
+      {
+         totalLiveSize += file.getLiveSize();
+      }
+
+      long totalBytes = (long)dataFiles.length * (long)fileSize;
+
+      long compactMargin = (long)(totalBytes * compactPercentage);
+
+      return (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+
+   }
+
    private void checkCompact() throws Exception
    {
       if (compactMinFiles == 0)
@@ -2459,21 +2491,8 @@
          return;
       }
 
-      JournalFile[] dataFiles = getDataFiles();
-
-      long totalLiveSize = 0;
-
-      for (JournalFile file : dataFiles)
+      if (needsCompact())
       {
-         totalLiveSize += file.getLiveSize();
-      }
-
-      long totalBytes = (long)dataFiles.length * (long)fileSize;
-
-      long compactMargin = (long)(totalBytes * compactPercentage);
-
-      if (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles)
-      {
          if (!compactorRunning.compareAndSet(false, true))
          {
             return;

Modified: trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -63,20 +63,20 @@
          {
             Reclaimer.trace("posCount on " + currentFile + " = " + posCount);
          }
-
+         
          for (int j = i; j < files.length; j++)
          {
             if (Reclaimer.trace)
             {
                if (files[j].getNegCount(currentFile) != 0)
                {
-                  Reclaimer.trace("Negative from " + files[j] + " = " + files[j].getNegCount(currentFile));
+                  Reclaimer.trace("Negative from " + files[j] + " into " + currentFile + " = " + files[j].getNegCount(currentFile));
                }
             }
 
             totNeg += files[j].getNegCount(currentFile);
          }
-
+         
          currentFile.setCanReclaim(true);
 
          if (posCount <= totNeg)
@@ -101,8 +101,18 @@
                      {
                         Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
                      }
+                     file.setNeedCleanup(true);
 
-                     file.setNeedCleanup(true);
+                     if (file.getTotalNegativeToOthers() == 0)
+                     {
+                        file.setNeedCleanup(true);
+                     }
+                     else
+                     {
+                        // This file can't be cleared as the file has negatives to other files as well
+                        file.setNeedCleanup(false);
+                     }
+
                      currentFile.setCanReclaim(false);
 
                      break;

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -950,7 +950,108 @@
 
    }
 
+   
 
+   public void testDeleteWhileCleanup() throws Exception
+   {
+
+      setup(2, 60 * 1024, false);
+
+
+      final ReusableLatch reusableLatchDone = new ReusableLatch();
+      reusableLatchDone.countUp();
+      final ReusableLatch reusableLatchWait = new ReusableLatch();
+      reusableLatchWait.countUp();
+
+      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+
+         @Override
+         public void onCompactDone()
+         {
+            reusableLatchDone.countDown();
+            System.out.println("Waiting on Compact");
+            try
+            {
+               reusableLatchWait.await();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            System.out.println("Done");
+         }
+      };
+
+      journal.setAutoReclaim(false);
+
+      startJournal();
+      load();
+
+      
+      Thread tCompact = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               journal.cleanUp(journal.getDataFiles()[0]);
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      for (int i = 0 ; i < 100; i++)
+      {
+         add(i);
+      }
+      
+      journal.forceMoveNextFile();
+      
+      
+      for (int i = 10; i < 90; i++)
+      {
+         delete(i);
+      }
+
+      tCompact.start();
+
+      reusableLatchDone.await();
+
+      // Delete part of the live records while cleanup still working
+      for (int i = 1; i < 5; i++)
+      {
+         delete(i);
+      }
+      
+      reusableLatchWait.countDown();
+      
+      tCompact.join();
+
+      // Delete part of the live records after cleanup is done
+      for (int i = 5; i < 10; i++)
+      {
+         delete(i);
+      }
+      
+      assertEquals(9, journal.getCurrentFile().getNegCount(journal.getDataFiles()[0]));
+
+      journal.forceMoveNextFile();
+      
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
+
+
+
    public void testCompactAddAndUpdateFollowedByADelete5() throws Exception
    {
 

Modified: trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -15,6 +15,7 @@
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -24,7 +25,10 @@
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
@@ -49,9 +53,15 @@
    public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
 
    private volatile boolean running;
-   
+
    private AtomicInteger errors = new AtomicInteger(0);
 
+   private AtomicInteger numberOfRecords = new AtomicInteger(0);
+
+   private AtomicInteger numberOfUpdates = new AtomicInteger(0);
+
+   private AtomicInteger numberOfDeletes = new AtomicInteger(0);
+
    private JournalImpl journal;
 
    ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
@@ -68,7 +78,7 @@
       super.setUp();
 
       errors.set(0);
-      
+
       File dir = new File(getTemporaryDir());
       dir.mkdirs();
 
@@ -87,8 +97,8 @@
       }
 
       journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
-                                100,
-                                ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES,
+                                10,
+                                15,
                                 ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
                                 factory,
                                 "hornetq-data",
@@ -103,11 +113,22 @@
    @Override
    public void tearDown() throws Exception
    {
-      journal.stop();
+      try
+      {
+         if (journal.isStarted())
+         {
+            journal.stop();
+         }
+      }
+      catch (Exception e)
+      {
+         // don't care :-)
+      }
    }
 
    public void testAppend() throws Exception
    {
+
       running = true;
       SlowAppenderNoTX t1 = new SlowAppenderNoTX();
 
@@ -124,15 +145,28 @@
 
       t1.start();
 
+      Thread.sleep(1000);
+
       for (int i = 0; i < NTHREADS; i++)
       {
          appenders[i].start();
          updaters[i].start();
       }
 
-      // TODO: parametrize this somehow
-      Thread.sleep(TimeUnit.HOURS.toMillis(24));
+      long timeToEnd = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10);
 
+      while (System.currentTimeMillis() < timeToEnd)
+      {
+         System.out.println("Append = " + numberOfRecords +
+                            ", Update = " +
+                            numberOfUpdates +
+                            ", Delete = " +
+                            numberOfDeletes +
+                            ", liveRecords = " +
+                            (numberOfRecords.get() - numberOfDeletes.get()));
+         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+      }
+
       running = false;
 
       for (Thread t : appenders)
@@ -146,14 +180,39 @@
       }
 
       t1.join();
-      
+
       assertEquals(0, errors.get());
-      
+
       journal.stop();
-      
+
       journal.start();
-      
-      journal.loadInternalOnly();
+
+      ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+      ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+      journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
+      {
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+         }
+      });
+
+      long appends = 0, updates = 0;
+
+      for (RecordInfo record : committedRecords)
+      {
+         if (record.isUpdate)
+         {
+            updates++;
+         }
+         else
+         {
+            appends++;
+         }
+      }
+
+      assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
+
+      journal.stop();
    }
 
    private byte[] generateRecord()
@@ -180,16 +239,16 @@
 
             while (running)
             {
-               int txSize = RandomUtil.randomMax(1000);
+               final int txSize = RandomUtil.randomMax(100);
 
                long txID = JournalSoakTest.idGen.generateID();
 
-               final ArrayList<Long> ids = new ArrayList<Long>();
+               final long ids[] = new long[txSize];
 
                for (int i = 0; i < txSize; i++)
                {
                   long id = JournalSoakTest.idGen.generateID();
-                  ids.add(id);
+                  ids[i] = id;
                   journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
                   Thread.sleep(1);
                }
@@ -203,6 +262,7 @@
 
                   public void done()
                   {
+                     numberOfRecords.addAndGet(txSize);
                      for (Long id : ids)
                      {
                         queue.add(id);
@@ -236,7 +296,7 @@
       {
          try
          {
-            int txSize = RandomUtil.randomMax(1000);
+            int txSize = RandomUtil.randomMax(100);
             int txCount = 0;
             long ids[] = new long[txSize];
 
@@ -248,13 +308,12 @@
                long id = queue.poll(60, TimeUnit.MINUTES);
                ids[txCount] = id;
                journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
-               Thread.sleep(1);
                if (++txCount == txSize)
                {
                   journal.appendCommitRecord(txID, true, ctx);
                   ctx.executeOnCompletion(new DeleteTask(ids));
                   txCount = 0;
-                  txSize = RandomUtil.randomMax(1000);
+                  txSize = RandomUtil.randomMax(100);
                   txID = JournalSoakTest.idGen.generateID();
                   ids = new long[txSize];
                }
@@ -280,11 +339,13 @@
 
       public void done()
       {
+         numberOfUpdates.addAndGet(ids.length);
          try
          {
             for (long id : ids)
             {
-               journal.appendDeleteRecord(id, true);
+               journal.appendDeleteRecord(id, false);
+               numberOfDeletes.incrementAndGet();
             }
          }
          catch (Exception e)
@@ -314,25 +375,23 @@
          {
             while (running)
             {
-               long ids[] = new long[1000];
+               long ids[] = new long[5];
                // Append
-               for (int i = 0; running & i < 1000; i++)
+               for (int i = 0; running & i < ids.length; i++)
                {
+                  System.out.println("append slow");
                   ids[i] = JournalSoakTest.idGen.generateID();
                   journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
-                  Thread.sleep(10);
+                  numberOfRecords.incrementAndGet();
+
+                  Thread.sleep(TimeUnit.SECONDS.toMillis(50));
                }
-               // Update
-               for (int i = 0; running & i < 1000; i++)
-               {
-                  journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
-                  Thread.sleep(10);
-               }
                // Delete
-               for (int i = 0; running & i < 1000; i++)
+               for (int i = 0; running & i < ids.length; i++)
                {
-                  journal.appendDeleteRecord(ids[i], true);
-                  Thread.sleep(10);
+                  System.out.println("Deleting");
+                  journal.appendDeleteRecord(ids[i], false);
+                  numberOfDeletes.incrementAndGet();
                }
             }
          }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java	2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java	2010-07-30 15:00:26 UTC (rev 9488)
@@ -711,6 +711,8 @@
    private void setupPosNeg(final int fileNumber, final int pos, final int... neg)
    {
       JournalFile file = files[fileNumber];
+      
+      int totalDep = file.getTotalNegativeToOthers();
 
       for (int i = 0; i < pos; i++)
       {
@@ -724,8 +726,11 @@
          for (int j = 0; j < neg[i]; j++)
          {
             file.incNegCount(reclaimable2);
+            totalDep++;
          }
       }
+      
+      assertEquals(totalDep, file.getTotalNegativeToOthers());
    }
 
    private void debugFiles()
@@ -777,6 +782,8 @@
       private boolean canDelete;
 
       private boolean needCleanup;
+      
+      private int totalDep;
 
       public void extendOffset(final int delta)
       {
@@ -822,6 +829,8 @@
          int c = count == null ? 1 : count.intValue() + 1;
 
          negCounts.put(file, c);
+         
+         totalDep++;
       }
 
       public int getPosCount()
@@ -985,5 +994,13 @@
       {
          return 0;
       }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.impl.JournalFile#getTotalNegativeToOthers()
+       */
+      public int getTotalNegativeToOthers()
+      {
+         return totalDep;
+      }
    }
 }



More information about the hornetq-commits mailing list