[hornetq-commits] JBoss hornetq SVN: r9556 - in trunk: src/main/org/hornetq/utils and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 17 22:45:13 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-17 22:45:12 -0400 (Tue, 17 Aug 2010)
New Revision: 9556

Added:
   trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
Modified:
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/utils/ReusableLatch.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
Adding a few more compacting tests, plus minor changes to make sure everything is working correctly

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-18 02:45:12 UTC (rev 9556)
@@ -1570,8 +1570,8 @@
             }
             catch (Throwable e)
             {
-               log.warn("Error on reading compacting for "  + file);
-               throw new Exception("Error on reading compacting for "  + file, e);
+               log.warn("Error on reading compacting for " + file);
+               throw new Exception("Error on reading compacting for " + file, e);
             }
          }
 
@@ -1644,6 +1644,10 @@
                {
                   liveTransaction.merge(newTransaction);
                }
+               else
+               {
+                  log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+               }
             }
          }
          finally
@@ -2114,6 +2118,12 @@
     */
    public boolean checkReclaimStatus() throws Exception
    {
+
+      if (compactorRunning.get())
+      {
+         return false;
+      }
+
       // We can't start reclaim while compacting is working
       compactingLock.readLock().lock();
       try
@@ -2191,6 +2201,12 @@
                      }
                      return true;
                   }
+                  else
+                  {
+                     // We only cleanup the first files
+                     // if a middle file needs cleanup it will be done through compacting
+                     break;
+                  }
                }
             }
          }
@@ -2298,10 +2314,10 @@
          controlFile.delete();
 
          final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
