[hornetq-commits] JBoss hornetq SVN: r9521 - in trunk: tests/src/org/hornetq/tests/stress/journal and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Aug 9 21:11:14 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-09 21:11:14 -0400 (Mon, 09 Aug 2010)
New Revision: 9521
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
HORNETQ-476 moving pending close to avoid unnecessary locks before close
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-10 00:02:55 UTC (rev 9520)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-10 01:11:14 UTC (rev 9521)
@@ -1551,6 +1551,8 @@
JournalImpl.trace("Starting compacting operation on journal");
}
JournalImpl.log.debug("Starting compacting operation on journal");
+
+ onCompactStart();
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1571,16 +1573,10 @@
dataFilesToProcess.addAll(dataFiles);
- for (JournalFile file : pendingCloseFiles)
- {
- file.getFile().close();
- }
+ dataFiles.clear();
- dataFilesToProcess.addAll(pendingCloseFiles);
- pendingCloseFiles.clear();
+ drainClosedFiles();
- dataFiles.clear();
-
if (dataFilesToProcess.size() == 0)
{
return;
@@ -2294,7 +2290,7 @@
{
return;
}
-
+
compactingLock.readLock().lock();
try
@@ -2808,10 +2804,15 @@
}
/** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
+ protected void onCompactStart() throws Exception
+ {
+ }
+
+ /** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
protected void onCompactDone()
{
}
-
+
// Private
// -----------------------------------------------------------------------------
@@ -3314,6 +3315,8 @@
{
try
{
+ drainClosedFiles();
+
if (!checkReclaimStatus())
{
checkCompact();
@@ -3384,33 +3387,13 @@
{
fileFactory.deactivateBuffer();
pendingCloseFiles.add(file);
+ dataFiles.add(file);
Runnable run = new Runnable()
{
public void run()
{
- compactingLock.readLock().lock();
- try
- {
- // The file could be closed by compacting. On this case we need to check if the close still pending
- // before we add it to dataFiles
- if (pendingCloseFiles.remove(file))
- {
- dataFiles.add(file);
- if (file.getFile().isOpen())
- {
- file.getFile().close();
- }
- }
- }
- catch (Exception e)
- {
- JournalImpl.log.warn(e.getMessage(), e);
- }
- finally
- {
- compactingLock.readLock().unlock();
- }
+ drainClosedFiles();
}
};
@@ -3424,7 +3407,24 @@
}
}
+
+ private void drainClosedFiles()
+ {
+ JournalFile file;
+ try
+ {
+ while ((file = pendingCloseFiles.poll()) != null)
+ {
+ file.getFile().close();
+ }
+ }
+ catch (Exception e)
+ {
+ JournalImpl.log.warn(e.getMessage(), e);
+ }
+ }
+
private JournalTransaction getTransactionInfo(final long txID)
{
JournalTransaction tx = transactions.get(txID);
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-10 00:02:55 UTC (rev 9520)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-10 01:11:14 UTC (rev 9521)
@@ -16,6 +16,8 @@
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@@ -23,6 +25,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.swing.plaf.basic.BasicInternalFrameTitlePane.MoveAction;
+
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.IOAsyncTask;
@@ -52,9 +56,9 @@
{
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
-
-
+
private static final int MAX_WRITES = 20000;
+
// We want to maximize the difference between appends and deles, or we could get out of memory
public Semaphore maxRecords;
@@ -74,17 +78,23 @@
false,
JournalCleanupCompactStressTest.class.getClassLoader());
- private final ExecutorService threadPool = Executors.newFixedThreadPool(20, tFactory);
+ private ExecutorService threadPool;
- OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+ private OrderedExecutorFactory executorFactory = new OrderedExecutorFactory(threadPool);
+ Executor testExecutor;
+
@Override
public void setUp() throws Exception
{
super.setUp();
+ threadPool = Executors.newFixedThreadPool(20, tFactory);
+ executorFactory = new OrderedExecutorFactory(threadPool);
+ testExecutor = executorFactory.getExecutor();
+
maxRecords = new Semaphore(MAX_WRITES);
-
+
errors.set(0);
File dir = new File(getTemporaryDir());
@@ -111,8 +121,38 @@
factory,
"hornetq-data",
"hq",
- maxAIO);
+ maxAIO)
+ {
+ protected void onCompactStart() throws Exception
+ {
+ testExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ System.out.println("OnCompactSTart enter");
+ for (int i = 0; i < 20; i++)
+ {
+ long id = idGen.generateID();
+ journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
+ journal.forceMoveNextFile();
+ journal.appendDeleteRecord(id, id == 20);
+ }
+ System.out.println("OnCompactSTart leave");
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
+ }
+
+ };
+
journal.start();
journal.loadInternalOnly();
@@ -132,8 +172,10 @@
{
// don't care :-)
}
+
+ threadPool.shutdown();
}
-
+
protected long getTotalTimeMilliseconds()
{
return TimeUnit.MINUTES.toMillis(10);
@@ -185,7 +227,6 @@
// Release Semaphore after setting running to false or the threads may never finish
maxRecords.release(MAX_WRITES - maxRecords.availablePermits());
-
for (Thread t : appenders)
{
t.join();
@@ -198,6 +239,17 @@
t1.join();
+ final CountDownLatch latchExecutorDone = new CountDownLatch(1);
+ testExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ latchExecutorDone.countDown();
+ }
+ });
+
+ latchExecutorDone.await();
+
assertEquals(0, errors.get());
journal.stop();
@@ -247,7 +299,7 @@
LinkedBlockingDeque<Long> queue = new LinkedBlockingDeque<Long>();
OperationContextImpl ctx = new OperationContextImpl(executorFactory.getExecutor());
-
+
public FastAppenderTx()
{
super("FastAppenderTX");
@@ -392,12 +444,12 @@
*/
class SlowAppenderNoTX extends Thread
{
-
+
public SlowAppenderNoTX()
{
super("SlowAppender");
}
-
+
@Override
public void run()
{
More information about the hornetq-commits mailing list