[hornetq-commits] JBoss hornetq SVN: r9521 - in trunk: tests/src/org/hornetq/tests/stress/journal and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 9 21:11:14 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-09 21:11:14 -0400 (Mon, 09 Aug 2010)
New Revision: 9521

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
HORNETQ-476 moving pending close to avoid unnecessary locks before close

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-10 00:02:55 UTC (rev 9520)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-10 01:11:14 UTC (rev 9521)
@@ -1551,6 +1551,8 @@
             JournalImpl.trace("Starting compacting operation on journal");
          }
          JournalImpl.log.debug("Starting compacting operation on journal");
+         
+         onCompactStart();
 
          // We need to guarantee that the journal is frozen for this short time
          // We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1571,16 +1573,10 @@
 
             dataFilesToProcess.addAll(dataFiles);
 
-            for (JournalFile file : pendingCloseFiles)
-            {
-               file.getFile().close();
-            }
+            dataFiles.clear();
 
-            dataFilesToProcess.addAll(pendingCloseFiles);
-            pendingCloseFiles.clear();
+            drainClosedFiles();
 
-            dataFiles.clear();
-
             if (dataFilesToProcess.size() == 0)
             {
                return;
@@ -2294,7 +2290,7 @@
       {
          return;
       }
-
+ 
       compactingLock.readLock().lock();
 
       try
@@ -2808,10 +2804,15 @@
    }
 
    /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
+   protected void onCompactStart() throws Exception
+   {
+   }
+
+   /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
    protected void onCompactDone()
    {
    }
-
+   
    // Private
    // -----------------------------------------------------------------------------
 
@@ -3314,6 +3315,8 @@
          {
             try
             {
+               drainClosedFiles();
+
                if (!checkReclaimStatus())
                {
                   checkCompact();
@@ -3384,33 +3387,13 @@
    {
       fileFactory.deactivateBuffer();
       pendingCloseFiles.add(file);
+      dataFiles.add(file);
 
       Runnable run = new Runnable()
       {
          public void run()
          {
-            compactingLock.readLock().lock();
-            try
-            {
-               // The file could be closed by compacting. On this case we need to check if the close still pending
-               // before we add it to dataFiles
-               if (pendingCloseFiles.remove(file))
-               {
-                  dataFiles.add(file);
-                  if (file.getFile().isOpen())
-                  {
-                     file.getFile().close();
-                  }
-               }
-            }
-            catch (Exception e)
-            {
-               JournalImpl.log.warn(e.getMessage(), e);
-            }
-            finally
-            {
-               compactingLock.readLock().unlock();
-            }
+            drainClosedFiles();
          }
       };
 
@@ -3424,7 +3407,24 @@
       }
 
    }
+   
+   private void drainClosedFiles()
+   {
+      JournalFile file;
+      try
+      {
+         while ((file = pendingCloseFiles.poll()) != null)
+         {
+            file.getFile().close();
+         }
+      }
+      catch (Exception e)
+      {
+         JournalImpl.log.warn(e.getMessage(), e);
+      }
 
+   }
+
    private JournalTransaction getTransactionInfo(final long txID)
    {
       JournalTransaction tx = transactions.get(txID);

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-10 00:02:55 UTC (rev 9520)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java	2010-08-10 01:11:14 UTC (rev 9521)
@@ -16,6 +16,8 @@
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
@@ -23,6 +25,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.swing.plaf.basic.BasicInternalFrameTitlePane.MoveAction;
+
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.IOAsyncTask;
@@ -52,9 +56,9 @@
 {
 
    public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
-   
-   
+
    private static final int MAX_WRITES = 20000;
+
    // We want to maximize the difference between appends and deles, or we could get out of memory
    public Semaphore maxRecords;
 
@@ -74,17 +78,23 @@
                                                      false,
                                                      JournalCleanupCompactStressTest.class.getClassLoader());
 
-   private final ExecutorService threadPool = Executors.newFixedThreadPool(20, tFactory);
+   private ExecutorService threadPool;
 
-   OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+   private OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
 
+   Executor testExecutor;
+
    @Override
    public void setUp() throws Exception
    {
       super.setUp();
 
+      threadPool = Executors.newFixedThreadPool(20, tFactory);
+      executorFactory = new OrderedExecutorFactory(threadPool);
+      testExecutor = executorFactory.getExecutor();
+
       maxRecords = new Semaphore(MAX_WRITES);
-      
+
       errors.set(0);
 
       File dir = new File(getTemporaryDir());
@@ -111,8 +121,38 @@
                                 factory,
                                 "hornetq-data",
                                 "hq",
-                                maxAIO);
+                                maxAIO)
+      {
+         protected void onCompactStart() throws Exception
+         {
+            testExecutor.execute(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     System.out.println("OnCompactSTart enter");
+                     for (int i = 0; i < 20; i++)
+                     {
+                        long id = idGen.generateID();
+                        journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
+                        journal.forceMoveNextFile();
+                        journal.appendDeleteRecord(id, id == 20);
+                     }
+                     System.out.println("OnCompactSTart leave");
+                  }
+                  catch (Exception e)
+                  {
+                     e.printStackTrace();
+                     errors.incrementAndGet();
+                  }
+               }
+            });
 
+         }
+
+      };
+
       journal.start();
       journal.loadInternalOnly();
 
@@ -132,8 +172,10 @@
       {
          // don't care :-)
       }
+
+      threadPool.shutdown();
    }
-   
+
    protected long getTotalTimeMilliseconds()
    {
       return TimeUnit.MINUTES.toMillis(10);
@@ -185,7 +227,6 @@
       // Release Semaphore after setting running to false or the threads may never finish
       maxRecords.release(MAX_WRITES - maxRecords.availablePermits());
 
-
       for (Thread t : appenders)
       {
          t.join();
@@ -198,6 +239,17 @@
 
       t1.join();
 
+      final CountDownLatch latchExecutorDone = new CountDownLatch(1);
+      testExecutor.execute(new Runnable()
+      {
+         public void run()
+         {
+            latchExecutorDone.countDown();
+         }
+      });
+
+      latchExecutorDone.await();
+
       assertEquals(0, errors.get());
 
       journal.stop();
@@ -247,7 +299,7 @@
       LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
 
       OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
-      
+
       public FastAppenderTx()
       {
          super("FastAppenderTX");
@@ -392,12 +444,12 @@
     */
    class SlowAppenderNoTX extends Thread
    {
-      
+
       public SlowAppenderNoTX()
       {
          super("SlowAppender");
       }
-      
+
       @Override
       public void run()
       {



More information about the hornetq-commits mailing list