- 
+
          if (trace)
          {
-             trace("Adding free file back from cleanup" + retJournalfile);
+            trace("Adding free file back from cleanup" + retJournalfile);
          }
 
          filesExecutor.execute(new Runnable()
@@ -3280,7 +3296,7 @@
       {
          return;
       }
- 
+
       if (autoReclaim && !compactorRunning.get())
       {
          filesExecutor.execute(new Runnable()

Modified: trunk/src/main/org/hornetq/utils/ReusableLatch.java
===================================================================
--- trunk/src/main/org/hornetq/utils/ReusableLatch.java	2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/src/main/org/hornetq/utils/ReusableLatch.java	2010-08-18 02:45:12 UTC (rev 9556)
@@ -50,6 +50,11 @@
       {
          return getState();
       }
+      
+      public void setCount(final int count)
+      {
+         setState(count);
+      }
 
       @Override
       public int tryAcquireShared(final int numberOfAqcquires)
@@ -107,6 +112,11 @@
    {
       return control.getCount();
    }
+   
+   public void setCount(final int count)
+   {
+      control.setCount(count);
+   }
 
    public void countUp()
    {

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2010-08-18 02:45:12 UTC (rev 9556)
@@ -237,57 +237,93 @@
    {
 
       setup(2, 60 * 1024, false);
-      
+
       createJournal();
-      
+
       startJournal();
-      
+
       load();
-      
+
       add(1);
-      
+
       updateTx(2, 1);
-      
+
       rollback(2);
+
+      journal.compact();
+
+      stopJournal();
+
+      startJournal();
+
+      loadAndCheck();
+
+      stopJournal();
+
+   }
+
+   public void testCompactSecondFileReclaimed() throws Exception
+   {
+
+      setup(2, 60 * 1024, false);
+
+      createJournal();
+
+      startJournal();
+
+      load();
+
+      addTx(1, 1, 2, 3, 4);
+
+      journal.forceMoveNextFile();
+
+      addTx(1, 5, 6, 7, 8);
       
+      commit(1);
+      
+      journal.forceMoveNextFile();
+      
       journal.compact();
       
+      add(10);
+
       stopJournal();
-      
+
       startJournal();
-      
+
       loadAndCheck();
-      
+
       stopJournal();
 
    }
 
-
    public void testIncompleteTXDuringcompact() throws Exception
    {
 
       setup(2, 60 * 1024, false);
-      
+
       createJournal();
-      
+
       startJournal();
-      
+
       load();
-      
+
       add(1);
-      
+
       updateTx(2, 1);
-      
+
       journal.compact();
-      
+
+      journal.compact();
+
       commit(2);
-      
+
       stopJournal();
-      
+
       startJournal();
-      
+
       loadAndCheck();
-      
+
       stopJournal();
 
    }
@@ -625,31 +661,7 @@
 
       SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
 
-      final ReusableLatch reusableLatchDone = new ReusableLatch();
-      reusableLatchDone.countUp();
-      final ReusableLatch reusableLatchWait = new ReusableLatch();
-      reusableLatchWait.countUp();
-
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-
-         @Override
-         public void onCompactDone()
-         {
-            reusableLatchDone.countDown();
-            System.out.println("Waiting on Compact");
-            try
-            {
-               reusableLatchWait.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-            System.out.println("Done");
-         }
-      };
-
+      createJournal();
       journal.setAutoReclaim(false);
 
       startJournal();
@@ -665,26 +677,8 @@
 
       addTx(consumerTX, firstID);
 
-      Thread tCompact = new Thread()
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               journal.compact();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
+      startCompact();
 
-      tCompact.start();
-
-      reusableLatchDone.await();
-
       addTx(appendTX, addedRecord);
 
       commit(appendTX);
@@ -695,10 +689,8 @@
 
       delete(addedRecord);
 
-      reusableLatchWait.countDown();
+      finishCompact();
 
-      tCompact.join();
-
       journal.forceMoveNextFile();
 
       long newRecord = idGen.generateID();
@@ -723,31 +715,8 @@
 
       SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
 
-      final ReusableLatch reusableLatchDone = new ReusableLatch();
-      reusableLatchDone.countUp();
-      final ReusableLatch reusableLatchWait = new ReusableLatch();
-      reusableLatchWait.countUp();
+      createJournal();
 
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-
-         @Override
-         public void onCompactDone()
-         {
-            reusableLatchDone.countDown();
-            System.out.println("Waiting on Compact");
-            try
-            {
-               reusableLatchWait.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-            System.out.println("Done");
-         }
-      };
-
       journal.setAutoReclaim(false);
 
       startJournal();
@@ -763,26 +732,8 @@
 
       addTx(consumerTX, firstID);
 
-      Thread tCompact = new Thread()
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               journal.compact();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
+      startCompact();
 
-      tCompact.start();
-
-      reusableLatchDone.await();
-
       addTx(appendTX, addedRecord);
       commit(appendTX);
       updateTx(consumerTX, addedRecord);
@@ -794,10 +745,8 @@
 
       commit(deleteTXID);
 
-      reusableLatchWait.countDown();
+      finishCompact();
 
-      tCompact.join();
-
       journal.forceMoveNextFile();
 
       journal.compact();
@@ -816,31 +765,8 @@
 
       SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
 
-      final ReusableLatch reusableLatchDone = new ReusableLatch();
-      reusableLatchDone.countUp();
-      final ReusableLatch reusableLatchWait = new ReusableLatch();
-      reusableLatchWait.countUp();
+      createJournal();
 
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-
-         @Override
-         public void onCompactDone()
-         {
-            reusableLatchDone.countDown();
-            System.out.println("Waiting on Compact");
-            try
-            {
-               reusableLatchWait.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-            System.out.println("Done");
-         }
-      };
-
       journal.setAutoReclaim(false);
 
       startJournal();
@@ -856,34 +782,14 @@
 
       updateTx(consumerTX, firstID);
 
-      Thread tCompact = new Thread()
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               journal.compact();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
+      startCompact();
 
-      tCompact.start();
-
-      reusableLatchDone.await();
-
       addTx(consumerTX, addedRecord);
       commit(consumerTX);
       delete(addedRecord);
 
-      reusableLatchWait.countDown();
+      finishCompact();
 
-      tCompact.join();
-
       journal.compact();
 
       stopJournal();
@@ -991,6 +897,63 @@
 
    }
 
+   public void testCompactAddAndUpdateFollowedByADelete6() throws Exception
+   {
+
+      setup(2, 60 * 1024, false);
+
+      SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+      createJournal();
+
+      journal.setAutoReclaim(false);
+
+      startJournal();
+      load();
+
+      long tx0 = idGen.generateID();
+
+      long tx1 = idGen.generateID();
+
+      long add1 = idGen.generateID();
+
+      long add2 = idGen.generateID();
+
+      startCompact();
+
+      addTx(tx0, add1);
+
+      rollback(tx0);
+
+      addTx(tx1, add1, add2);
+      commit(tx1);
+
+      finishCompact();
+
+      long tx2 = idGen.generateID();
+
+      updateTx(tx2, add1, add2);
+      commit(tx2);
+
+      delete(add1);
+
+      startCompact();
+
+      delete(add2);
+
+      finishCompact();
+
+      journal.forceMoveNextFile();
+
+      journal.compact();
+
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+
+   }
+
    public void testDeleteWhileCleanup() throws Exception
    {
 

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java	2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java	2010-08-18 02:45:12 UTC (rev 9556)
@@ -13,16 +13,7 @@
 
 package org.hornetq.tests.stress.journal;
 
-import java.io.File;
-import java.io.FilenameFilter;
 
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
-import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.ReusableLatch;
-
 /**
  * A NIORandomCompactTest
  *
@@ -30,167 +21,9 @@
  *
  *
  */
-public class AllPossibilitiesCompactStressTest extends JournalImplTestBase
+public class AllPossibilitiesCompactStressTest extends MixupCompactorBase
 {
 
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private ReusableLatch startedCompactingLatch = null;
-
-   private ReusableLatch releaseCompactingLatch = null;
-
-   private Thread tCompact = null;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-
-      tCompact = null;
-
-      startedCompactingLatch = new ReusableLatch();
-
-      releaseCompactingLatch = new ReusableLatch();
-
-      File file = new File(getTestDir());
-
-      deleteDirectory(file);
-
-      file.mkdir();
-   }
-
-   protected void tearDown() throws Exception
-   {
-
-      File testDir = new File(getTestDir());
-
-      File files[] = testDir.listFiles(new FilenameFilter()
-      {
-
-         public boolean accept(File dir, String name)
-         {
-            return name.startsWith(filePrefix) && name.endsWith(fileExtension);
-         }
-      });
-
-      for (File file : files)
-      {
-         assertEquals("File " + file + " doesn't have the expected number of bytes", fileSize, file.length());
-      }
-
-      super.tearDown();
-   }
-
-   int startCompactAt;
-
-   int joinCompactAt;
-
-   int secondCompactAt;
-
-   int currentOperation;
-
-   SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
-   
-   public void createJournal() throws Exception
-   {
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
-      {
-
-         @Override
-         public void onCompactDone()
-         {
-            startedCompactingLatch.countDown();
-            try
-            {
-               releaseCompactingLatch.await();
-            }
-            catch (InterruptedException e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
-
-      journal.setAutoReclaim(false);
-   }
-
-
-   public void testMixOperations() throws Exception
-   {
-
-      setup(2, 60 * 1024, false);
-      
-      startCompactAt = joinCompactAt = secondCompactAt = -1;
-      
-      currentOperation = 0;
-      internalTest();
-      int MAX_OPERATIONS = currentOperation;
-      
-      System.out.println("Using MAX_OPERATIONS = " + MAX_OPERATIONS);
-
-      for (startCompactAt = 0; startCompactAt < MAX_OPERATIONS; startCompactAt++)
-      {
-         for (joinCompactAt = startCompactAt; joinCompactAt < MAX_OPERATIONS; joinCompactAt++)
-         {
-            for (secondCompactAt = joinCompactAt; secondCompactAt < MAX_OPERATIONS; secondCompactAt++)
-            {
-               System.out.println("start=" + startCompactAt + ", join=" + joinCompactAt + ", second=" + secondCompactAt);
-               
-               currentOperation = 0;
-               try
-               {
-                  tearDown();
-                  setUp();
-                  internalTest();
-               }
-               catch (Throwable e)
-               {
-                  throw new Exception("Error at compact=" + startCompactAt +
-                                      ", joinCompactAt=" +
-                                      joinCompactAt +
-                                      ", secondCompactAt=" +
-                                      secondCompactAt, e);
-               }
-            }
-         }
-      }
-   }
-
-   protected void beforeJournalOperation() throws Exception
-   {
-      checkJournalOperation();
-   }
-
-   /**
-    * @throws InterruptedException
-    * @throws Exception
-    */
-   private void checkJournalOperation() throws InterruptedException, Exception
-   {
-      if (startCompactAt == currentOperation)
-      {
-         threadCompact();
-      }
-      if (joinCompactAt == currentOperation)
-      {
-         joinCompact();
-      }
-      if (secondCompactAt == currentOperation)
-      {
-         journal.compact();
-      }
-
-      currentOperation++;
-   }
-
    public void internalTest() throws Exception
    {
       createJournal();
@@ -276,63 +109,4 @@
       stopJournal();
    }
 
-   /**
-    * @param releaseCompactingLatch
-    * @param tCompact
-    * @throws InterruptedException
-    */
-   private void joinCompact() throws InterruptedException
-   {
-      releaseCompactingLatch.countDown();
-
-      tCompact.join();
-
-      tCompact = null;
-   }
-
-   /**
-    * @param startedCompactingLatch
-    * @return
-    * @throws InterruptedException
-    */
-   private void threadCompact() throws InterruptedException
-   {
-      tCompact = new Thread()
-      {
-         @Override
-         public void run()
-         {
-            try
-            {
-               journal.compact();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
-         }
-      };
-
-      tCompact.start();
-
-      startedCompactingLatch.await();
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
-    */
-   @Override
-   protected SequentialFileFactory getFileFactory() throws Exception
-   {
-      return new NIOSequentialFileFactory(getTestDir());
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
 }

Added: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java	2010-08-18 02:45:12 UTC (rev 9556)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+/**
+ * A NIORandomCompactTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AllPossibilitiesCompactWithAddDeleteStressTest extends MixupCompactorBase
+{
+
+   public void internalTest() throws Exception
+   {
+      createJournal();
+
+      startJournal();
+
+      loadAndCheck();
+
+      long tx0 = idGen.generateID();
+
+      long tx1 = idGen.generateID();
+
+      long add1 = idGen.generateID();
+
+      long add2 = idGen.generateID();
+
+      addTx(tx0, add1);
+
+      rollback(tx0);
+
+      addTx(tx1, add1, add2);
+
+      commit(tx1);
+
+      long tx2 = idGen.generateID();
+
+      updateTx(tx2, add1, add2);
+      
+      commit(tx2);
+
+      delete(add1);
+
+      delete(add2);
+
+      checkJournalOperation();
+
+      stopJournal();
+
+      createJournal();
+
+      startJournal();
+
+      loadAndCheck();
+
+      stopJournal();
+
+   }
+}

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-18 02:45:12 UTC (rev 9556)
@@ -112,7 +112,7 @@
          maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
       }
 
-      journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+      journal = new JournalImpl(50 * 1024,
                                 20,
                                 15,
                                 ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
@@ -127,7 +127,7 @@
             journal.forceMoveNextFile(false);
             System.out.println("OnCompactLock done");
          }
-      
+
          protected void onCompactStart() throws Exception
          {
             testExecutor.execute(new Runnable()
@@ -325,7 +325,7 @@
                long rollbackTXID = JournalCleanupCompactStressTest.idGen.generateID();
 
                final long ids[] = new long[txSize];
-               
+
                for (int i = 0; i < txSize; i++)
                {
                   ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
@@ -341,7 +341,7 @@
                   maxRecords.acquire();
                }
                journal.appendCommitRecord(txID, true, ctx);
-               
+
                ctx.executeOnCompletion(new IOAsyncTask()
                {
 
@@ -408,6 +408,11 @@
                   ids = new long[txSize];
                }
             }
+
+            if (txCount > 0)
+            {
+               journal.appendCommitRecord(txID, true);
+            }
          }
          catch (Exception e)
          {

Added: trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java	2010-08-18 02:45:12 UTC (rev 9556)
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.utils.ReusableLatch;
+import org.hornetq.utils.SimpleIDGenerator;
+
+/**
+ * This class will control mix up compactor between each operation of a test
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class MixupCompactorBase extends JournalImplTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private ReusableLatch startedCompactingLatch = null;
+
+   private ReusableLatch releaseCompactingLatch = null;
+
+   private Thread tCompact = null;
+
+   int startCompactAt;
+
+   int joinCompactAt;
+
+   int secondCompactAt;
+
+   int currentOperation;
+
+   SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      tCompact = null;
+
+      startedCompactingLatch = new ReusableLatch(1);
+
+      releaseCompactingLatch = new ReusableLatch(1);
+
+      File file = new File(getTestDir());
+
+      deleteDirectory(file);
+
+      file.mkdir();
+
+      setup(2, 60 * 1024, false);
+
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+
+      File testDir = new File(getTestDir());
+
+      File files[] = testDir.listFiles(new FilenameFilter()
+      {
+
+         public boolean accept(final File dir, final String name)
+         {
+            return name.startsWith(filePrefix) && name.endsWith(fileExtension);
+         }
+      });
+
+      for (File file : files)
+      {
+         Assert.assertEquals("File " + file + " doesn't have the expected number of bytes", fileSize, file.length());
+      }
+
+      super.tearDown();
+   }
+
+   @Override
+   public void createJournal() throws Exception
+   {
+      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+
+         @Override
+         public void onCompactDone()
+         {
+            startedCompactingLatch.countDown();
+            try
+            {
+               releaseCompactingLatch.await();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      journal.setAutoReclaim(false);
+   }
+
+   public void testMixOperations() throws Exception
+   {
+
+
+      currentOperation = 0;
+      internalTest();
+      int MAX_OPERATIONS = testMix(-1, -1, -1);
+
+      System.out.println("Using MAX_OPERATIONS = " + MAX_OPERATIONS);
+
+      for (int startAt = 0; startAt < MAX_OPERATIONS; startAt++)
+      {
+         for (int joinAt = startAt; joinAt < MAX_OPERATIONS; joinAt++)
+         {
+            for (int secondAt = joinAt ; secondAt < MAX_OPERATIONS; secondAt++)
+            {
+               System.out.println("start=" + startAt + ", join=" + joinAt + ", second=" + secondAt);
+
+               try
+               {
+                  tearDown();
+                  setUp();
+                  testMix(startAt, joinAt, secondAt);
+               }
+               catch (Throwable e)
+               {
+                  throw new Exception("Error at compact=" + startCompactAt +
+                                      ", joinCompactAt=" +
+                                      joinCompactAt +
+                                      ", secondCompactAt=" +
+                                      secondCompactAt, e);
+               }
+            }
+         }
+      }
+   }
+   
+   protected int testMix(final int startAt, final int joinAt, final int secondAt) throws Exception
+   {
+      startCompactAt = startAt;
+      joinCompactAt = joinAt;
+      secondCompactAt = secondAt;
+      
+      currentOperation = 0;
+      
+      internalTest();
+      
+      return currentOperation;
+   }
+
+   @Override
+   protected void beforeJournalOperation() throws Exception
+   {
+      checkJournalOperation();
+   }
+
+   /**
+    * @throws InterruptedException
+    * @throws Exception
+    */
+   protected void checkJournalOperation() throws InterruptedException, Exception
+   {
+      if (startCompactAt == currentOperation)
+      {
+         threadCompact();
+      }
+      if (joinCompactAt == currentOperation)
+      {
+         joinCompact();
+      }
+      if (secondCompactAt == currentOperation)
+      {
+         journal.compact();
+      }
+
+      currentOperation++;
+   }
+
+   protected abstract void internalTest() throws Exception;
+
+   /**
+    * @param releaseCompactingLatch
+    * @param tCompact
+    * @throws InterruptedException
+    */
+   private void joinCompact() throws InterruptedException
+   {
+      releaseCompactingLatch.countDown();
+
+      tCompact.join();
+
+      tCompact = null;
+   }
+
+   /**
+    * @param startedCompactingLatch
+    * @return
+    * @throws InterruptedException
+    */
+   private void threadCompact() throws InterruptedException
+   {
+      tCompact = new Thread()
+      {
+         @Override
+         public void run()
+         {
+            try
+            {
+               journal.compact();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      tCompact.start();
+
+      startedCompactingLatch.await();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+    */
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      return new NIOSequentialFileFactory(getTestDir());
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2010-08-18 02:45:12 UTC (rev 9556)
@@ -35,6 +35,7 @@
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ReusableLatch;
 
 /**
  * 
@@ -69,6 +70,12 @@
 
    protected SequentialFileFactory fileFactory;
 
+   private ReusableLatch latchDone = new ReusableLatch(0);
+
+   private ReusableLatch latchWait = new ReusableLatch(0);
+
+   private Thread compactThread;
+
    @Override
    protected void setUp() throws Exception
    {
@@ -144,10 +151,60 @@
 
    public void createJournal() throws Exception
    {
-      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
+      journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+      {
+         @Override
+         public void onCompactDone()
+         {
+            latchDone.countDown();
+            System.out.println("Waiting on Compact");
+            try
+            {
+               latchWait.await();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+            System.out.println("Waiting on Compact Done");
+         }
+      };
+
       journal.setAutoReclaim(false);
    }
 
+   // It will start compacting, but it will let the thread in wait mode at onCompactDone, so we can validate command
+   // executions
+   protected void startCompact() throws Exception
+   {
+      latchDone.setCount(1);
+      latchWait.setCount(1);
+      this.compactThread = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               journal.compact();
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+
+      this.compactThread.start();
+
+      latchDone.await();
+   }
+
+   protected void finishCompact() throws Exception
+   {
+      latchWait.countDown();
+      compactThread.join();
+   }
+
    protected void startJournal() throws Exception
    {
       journal.start();
@@ -211,8 +268,6 @@
                                   getTestDir() + "/output.log");
    }
 
-
-   
    protected void loadAndCheck() throws Exception
    {
       loadAndCheck(false);
@@ -258,7 +313,7 @@
    {
       journal.load(null, null, null);
    }
-   
+
    protected void beforeJournalOperation() throws Exception
    {
    }



More information about the hornetq-commits mailing list