Author: clebert.suconic(a)jboss.com
Date: 2011-03-26 17:39:32 -0400 (Sat, 26 Mar 2011)
New Revision: 10375
Modified:
trunk/src/main/org/hornetq/core/journal/TestableJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
r10373:10374 from Branch_2_2_eap
Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2011-03-26 21:27:28 UTC (rev 10374)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2011-03-26 21:39:32 UTC (rev 10375)
@@ -53,7 +53,7 @@
boolean isAutoReclaim();
- void compact() throws Exception;
+ void testCompact() throws Exception;
JournalFile getCurrentFile();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-03-26 21:27:28 UTC (rev 10374)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-03-26 21:39:32 UTC (rev 10375)
@@ -79,7 +79,9 @@
private final int userVersion;
- private Executor filesExecutor;
+ private Executor openFilesExecutor;
+
+ private Executor closeFilesExecutor;
// Static --------------------------------------------------------
@@ -104,9 +106,10 @@
// Public --------------------------------------------------------
- public void setExecutor(final Executor executor)
+ public void setExecutor(final Executor fileExecutor, final Executor closeExecutor)
{
- filesExecutor = executor;
+ this.openFilesExecutor = fileExecutor;
+ this.closeFilesExecutor = closeExecutor;
}
public void clear()
@@ -358,13 +361,13 @@
}
};
- if (filesExecutor == null)
+ if (openFilesExecutor == null)
{
run.run();
}
else
{
- filesExecutor.execute(run);
+ openFilesExecutor.execute(run);
}
JournalFile nextFile = null;
@@ -417,13 +420,15 @@
}
};
- if (filesExecutor == null)
+ // We can't close files while the compactor is running
+ // as we may be closing files that are being read by the compactor
+ if (closeFilesExecutor == null)
{
run.run();
}
else
{
- filesExecutor.execute(run);
+ closeFilesExecutor.execute(run);
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-03-26 21:27:28 UTC (rev 10374)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-03-26 21:39:32 UTC (rev 10375)
@@ -32,6 +32,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
@@ -1551,13 +1552,67 @@
return info;
}
+
+
+ public void testCompact() throws Exception
+ {
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ compactorRunning.set(true);
+
+ // We can't use the executor for the compacting... or we would dead lock because of file open and creation
+ // operations (that will use the executor)
+ compactorExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ JournalImpl.this.compact();
+ }
+ catch (Throwable e)
+ {
+ errors.incrementAndGet();
+ JournalImpl.log.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ }
+ });
+
+ try
+ {
+ if (!latch.await(60, TimeUnit.SECONDS))
+ {
+ throw new RuntimeException("Didn't finish compact timely");
+ }
+
+ if (errors.get() > 0)
+ {
+ throw new RuntimeException("Error during testCompact, look at the logs");
+ }
+ }
+ finally
+ {
+ compactorRunning.set(false);
+ }
+ }
+
/**
*
* Note: This method can't be called from the main executor, as it will invoke other methods depending on it.
*
+ * Note: only synchronized methods on journal are methods responsible for the life-cycle such as stop, start
+ * records will still come as this is being executed
+ *
*/
- public synchronized void compact() throws Exception
+ protected synchronized void compact() throws Exception
{
if (compactor != null)
@@ -2489,7 +2544,7 @@
}
});
- filesRepository.setExecutor(filesExecutor);
+ filesRepository.setExecutor(filesExecutor, compactorExecutor);
fileFactory.start();
@@ -2521,7 +2576,7 @@
filesExecutor.shutdown();
- filesRepository.setExecutor(null);
+ filesRepository.setExecutor(null, null);
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
{
@@ -2892,7 +2947,7 @@
callback = null;
}
- if (sync)
+ if (sync && !compactorRunning.get())
{
// In an edge case the transaction could still have pending data from previous files.
// This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
@@ -2956,7 +3011,7 @@
if (autoReclaim && !compactorRunning.get())
{
- filesExecutor.execute(new Runnable()
+ compactorExecutor.execute(new Runnable()
{
public void run()
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-03-26 21:27:28 UTC (rev 10374)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2011-03-26 21:39:32 UTC (rev 10375)
@@ -174,10 +174,14 @@
return read(bytes, null);
}
- public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
+ public synchronized int read(final ByteBuffer bytes, final IOAsyncTask callback) throws Exception
{
try
{
+ if (channel == null)
+ {
+ throw new Exception("File " + this.getFileName() + " has a null channel");
+ }
int bytesRead = channel.read(bytes);
if (callback != null)
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-03-26 21:27:28 UTC (rev 10374)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-03-26 21:39:32 UTC (rev 10375)
@@ -131,6 +131,19 @@
Assert.assertFalse(iterNewFiles.hasNext());
}
+
+// public void testRepeat() throws Exception
+// {
+// int i = 0 ;
+//
+// while (true)
+// {
+// System.out.println("#test (" + (i++) + ")");
+// testCrashRenamingFiles();
+// tearDown();
+// setUp();
+// }
+// }
public void testCrashRenamingFiles() throws Exception
{
@@ -229,7 +242,7 @@
journal.appendDeleteRecord(i, true);
}
- journal.compact();
+ journal.testCompact();
journal.stop();
@@ -277,7 +290,7 @@
finishCompact();
- journal.compact();
+ journal.testCompact();
stopJournal();
@@ -316,7 +329,7 @@
finishCompact();
- journal.compact();
+ journal.testCompact();
stopJournal();
@@ -347,7 +360,7 @@
finishCompact();
- journal.compact();
+ journal.testCompact();
stopJournal();
@@ -367,6 +380,8 @@
startJournal();
+ journal.setAutoReclaim(false);
+
load();
add(1);
@@ -375,7 +390,7 @@
rollback(2);
- journal.compact();
+ journal.testCompact();
stopJournal();
@@ -408,7 +423,7 @@
journal.forceMoveNextFile();
- journal.compact();
+ journal.testCompact();
add(10);
@@ -437,9 +452,9 @@
updateTx(2, 1);
- journal.compact();
+ journal.testCompact();
- journal.compact();
+ journal.testCompact();
commit(2);
@@ -610,7 +625,7 @@
{
try
{
- journal.compact();
+ journal.testCompact();
}
catch (Exception e)
{
@@ -769,7 +784,7 @@
if (createControlFile && deleteControlFile &&
renameFilesAfterCompacting)
{
- journal.compact();
+ journal.testCompact();
}
stopJournal();
@@ -822,7 +837,7 @@
add(newRecord);
update(newRecord);
- journal.compact();
+ journal.testCompact();
System.out.println("Debug after compact\n" + journal.debug());
@@ -874,7 +889,7 @@
journal.forceMoveNextFile();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -915,7 +930,7 @@
finishCompact();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -966,7 +981,7 @@
add(newRecord);
update(newRecord);
- journal.compact();
+ journal.testCompact();
System.out.println("Debug after compact\n" + journal.debug());
@@ -1025,7 +1040,7 @@
journal.forceMoveNextFile();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1117,7 +1132,7 @@
finishCompact();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1201,11 +1216,11 @@
System.out.println(journal.debug());
System.out.println("*****************************************");
- journal.compact();
+ journal.testCompact();
add(idGenerator.generateID());
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1326,11 +1341,11 @@
journal.forceMoveNextFile();
// This operation used to be journal.cleanup(journal.getDataFiles()[0]); when cleanup was still in place
- journal.compact();
+ journal.testCompact();
journal.checkReclaimStatus();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1384,7 +1399,7 @@
ExportJournal.exportJournal(getTestDir(), filePrefix, fileExtension, 2,
this.fileSize, "/tmp/out4.dmp");
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1432,7 +1447,7 @@
journal.checkReclaimStatus();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1492,7 +1507,7 @@
journal.checkReclaimStatus();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1552,7 +1567,7 @@
journal.checkReclaimStatus();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1619,7 +1634,7 @@
journal.forceMoveNextFile();
- journal.compact();
+ journal.testCompact();
stopJournal();
createJournal();
@@ -1829,7 +1844,7 @@
{
Thread.sleep(500);
System.out.println("Compacting");
- ((JournalImpl)storage.getMessageJournal()).compact();
+ ((JournalImpl)storage.getMessageJournal()).testCompact();
((JournalImpl)storage.getMessageJournal()).checkReclaimStatus();
}
}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java 2011-03-26 21:27:28 UTC (rev 10374)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java 2011-03-26 21:39:32 UTC (rev 10375)
@@ -200,7 +200,7 @@
}
if (secondCompactAt == currentOperation)
{
- journal.compact();
+ journal.testCompact();
}
currentOperation++;
@@ -236,7 +236,7 @@
{
try
{
- journal.compact();
+ journal.testCompact();
}
catch (Exception e)
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2011-03-26 21:27:28 UTC (rev 10374)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2011-03-26 21:39:32 UTC (rev 10375)
@@ -185,7 +185,7 @@
{
try
{
- journal.compact();
+ journal.testCompact();
}
catch (Throwable e)
{