[hornetq-commits] JBoss hornetq SVN: r9556 - in trunk: src/main/org/hornetq/utils and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Aug 17 22:45:13 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-17 22:45:12 -0400 (Tue, 17 Aug 2010)
New Revision: 9556
Added:
trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/utils/ReusableLatch.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
Adding a few more tests on compacting & minor changes just making sure about everything is all right
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -1570,8 +1570,8 @@
}
catch (Throwable e)
{
- log.warn("Error on reading compacting for " + file);
- throw new Exception("Error on reading compacting for " + file, e);
+ log.warn("Error on reading compacting for " + file);
+ throw new Exception("Error on reading compacting for " + file, e);
}
}
@@ -1644,6 +1644,10 @@
{
liveTransaction.merge(newTransaction);
}
+ else
+ {
+ log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+ }
}
}
finally
@@ -2114,6 +2118,12 @@
*/
public boolean checkReclaimStatus() throws Exception
{
+
+ if (compactorRunning.get())
+ {
+ return false;
+ }
+
// We can't start reclaim while compacting is working
compactingLock.readLock().lock();
try
@@ -2191,6 +2201,12 @@
}
return true;
}
+ else
+ {
+ // We only cleanup the first files
+ // if a middle file needs cleanup it will be done through compacting
+ break;
+ }
}
}
}
@@ -2298,10 +2314,10 @@
controlFile.delete();
final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-
+
if (trace)
{
- trace("Adding free file back from cleanup" + retJournalfile);
+ trace("Adding free file back from cleanup" + retJournalfile);
}
filesExecutor.execute(new Runnable()
@@ -3280,7 +3296,7 @@
{
return;
}
-
+
if (autoReclaim && !compactorRunning.get())
{
filesExecutor.execute(new Runnable()
Modified: trunk/src/main/org/hornetq/utils/ReusableLatch.java
===================================================================
--- trunk/src/main/org/hornetq/utils/ReusableLatch.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/src/main/org/hornetq/utils/ReusableLatch.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -50,6 +50,11 @@
{
return getState();
}
+
+ public void setCount(final int count)
+ {
+ setState(count);
+ }
@Override
public int tryAcquireShared(final int numberOfAqcquires)
@@ -107,6 +112,11 @@
{
return control.getCount();
}
+
+ public void setCount(final int count)
+ {
+ control.setCount(count);
+ }
public void countUp()
{
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -237,57 +237,93 @@
{
setup(2, 60 * 1024, false);
-
+
createJournal();
-
+
startJournal();
-
+
load();
-
+
add(1);
-
+
updateTx(2, 1);
-
+
rollback(2);
+
+ journal.compact();
+
+ stopJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ stopJournal();
+
+ }
+
+ public void testCompactSecondFileReclaimed() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ addTx(1, 1, 2, 3, 4);
+
+ journal.forceMoveNextFile();
+
+ addTx(1, 5, 6, 7, 8);
+ commit(1);
+
+ journal.forceMoveNextFile();
+
journal.compact();
+ add(10);
+
stopJournal();
-
+
startJournal();
-
+
loadAndCheck();
-
+
stopJournal();
}
-
public void testIncompleteTXDuringcompact() throws Exception
{
setup(2, 60 * 1024, false);
-
+
createJournal();
-
+
startJournal();
-
+
load();
-
+
add(1);
-
+
updateTx(2, 1);
-
+
journal.compact();
-
+
+ journal.compact();
+
commit(2);
-
+
stopJournal();
-
+
startJournal();
-
+
loadAndCheck();
-
+
stopJournal();
}
@@ -625,31 +661,7 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
-
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
+ createJournal();
journal.setAutoReclaim(false);
startJournal();
@@ -665,26 +677,8 @@
addTx(consumerTX, firstID);
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(appendTX, addedRecord);
commit(appendTX);
@@ -695,10 +689,8 @@
delete(addedRecord);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.forceMoveNextFile();
long newRecord = idGen.generateID();
@@ -723,31 +715,8 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
+ createJournal();
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
journal.setAutoReclaim(false);
startJournal();
@@ -763,26 +732,8 @@
addTx(consumerTX, firstID);
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(appendTX, addedRecord);
commit(appendTX);
updateTx(consumerTX, addedRecord);
@@ -794,10 +745,8 @@
commit(deleteTXID);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.forceMoveNextFile();
journal.compact();
@@ -816,31 +765,8 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
+ createJournal();
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
journal.setAutoReclaim(false);
startJournal();
@@ -856,34 +782,14 @@
updateTx(consumerTX, firstID);
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(consumerTX, addedRecord);
commit(consumerTX);
delete(addedRecord);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.compact();
stopJournal();
@@ -991,6 +897,63 @@
}
+ public void testCompactAddAndUpdateFollowedByADelete6() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+ createJournal();
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ long tx0 = idGen.generateID();
+
+ long tx1 = idGen.generateID();
+
+ long add1 = idGen.generateID();
+
+ long add2 = idGen.generateID();
+
+ startCompact();
+
+ addTx(tx0, add1);
+
+ rollback(tx0);
+
+ addTx(tx1, add1, add2);
+ commit(tx1);
+
+ finishCompact();
+
+ long tx2 = idGen.generateID();
+
+ updateTx(tx2, add1, add2);
+ commit(tx2);
+
+ delete(add1);
+
+ startCompact();
+
+ delete(add2);
+
+ finishCompact();
+
+ journal.forceMoveNextFile();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
public void testDeleteWhileCleanup() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -13,16 +13,7 @@
package org.hornetq.tests.stress.journal;
-import java.io.File;
-import java.io.FilenameFilter;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
-import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.ReusableLatch;
-
/**
* A NIORandomCompactTest
*
@@ -30,167 +21,9 @@
*
*
*/
-public class AllPossibilitiesCompactStressTest extends JournalImplTestBase
+public class AllPossibilitiesCompactStressTest extends MixupCompactorBase
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private ReusableLatch startedCompactingLatch = null;
-
- private ReusableLatch releaseCompactingLatch = null;
-
- private Thread tCompact = null;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- tCompact = null;
-
- startedCompactingLatch = new ReusableLatch();
-
- releaseCompactingLatch = new ReusableLatch();
-
- File file = new File(getTestDir());
-
- deleteDirectory(file);
-
- file.mkdir();
- }
-
- protected void tearDown() throws Exception
- {
-
- File testDir = new File(getTestDir());
-
- File files[] = testDir.listFiles(new FilenameFilter()
- {
-
- public boolean accept(File dir, String name)
- {
- return name.startsWith(filePrefix) && name.endsWith(fileExtension);
- }
- });
-
- for (File file : files)
- {
- assertEquals("File " + file + " doesn't have the expected number of bytes", fileSize, file.length());
- }
-
- super.tearDown();
- }
-
- int startCompactAt;
-
- int joinCompactAt;
-
- int secondCompactAt;
-
- int currentOperation;
-
- SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
-
- public void createJournal() throws Exception
- {
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- startedCompactingLatch.countDown();
- try
- {
- releaseCompactingLatch.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- };
-
- journal.setAutoReclaim(false);
- }
-
-
- public void testMixOperations() throws Exception
- {
-
- setup(2, 60 * 1024, false);
-
- startCompactAt = joinCompactAt = secondCompactAt = -1;
-
- currentOperation = 0;
- internalTest();
- int MAX_OPERATIONS = currentOperation;
-
- System.out.println("Using MAX_OPERATIONS = " + MAX_OPERATIONS);
-
- for (startCompactAt = 0; startCompactAt < MAX_OPERATIONS; startCompactAt++)
- {
- for (joinCompactAt = startCompactAt; joinCompactAt < MAX_OPERATIONS; joinCompactAt++)
- {
- for (secondCompactAt = joinCompactAt; secondCompactAt < MAX_OPERATIONS; secondCompactAt++)
- {
- System.out.println("start=" + startCompactAt + ", join=" + joinCompactAt + ", second=" + secondCompactAt);
-
- currentOperation = 0;
- try
- {
- tearDown();
- setUp();
- internalTest();
- }
- catch (Throwable e)
- {
- throw new Exception("Error at compact=" + startCompactAt +
- ", joinCompactAt=" +
- joinCompactAt +
- ", secondCompactAt=" +
- secondCompactAt, e);
- }
- }
- }
- }
- }
-
- protected void beforeJournalOperation() throws Exception
- {
- checkJournalOperation();
- }
-
- /**
- * @throws InterruptedException
- * @throws Exception
- */
- private void checkJournalOperation() throws InterruptedException, Exception
- {
- if (startCompactAt == currentOperation)
- {
- threadCompact();
- }
- if (joinCompactAt == currentOperation)
- {
- joinCompact();
- }
- if (secondCompactAt == currentOperation)
- {
- journal.compact();
- }
-
- currentOperation++;
- }
-
public void internalTest() throws Exception
{
createJournal();
@@ -276,63 +109,4 @@
stopJournal();
}
- /**
- * @param releaseCompactingLatch
- * @param tCompact
- * @throws InterruptedException
- */
- private void joinCompact() throws InterruptedException
- {
- releaseCompactingLatch.countDown();
-
- tCompact.join();
-
- tCompact = null;
- }
-
- /**
- * @param startedCompactingLatch
- * @return
- * @throws InterruptedException
- */
- private void threadCompact() throws InterruptedException
- {
- tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
- tCompact.start();
-
- startedCompactingLatch.await();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
- */
- @Override
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- return new NIOSequentialFileFactory(getTestDir());
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Added: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+/**
+ * A NIORandomCompactTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AllPossibilitiesCompactWithAddDeleteStressTest extends MixupCompactorBase
+{
+
+ public void internalTest() throws Exception
+ {
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ long tx0 = idGen.generateID();
+
+ long tx1 = idGen.generateID();
+
+ long add1 = idGen.generateID();
+
+ long add2 = idGen.generateID();
+
+ addTx(tx0, add1);
+
+ rollback(tx0);
+
+ addTx(tx1, add1, add2);
+
+ commit(tx1);
+
+ long tx2 = idGen.generateID();
+
+ updateTx(tx2, add1, add2);
+
+ commit(tx2);
+
+ delete(add1);
+
+ delete(add2);
+
+ checkJournalOperation();
+
+ stopJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ stopJournal();
+
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -112,7 +112,7 @@
maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
}
- journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+ journal = new JournalImpl(50 * 1024,
20,
15,
ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
@@ -127,7 +127,7 @@
journal.forceMoveNextFile(false);
System.out.println("OnCompactLock done");
}
-
+
protected void onCompactStart() throws Exception
{
testExecutor.execute(new Runnable()
@@ -325,7 +325,7 @@
long rollbackTXID = JournalCleanupCompactStressTest.idGen.generateID();
final long ids[] = new long[txSize];
-
+
for (int i = 0; i < txSize; i++)
{
ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
@@ -341,7 +341,7 @@
maxRecords.acquire();
}
journal.appendCommitRecord(txID, true, ctx);
-
+
ctx.executeOnCompletion(new IOAsyncTask()
{
@@ -408,6 +408,11 @@
ids = new long[txSize];
}
}
+
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true);
+ }
}
catch (Exception e)
{
Added: trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.utils.ReusableLatch;
+import org.hornetq.utils.SimpleIDGenerator;
+
+/**
+ * This class will control mix up compactor between each operation of a test
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class MixupCompactorBase extends JournalImplTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private ReusableLatch startedCompactingLatch = null;
+
+ private ReusableLatch releaseCompactingLatch = null;
+
+ private Thread tCompact = null;
+
+ int startCompactAt;
+
+ int joinCompactAt;
+
+ int secondCompactAt;
+
+ int currentOperation;
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ tCompact = null;
+
+ startedCompactingLatch = new ReusableLatch(1);
+
+ releaseCompactingLatch = new ReusableLatch(1);
+
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ setup(2, 60 * 1024, false);
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+
+ File testDir = new File(getTestDir());
+
+ File files[] = testDir.listFiles(new FilenameFilter()
+ {
+
+ public boolean accept(final File dir, final String name)
+ {
+ return name.startsWith(filePrefix) && name.endsWith(fileExtension);
+ }
+ });
+
+ for (File file : files)
+ {
+ Assert.assertEquals("File " + file + " doesn't have the expected number of bytes", fileSize, file.length());
+ }
+
+ super.tearDown();
+ }
+
+ @Override
+ public void createJournal() throws Exception
+ {
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ startedCompactingLatch.countDown();
+ try
+ {
+ releaseCompactingLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ journal.setAutoReclaim(false);
+ }
+
+ public void testMixOperations() throws Exception
+ {
+
+
+ currentOperation = 0;
+ internalTest();
+ int MAX_OPERATIONS = testMix(-1, -1, -1);
+
+ System.out.println("Using MAX_OPERATIONS = " + MAX_OPERATIONS);
+
+ for (int startAt = 0; startAt < MAX_OPERATIONS; startAt++)
+ {
+ for (int joinAt = startAt; joinAt < MAX_OPERATIONS; joinAt++)
+ {
+ for (int secondAt = joinAt ; secondAt < MAX_OPERATIONS; secondAt++)
+ {
+ System.out.println("start=" + startAt + ", join=" + joinAt + ", second=" + secondAt);
+
+ try
+ {
+ tearDown();
+ setUp();
+ testMix(startAt, joinAt, secondAt);
+ }
+ catch (Throwable e)
+ {
+ throw new Exception("Error at compact=" + startCompactAt +
+ ", joinCompactAt=" +
+ joinCompactAt +
+ ", secondCompactAt=" +
+ secondCompactAt, e);
+ }
+ }
+ }
+ }
+ }
+
+ protected int testMix(final int startAt, final int joinAt, final int secondAt) throws Exception
+ {
+ startCompactAt = startAt;
+ joinCompactAt = joinAt;
+ secondCompactAt = secondAt;
+
+ currentOperation = 0;
+
+ internalTest();
+
+ return currentOperation;
+ }
+
+ @Override
+ protected void beforeJournalOperation() throws Exception
+ {
+ checkJournalOperation();
+ }
+
+ /**
+ * @throws InterruptedException
+ * @throws Exception
+ */
+ protected void checkJournalOperation() throws InterruptedException, Exception
+ {
+ if (startCompactAt == currentOperation)
+ {
+ threadCompact();
+ }
+ if (joinCompactAt == currentOperation)
+ {
+ joinCompact();
+ }
+ if (secondCompactAt == currentOperation)
+ {
+ journal.compact();
+ }
+
+ currentOperation++;
+ }
+
+ protected abstract void internalTest() throws Exception;
+
+ /**
+ * @param releaseCompactingLatch
+ * @param tCompact
+ * @throws InterruptedException
+ */
+ private void joinCompact() throws InterruptedException
+ {
+ releaseCompactingLatch.countDown();
+
+ tCompact.join();
+
+ tCompact = null;
+ }
+
+ /**
+ * @param startedCompactingLatch
+ * @return
+ * @throws InterruptedException
+ */
+ private void threadCompact() throws InterruptedException
+ {
+ tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ tCompact.start();
+
+ startedCompactingLatch.await();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new NIOSequentialFileFactory(getTestDir());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -35,6 +35,7 @@
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ReusableLatch;
/**
*
@@ -69,6 +70,12 @@
protected SequentialFileFactory fileFactory;
+ private ReusableLatch latchDone = new ReusableLatch(0);
+
+ private ReusableLatch latchWait = new ReusableLatch(0);
+
+ private Thread compactThread;
+
@Override
protected void setUp() throws Exception
{
@@ -144,10 +151,60 @@
public void createJournal() throws Exception
{
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+ @Override
+ public void onCompactDone()
+ {
+ latchDone.countDown();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ latchWait.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Waiting on Compact Done");
+ }
+ };
+
journal.setAutoReclaim(false);
}
+ // It will start compacting, but it will let the thread in wait mode at onCompactDone, so we can validate command
+ // executions
+ protected void startCompact() throws Exception
+ {
+ latchDone.setCount(1);
+ latchWait.setCount(1);
+ this.compactThread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ this.compactThread.start();
+
+ latchDone.await();
+ }
+
+ protected void finishCompact() throws Exception
+ {
+ latchWait.countDown();
+ compactThread.join();
+ }
+
protected void startJournal() throws Exception
{
journal.start();
@@ -211,8 +268,6 @@
getTestDir() + "/output.log");
}
-
-
protected void loadAndCheck() throws Exception
{
loadAndCheck(false);
@@ -258,7 +313,7 @@
{
journal.load(null, null, null);
}
-
+
protected void beforeJournalOperation() throws Exception
{
}
More information about the hornetq-commits
mailing list