JBoss hornetq SVN: r9560 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-18 01:42:32 -0400 (Wed, 18 Aug 2010)
New Revision: 9560
Added:
tags/HornetQ_2_1_2_Final/
Removed:
tags/Pending_HornetQ_2_1_2_Final/
Log:
HornetQ 2.1.2 release
Copied: tags/HornetQ_2_1_2_Final (from rev 9559, tags/Pending_HornetQ_2_1_2_Final)
13 years, 9 months
JBoss hornetq SVN: r9559 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-18 00:36:11 -0400 (Wed, 18 Aug 2010)
New Revision: 9559
Added:
tags/Pending_HornetQ_2_1_2_Final/
Log:
Tagging release
Copied: tags/Pending_HornetQ_2_1_2_Final (from rev 9558, trunk)
13 years, 9 months
JBoss hornetq SVN: r9558 - trunk/docs.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-17 23:55:09 -0400 (Tue, 17 Aug 2010)
New Revision: 9558
Modified:
trunk/docs/README.html
Log:
Updating Readme
Modified: trunk/docs/README.html
===================================================================
--- trunk/docs/README.html 2010-08-18 03:27:48 UTC (rev 9557)
+++ trunk/docs/README.html 2010-08-18 03:55:09 UTC (rev 9558)
@@ -3,22 +3,22 @@
<head>
<meta content="text/html; charset=ISO-8859-1"
http-equiv="content-type">
- <title>HornetQ 2.1.0 Final Release Notes</title>
+ <title>HornetQ 2.1.2 Final Release Notes</title>
</head>
<body>
-<h1>Release Notes - HornetQ - Version 2.1.0 Final</h1>
+<h1>Release Notes - HornetQ - Version 2.1.2 Final</h1>
<br>
-<h2>15th June 2010</h2>
+<h2>17th Aug 2010</h2>
-These are the release notes for HornetQ 2.1.1 Final<br><br>
+These are the release notes for HornetQ 2.1.2 Final<br><br>
For full description of the contents please see the
-<a href="https://jira.jboss.org/secure/ConfigureReport.jspa?atl_token=2rfg6S5p_f&v...">HornetQ project JIRA</a>.<br><br>
+<a href="https://jira.jboss.org/secure/ReleaseNote.jspa?atl_token=09f0HuW8mF&versi...">HornetQ project JIRA</a>.<br><br>
-This release contains minor fixes required for the Application Server integration what would fix a few minor testcases.
+This release contains minor fixes required for the Application Server integration, and we have also fixed a few journal bugs improving the stability for persistent messages.
<br>
13 years, 9 months
JBoss hornetq SVN: r9557 - trunk/tests/src/org/hornetq/tests/stress/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-17 23:27:48 -0400 (Tue, 17 Aug 2010)
New Revision: 9557
Modified:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
Removing System.out lines
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-18 02:45:12 UTC (rev 9556)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-18 03:27:48 UTC (rev 9557)
@@ -123,9 +123,9 @@
{
protected void onCompactLock() throws Exception
{
- System.out.println("OnCompactLock");
+ // System.out.println("OnCompactLock");
journal.forceMoveNextFile(false);
- System.out.println("OnCompactLock done");
+ // System.out.println("OnCompactLock done");
}
protected void onCompactStart() throws Exception
@@ -136,7 +136,7 @@
{
try
{
- System.out.println("OnCompactStart enter");
+ // System.out.println("OnCompactStart enter");
if (running)
{
long id = idGen.generateID();
@@ -144,7 +144,7 @@
journal.forceMoveNextFile(false);
journal.appendDeleteRecord(id, id == 20);
}
- System.out.println("OnCompactStart leave");
+ // System.out.println("OnCompactStart leave");
}
catch (Exception e)
{
@@ -481,7 +481,7 @@
// Append
for (int i = 0; running & i < ids.length; i++)
{
- System.out.println("append slow");
+ // System.out.println("append slow");
ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
maxRecords.acquire();
journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
@@ -492,7 +492,7 @@
// Delete
for (int i = 0; running & i < ids.length; i++)
{
- System.out.println("Deleting");
+ // System.out.println("Deleting");
maxRecords.release();
journal.appendDeleteRecord(ids[i], false);
numberOfDeletes.incrementAndGet();
13 years, 9 months
JBoss hornetq SVN: r9556 - in trunk: src/main/org/hornetq/utils and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-17 22:45:12 -0400 (Tue, 17 Aug 2010)
New Revision: 9556
Added:
trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/utils/ReusableLatch.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
Adding a few more tests on compacting & minor changes just making sure about everything is all right
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -1570,8 +1570,8 @@
}
catch (Throwable e)
{
- log.warn("Error on reading compacting for " + file);
- throw new Exception("Error on reading compacting for " + file, e);
+ log.warn("Error on reading compacting for " + file);
+ throw new Exception("Error on reading compacting for " + file, e);
}
}
@@ -1644,6 +1644,10 @@
{
liveTransaction.merge(newTransaction);
}
+ else
+ {
+ log.warn("Couldn't find tx=" + newTransaction.getId() + " to merge after compacting");
+ }
}
}
finally
@@ -2114,6 +2118,12 @@
*/
public boolean checkReclaimStatus() throws Exception
{
+
+ if (compactorRunning.get())
+ {
+ return false;
+ }
+
// We can't start reclaim while compacting is working
compactingLock.readLock().lock();
try
@@ -2191,6 +2201,12 @@
}
return true;
}
+ else
+ {
+ // We only cleanup the first files
+ // if a middle file needs cleanup it will be done through compacting
+ break;
+ }
}
}
}
@@ -2298,10 +2314,10 @@
controlFile.delete();
final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-
+
if (trace)
{
- trace("Adding free file back from cleanup" + retJournalfile);
+ trace("Adding free file back from cleanup" + retJournalfile);
}
filesExecutor.execute(new Runnable()
@@ -3280,7 +3296,7 @@
{
return;
}
-
+
if (autoReclaim && !compactorRunning.get())
{
filesExecutor.execute(new Runnable()
Modified: trunk/src/main/org/hornetq/utils/ReusableLatch.java
===================================================================
--- trunk/src/main/org/hornetq/utils/ReusableLatch.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/src/main/org/hornetq/utils/ReusableLatch.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -50,6 +50,11 @@
{
return getState();
}
+
+ public void setCount(final int count)
+ {
+ setState(count);
+ }
@Override
public int tryAcquireShared(final int numberOfAqcquires)
@@ -107,6 +112,11 @@
{
return control.getCount();
}
+
+ public void setCount(final int count)
+ {
+ control.setCount(count);
+ }
public void countUp()
{
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -237,57 +237,93 @@
{
setup(2, 60 * 1024, false);
-
+
createJournal();
-
+
startJournal();
-
+
load();
-
+
add(1);
-
+
updateTx(2, 1);
-
+
rollback(2);
+
+ journal.compact();
+
+ stopJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ stopJournal();
+
+ }
+
+ public void testCompactSecondFileReclaimed() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ addTx(1, 1, 2, 3, 4);
+
+ journal.forceMoveNextFile();
+
+ addTx(1, 5, 6, 7, 8);
+ commit(1);
+
+ journal.forceMoveNextFile();
+
journal.compact();
+ add(10);
+
stopJournal();
-
+
startJournal();
-
+
loadAndCheck();
-
+
stopJournal();
}
-
public void testIncompleteTXDuringcompact() throws Exception
{
setup(2, 60 * 1024, false);
-
+
createJournal();
-
+
startJournal();
-
+
load();
-
+
add(1);
-
+
updateTx(2, 1);
-
+
journal.compact();
-
+
+ journal.compact();
+
commit(2);
-
+
stopJournal();
-
+
startJournal();
-
+
loadAndCheck();
-
+
stopJournal();
}
@@ -625,31 +661,7 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
-
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
+ createJournal();
journal.setAutoReclaim(false);
startJournal();
@@ -665,26 +677,8 @@
addTx(consumerTX, firstID);
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(appendTX, addedRecord);
commit(appendTX);
@@ -695,10 +689,8 @@
delete(addedRecord);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.forceMoveNextFile();
long newRecord = idGen.generateID();
@@ -723,31 +715,8 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
+ createJournal();
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
journal.setAutoReclaim(false);
startJournal();
@@ -763,26 +732,8 @@
addTx(consumerTX, firstID);
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(appendTX, addedRecord);
commit(appendTX);
updateTx(consumerTX, addedRecord);
@@ -794,10 +745,8 @@
commit(deleteTXID);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.forceMoveNextFile();
journal.compact();
@@ -816,31 +765,8 @@
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
- final ReusableLatch reusableLatchDone = new ReusableLatch();
- reusableLatchDone.countUp();
- final ReusableLatch reusableLatchWait = new ReusableLatch();
- reusableLatchWait.countUp();
+ createJournal();
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- reusableLatchDone.countDown();
- System.out.println("Waiting on Compact");
- try
- {
- reusableLatchWait.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("Done");
- }
- };
-
journal.setAutoReclaim(false);
startJournal();
@@ -856,34 +782,14 @@
updateTx(consumerTX, firstID);
- Thread tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
+ startCompact();
- tCompact.start();
-
- reusableLatchDone.await();
-
addTx(consumerTX, addedRecord);
commit(consumerTX);
delete(addedRecord);
- reusableLatchWait.countDown();
+ finishCompact();
- tCompact.join();
-
journal.compact();
stopJournal();
@@ -991,6 +897,63 @@
}
+ public void testCompactAddAndUpdateFollowedByADelete6() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+ createJournal();
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ long tx0 = idGen.generateID();
+
+ long tx1 = idGen.generateID();
+
+ long add1 = idGen.generateID();
+
+ long add2 = idGen.generateID();
+
+ startCompact();
+
+ addTx(tx0, add1);
+
+ rollback(tx0);
+
+ addTx(tx1, add1, add2);
+ commit(tx1);
+
+ finishCompact();
+
+ long tx2 = idGen.generateID();
+
+ updateTx(tx2, add1, add2);
+ commit(tx2);
+
+ delete(add1);
+
+ startCompact();
+
+ delete(add2);
+
+ finishCompact();
+
+ journal.forceMoveNextFile();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
public void testDeleteWhileCleanup() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactStressTest.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -13,16 +13,7 @@
package org.hornetq.tests.stress.journal;
-import java.io.File;
-import java.io.FilenameFilter;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
-import org.hornetq.utils.SimpleIDGenerator;
-import org.hornetq.utils.ReusableLatch;
-
/**
* A NIORandomCompactTest
*
@@ -30,167 +21,9 @@
*
*
*/
-public class AllPossibilitiesCompactStressTest extends JournalImplTestBase
+public class AllPossibilitiesCompactStressTest extends MixupCompactorBase
{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private ReusableLatch startedCompactingLatch = null;
-
- private ReusableLatch releaseCompactingLatch = null;
-
- private Thread tCompact = null;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- tCompact = null;
-
- startedCompactingLatch = new ReusableLatch();
-
- releaseCompactingLatch = new ReusableLatch();
-
- File file = new File(getTestDir());
-
- deleteDirectory(file);
-
- file.mkdir();
- }
-
- protected void tearDown() throws Exception
- {
-
- File testDir = new File(getTestDir());
-
- File files[] = testDir.listFiles(new FilenameFilter()
- {
-
- public boolean accept(File dir, String name)
- {
- return name.startsWith(filePrefix) && name.endsWith(fileExtension);
- }
- });
-
- for (File file : files)
- {
- assertEquals("File " + file + " doesn't have the expected number of bytes", fileSize, file.length());
- }
-
- super.tearDown();
- }
-
- int startCompactAt;
-
- int joinCompactAt;
-
- int secondCompactAt;
-
- int currentOperation;
-
- SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
-
- public void createJournal() throws Exception
- {
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
- {
-
- @Override
- public void onCompactDone()
- {
- startedCompactingLatch.countDown();
- try
- {
- releaseCompactingLatch.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- }
- };
-
- journal.setAutoReclaim(false);
- }
-
-
- public void testMixOperations() throws Exception
- {
-
- setup(2, 60 * 1024, false);
-
- startCompactAt = joinCompactAt = secondCompactAt = -1;
-
- currentOperation = 0;
- internalTest();
- int MAX_OPERATIONS = currentOperation;
-
- System.out.println("Using MAX_OPERATIONS = " + MAX_OPERATIONS);
-
- for (startCompactAt = 0; startCompactAt < MAX_OPERATIONS; startCompactAt++)
- {
- for (joinCompactAt = startCompactAt; joinCompactAt < MAX_OPERATIONS; joinCompactAt++)
- {
- for (secondCompactAt = joinCompactAt; secondCompactAt < MAX_OPERATIONS; secondCompactAt++)
- {
- System.out.println("start=" + startCompactAt + ", join=" + joinCompactAt + ", second=" + secondCompactAt);
-
- currentOperation = 0;
- try
- {
- tearDown();
- setUp();
- internalTest();
- }
- catch (Throwable e)
- {
- throw new Exception("Error at compact=" + startCompactAt +
- ", joinCompactAt=" +
- joinCompactAt +
- ", secondCompactAt=" +
- secondCompactAt, e);
- }
- }
- }
- }
- }
-
- protected void beforeJournalOperation() throws Exception
- {
- checkJournalOperation();
- }
-
- /**
- * @throws InterruptedException
- * @throws Exception
- */
- private void checkJournalOperation() throws InterruptedException, Exception
- {
- if (startCompactAt == currentOperation)
- {
- threadCompact();
- }
- if (joinCompactAt == currentOperation)
- {
- joinCompact();
- }
- if (secondCompactAt == currentOperation)
- {
- journal.compact();
- }
-
- currentOperation++;
- }
-
public void internalTest() throws Exception
{
createJournal();
@@ -276,63 +109,4 @@
stopJournal();
}
- /**
- * @param releaseCompactingLatch
- * @param tCompact
- * @throws InterruptedException
- */
- private void joinCompact() throws InterruptedException
- {
- releaseCompactingLatch.countDown();
-
- tCompact.join();
-
- tCompact = null;
- }
-
- /**
- * @param startedCompactingLatch
- * @return
- * @throws InterruptedException
- */
- private void threadCompact() throws InterruptedException
- {
- tCompact = new Thread()
- {
- @Override
- public void run()
- {
- try
- {
- journal.compact();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
-
- tCompact.start();
-
- startedCompactingLatch.await();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
- */
- @Override
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- return new NIOSequentialFileFactory(getTestDir());
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Added: trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AllPossibilitiesCompactWithAddDeleteStressTest.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+/**
+ * A NIORandomCompactTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AllPossibilitiesCompactWithAddDeleteStressTest extends MixupCompactorBase
+{
+
+ public void internalTest() throws Exception
+ {
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ long tx0 = idGen.generateID();
+
+ long tx1 = idGen.generateID();
+
+ long add1 = idGen.generateID();
+
+ long add2 = idGen.generateID();
+
+ addTx(tx0, add1);
+
+ rollback(tx0);
+
+ addTx(tx1, add1, add2);
+
+ commit(tx1);
+
+ long tx2 = idGen.generateID();
+
+ updateTx(tx2, add1, add2);
+
+ commit(tx2);
+
+ delete(add1);
+
+ delete(add2);
+
+ checkJournalOperation();
+
+ stopJournal();
+
+ createJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ stopJournal();
+
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -112,7 +112,7 @@
maxAIO = ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO;
}
- journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+ journal = new JournalImpl(50 * 1024,
20,
15,
ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
@@ -127,7 +127,7 @@
journal.forceMoveNextFile(false);
System.out.println("OnCompactLock done");
}
-
+
protected void onCompactStart() throws Exception
{
testExecutor.execute(new Runnable()
@@ -325,7 +325,7 @@
long rollbackTXID = JournalCleanupCompactStressTest.idGen.generateID();
final long ids[] = new long[txSize];
-
+
for (int i = 0; i < txSize; i++)
{
ids[i] = JournalCleanupCompactStressTest.idGen.generateID();
@@ -341,7 +341,7 @@
maxRecords.acquire();
}
journal.appendCommitRecord(txID, true, ctx);
-
+
ctx.executeOnCompletion(new IOAsyncTask()
{
@@ -408,6 +408,11 @@
ids = new long[txSize];
}
}
+
+ if (txCount > 0)
+ {
+ journal.appendCommitRecord(txID, true);
+ }
}
catch (Exception e)
{
Added: trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/MixupCompactorBase.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -0,0 +1,270 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.io.File;
+import java.io.FilenameFilter;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.hornetq.utils.ReusableLatch;
+import org.hornetq.utils.SimpleIDGenerator;
+
+/**
+ * This class will control mix up compactor between each operation of a test
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class MixupCompactorBase extends JournalImplTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private ReusableLatch startedCompactingLatch = null;
+
+ private ReusableLatch releaseCompactingLatch = null;
+
+ private Thread tCompact = null;
+
+ int startCompactAt;
+
+ int joinCompactAt;
+
+ int secondCompactAt;
+
+ int currentOperation;
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ tCompact = null;
+
+ startedCompactingLatch = new ReusableLatch(1);
+
+ releaseCompactingLatch = new ReusableLatch(1);
+
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ setup(2, 60 * 1024, false);
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+
+ File testDir = new File(getTestDir());
+
+ File files[] = testDir.listFiles(new FilenameFilter()
+ {
+
+ public boolean accept(final File dir, final String name)
+ {
+ return name.startsWith(filePrefix) && name.endsWith(fileExtension);
+ }
+ });
+
+ for (File file : files)
+ {
+ Assert.assertEquals("File " + file + " doesn't have the expected number of bytes", fileSize, file.length());
+ }
+
+ super.tearDown();
+ }
+
+ @Override
+ public void createJournal() throws Exception
+ {
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ startedCompactingLatch.countDown();
+ try
+ {
+ releaseCompactingLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ journal.setAutoReclaim(false);
+ }
+
+ public void testMixOperations() throws Exception
+ {
+
+
+ currentOperation = 0;
+ internalTest();
+ int MAX_OPERATIONS = testMix(-1, -1, -1);
+
+ System.out.println("Using MAX_OPERATIONS = " + MAX_OPERATIONS);
+
+ for (int startAt = 0; startAt < MAX_OPERATIONS; startAt++)
+ {
+ for (int joinAt = startAt; joinAt < MAX_OPERATIONS; joinAt++)
+ {
+ for (int secondAt = joinAt ; secondAt < MAX_OPERATIONS; secondAt++)
+ {
+ System.out.println("start=" + startAt + ", join=" + joinAt + ", second=" + secondAt);
+
+ try
+ {
+ tearDown();
+ setUp();
+ testMix(startAt, joinAt, secondAt);
+ }
+ catch (Throwable e)
+ {
+ throw new Exception("Error at compact=" + startCompactAt +
+ ", joinCompactAt=" +
+ joinCompactAt +
+ ", secondCompactAt=" +
+ secondCompactAt, e);
+ }
+ }
+ }
+ }
+ }
+
+ protected int testMix(final int startAt, final int joinAt, final int secondAt) throws Exception
+ {
+ startCompactAt = startAt;
+ joinCompactAt = joinAt;
+ secondCompactAt = secondAt;
+
+ currentOperation = 0;
+
+ internalTest();
+
+ return currentOperation;
+ }
+
+ @Override
+ protected void beforeJournalOperation() throws Exception
+ {
+ checkJournalOperation();
+ }
+
+ /**
+ * @throws InterruptedException
+ * @throws Exception
+ */
+ protected void checkJournalOperation() throws InterruptedException, Exception
+ {
+ if (startCompactAt == currentOperation)
+ {
+ threadCompact();
+ }
+ if (joinCompactAt == currentOperation)
+ {
+ joinCompact();
+ }
+ if (secondCompactAt == currentOperation)
+ {
+ journal.compact();
+ }
+
+ currentOperation++;
+ }
+
+ protected abstract void internalTest() throws Exception;
+
+ /**
+ * @param releaseCompactingLatch
+ * @param tCompact
+ * @throws InterruptedException
+ */
+ private void joinCompact() throws InterruptedException
+ {
+ releaseCompactingLatch.countDown();
+
+ tCompact.join();
+
+ tCompact = null;
+ }
+
+ /**
+ * @param startedCompactingLatch
+ * @return
+ * @throws InterruptedException
+ */
+ private void threadCompact() throws InterruptedException
+ {
+ tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ tCompact.start();
+
+ startedCompactingLatch.await();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.tests.unit.core.journal.impl.JournalImplTestBase#getFileFactory()
+ */
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new NIOSequentialFileFactory(getTestDir());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-17 05:56:48 UTC (rev 9555)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2010-08-18 02:45:12 UTC (rev 9556)
@@ -35,6 +35,7 @@
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ReusableLatch;
/**
*
@@ -69,6 +70,12 @@
protected SequentialFileFactory fileFactory;
+ private ReusableLatch latchDone = new ReusableLatch(0);
+
+ private ReusableLatch latchWait = new ReusableLatch(0);
+
+ private Thread compactThread;
+
@Override
protected void setUp() throws Exception
{
@@ -144,10 +151,60 @@
public void createJournal() throws Exception
{
- journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+ @Override
+ public void onCompactDone()
+ {
+ latchDone.countDown();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ latchWait.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Waiting on Compact Done");
+ }
+ };
+
journal.setAutoReclaim(false);
}
+ // It will start compacting, but it will let the thread in wait mode at onCompactDone, so we can validate command
+ // executions
+ protected void startCompact() throws Exception
+ {
+ latchDone.setCount(1);
+ latchWait.setCount(1);
+ this.compactThread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ this.compactThread.start();
+
+ latchDone.await();
+ }
+
+ protected void finishCompact() throws Exception
+ {
+ latchWait.countDown();
+ compactThread.join();
+ }
+
protected void startJournal() throws Exception
{
journal.start();
@@ -211,8 +268,6 @@
getTestDir() + "/output.log");
}
-
-
protected void loadAndCheck() throws Exception
{
loadAndCheck(false);
@@ -258,7 +313,7 @@
{
journal.load(null, null, null);
}
-
+
protected void beforeJournalOperation() throws Exception
{
}
13 years, 9 months
JBoss hornetq SVN: r9555 - trunk/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-17 01:56:48 -0400 (Tue, 17 Aug 2010)
New Revision: 9555
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
Fixing a test exclusive method that was breaking a stress-test
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-17 03:10:18 UTC (rev 9554)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-17 05:56:48 UTC (rev 9555)
@@ -2467,7 +2467,7 @@
{
// Send something to the closingExecutor, just to make sure we went
// until its end
- final CountDownLatch latch = new CountDownLatch(2);
+ final CountDownLatch latch = new CountDownLatch(1);
filesExecutor.execute(new Runnable()
{
@@ -2476,14 +2476,6 @@
latch.countDown();
}
});
-
- compactorExecutor.execute(new Runnable()
- {
- public void run()
- {
- latch.countDown();
- }
- });
latch.await();
}
13 years, 9 months
JBoss hornetq SVN: r9554 - in trunk: tests/src/org/hornetq/tests/integration/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-16 23:10:18 -0400 (Mon, 16 Aug 2010)
New Revision: 9554
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
Log:
fixing a stress test and removing unused code
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-17 02:28:27 UTC (rev 9553)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-17 03:10:18 UTC (rev 9554)
@@ -96,15 +96,12 @@
private static final boolean trace = log.isTraceEnabled();
- /** This is to be set to true at DEBUG & development only */
- private static final boolean LOAD_TRACE = false;
-
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
private static final void trace(final String message)
{
- log.info(message);
+ log.trace(message);
}
// The sizes of primitive types
@@ -827,15 +824,6 @@
try
{
- if (JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("appendAddRecord id = " + id +
- ", recordType = " +
- recordType +
- " compacting = " +
- (compactor != null));
- }
-
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
if (callback != null)
@@ -893,10 +881,6 @@
final boolean sync,
final IOCompletion callback) throws Exception
{
- if (JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("appendUpdateRecord id = " + id + ", recordType = " + recordType);
- }
if (state != JournalImpl.STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -964,10 +948,6 @@
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
{
- if (JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("appendDeleteRecord id = " + id);
- }
if (state != JournalImpl.STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1051,17 +1031,6 @@
try
{
- if (JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("appendAddRecordTransactional txID " + txID +
- ", id = " +
- id +
- ", recordType = " +
- recordType +
- ", compacting " +
- (this.compactor != null));
- }
-
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
JournalTransaction tx = getTransactionInfo(txID);
@@ -1106,17 +1075,6 @@
try
{
- if (JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("appendUpdateRecordTransactional txID " + txID +
- ", id = " +
- id +
- ", recordType = " +
- recordType +
- ", compacting = " +
- (compactor != null));
- }
-
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
JournalTransaction tx = getTransactionInfo(txID);
@@ -1146,11 +1104,6 @@
public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
{
- if (JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("appendDeleteRecordTransactional txID " + txID + ", id = " + id);
- }
-
if (state != JournalImpl.STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
@@ -1246,11 +1199,6 @@
try
{
- if (JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("appendPrepareRecord txID " + txID + ", compacting = " + (compactor != null));
- }
-
JournalInternalRecord prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
if (callback != null)
@@ -1319,11 +1267,6 @@
try
{
- if (JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("appendCommitRecord txID " + txID + ", compacting = " + (compactor != null));
- }
-
if (tx == null)
{
throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -1697,13 +1640,8 @@
JournalImpl.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
}
JournalTransaction liveTransaction = transactions.get(newTransaction.getId());
- if (liveTransaction == null)
+ if (liveTransaction != null)
{
- JournalImpl.log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() +
- " back into JournalTransactions");
- }
- else
- {
liveTransaction.merge(newTransaction);
}
}
@@ -1828,11 +1766,6 @@
public void onReadAddRecord(final RecordInfo info) throws Exception
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("AddRecord: " + info);
- }
-
checkID(info.id);
hasData.set(true);
@@ -1844,11 +1777,6 @@
public void onReadUpdateRecord(final RecordInfo info) throws Exception
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("UpdateRecord: " + info);
- }
-
checkID(info.id);
hasData.set(true);
@@ -1869,10 +1797,6 @@
public void onReadDeleteRecord(final long recordID) throws Exception
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("DeleteRecord: " + recordID);
- }
hasData.set(true);
loadManager.deleteRecord(recordID);
@@ -1893,13 +1817,6 @@
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace((info.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + info +
- ", txid = " +
- transactionID);
- }
-
checkID(info.id);
hasData.set(true);
@@ -1929,11 +1846,6 @@
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("DeleteRecordTX: " + transactionID + " info = " + info);
- }
-
hasData.set(true);
TransactionHolder tx = loadTransactions.get(transactionID);
@@ -1962,11 +1874,6 @@
public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("prepareRecordTX: txid = " + transactionID);
- }
-
hasData.set(true);
TransactionHolder tx = loadTransactions.get(transactionID);
@@ -2008,11 +1915,6 @@
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("commitRecord: txid = " + transactionID);
- }
-
TransactionHolder tx = loadTransactions.remove(transactionID);
// The commit could be alone on its own journal-file and the
@@ -2069,11 +1971,6 @@
public void onReadRollbackRecord(final long transactionID) throws Exception
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("rollbackRecord: txid = " + transactionID);
- }
-
TransactionHolder tx = loadTransactions.remove(transactionID);
// The rollback could be alone on its own journal-file and the
@@ -2099,11 +1996,6 @@
public void markAsDataFile(final JournalFile file)
{
- if (JournalImpl.trace && JournalImpl.LOAD_TRACE)
- {
- JournalImpl.trace("Marking " + file + " as data file");
- }
-
hasData.set(true);
}
@@ -2575,7 +2467,7 @@
{
// Send something to the closingExecutor, just to make sure we went
// until its end
- final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(2);
filesExecutor.execute(new Runnable()
{
@@ -2584,6 +2476,14 @@
latch.countDown();
}
});
+
+ compactorExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ latch.countDown();
+ }
+ });
latch.await();
}
@@ -2661,7 +2561,7 @@
try
{
moveNextFile(synchronous);
- if (autoReclaim && !synchronous)
+ if (autoReclaim && synchronous)
{
checkReclaimStatus();
}
@@ -2707,7 +2607,7 @@
}
});
- compactorExecutor = Executors.newCachedThreadPool(new ThreadFactory()
+ compactorExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
{
public Thread newThread(Runnable r)
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-17 02:28:27 UTC (rev 9553)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-17 03:10:18 UTC (rev 9554)
@@ -262,6 +262,36 @@
}
+
+ public void testIncompleteTXDuringcompact() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ add(1);
+
+ updateTx(2, 1);
+
+ journal.compact();
+
+ commit(2);
+
+ stopJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ stopJournal();
+
+ }
+
private void internalCompactTest(final boolean preXA, // prepare before compact
final boolean postXA, // prepare after compact
final boolean regularAdd,
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2010-08-17 02:28:27 UTC (rev 9553)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java 2010-08-17 03:10:18 UTC (rev 9554)
@@ -117,6 +117,10 @@
{
System.out.println("DataFiles = " + journal.getDataFilesCount());
journal.forceMoveNextFile();
+ if (journal.getDataFilesCount() != 0)
+ {
+ System.out.println("DebugJournal:" + journal.debug());
+ }
Assert.assertEquals(0, journal.getDataFilesCount());
}
13 years, 9 months
JBoss hornetq SVN: r9553 - trunk/tests/src/org/hornetq/tests/stress/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-16 22:28:27 -0400 (Mon, 16 Aug 2010)
New Revision: 9553
Modified:
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
reverting test change done by mistake
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-16 23:52:29 UTC (rev 9552)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-17 02:28:27 UTC (rev 9553)
@@ -183,7 +183,7 @@
protected long getTotalTimeMilliseconds()
{
- return TimeUnit.MINUTES.toMillis(1);
+ return TimeUnit.MINUTES.toMillis(10);
}
public void testAppend() throws Exception
13 years, 9 months
JBoss hornetq SVN: r9552 - in trunk: src/main/org/hornetq/core/journal/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-08-16 19:52:29 -0400 (Mon, 16 Aug 2010)
New Revision: 9552
Modified:
trunk/src/main/org/hornetq/core/journal/TestableJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - another fix
Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-16 23:52:29 UTC (rev 9552)
@@ -49,6 +49,8 @@
void forceMoveNextFile() throws Exception;
+ void forceMoveNextFile(boolean synchronous) throws Exception;
+
void setAutoReclaim(boolean autoReclaim);
boolean isAutoReclaim();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-16 23:52:29 UTC (rev 9552)
@@ -23,8 +23,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +33,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -105,7 +104,7 @@
// Journal
private static final void trace(final String message)
{
- log.trace(message);
+ log.info(message);
}
// The sizes of primitive types
@@ -191,7 +190,7 @@
private final LinkedBlockingDeque<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
- private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+ private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
@@ -651,11 +650,13 @@
// checkSize by some sort of calculated hash)
if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- JournalImpl.trace("Record at position " + pos +
+ JournalImpl.trace("Record at position " + pos +
" recordType = " +
recordType +
- " possible transactionID = " + transactionID +
- " possible recordID = " + recordID +
+ " possible transactionID = " +
+ transactionID +
+ " possible recordID = " +
+ recordID +
" file:" +
file.getFile().getFileName() +
" is corrupted and it is being ignored (III)");
@@ -763,7 +764,7 @@
catch (Throwable e)
{
log.warn(e.getMessage(), e);
- throw new Exception (e.getMessage(), e);
+ throw new Exception(e.getMessage(), e);
}
finally
{
@@ -828,7 +829,11 @@
{
if (JournalImpl.LOAD_TRACE)
{
- JournalImpl.trace("appendAddRecord id = " + id + ", recordType = " + recordType + " compacting = " + (compactor != null));
+ JournalImpl.trace("appendAddRecord id = " + id +
+ ", recordType = " +
+ recordType +
+ " compacting = " +
+ (compactor != null));
}
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
@@ -969,12 +974,11 @@
}
compactingLock.readLock().lock();
-
try
{
JournalRecord record = null;
-
+
if (compactor == null)
{
record = records.remove(id);
@@ -1049,12 +1053,13 @@
{
if (JournalImpl.LOAD_TRACE)
{
- JournalImpl.trace("appendAddRecordTransactional txID " + txID +
+ JournalImpl.trace("appendAddRecordTransactional txID " + txID +
", id = " +
id +
", recordType = " +
recordType +
- ", compacting " + (this.compactor != null));
+ ", compacting " +
+ (this.compactor != null));
}
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
@@ -1107,7 +1112,9 @@
", id = " +
id +
", recordType = " +
- recordType + ", compacting = " + (compactor != null));
+ recordType +
+ ", compacting = " +
+ (compactor != null));
}
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
@@ -1462,7 +1469,7 @@
private void checkDeleteSize()
{
- // HORNETQ-482 - Flush deletes only if memory is critical
+ // HORNETQ-482 - Flush deletes only if memory is critical
if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
{
log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
@@ -1480,7 +1487,7 @@
}
recordsToDelete.clear();
-
+
log.debug("flush delete done");
}
}
@@ -1568,6 +1575,8 @@
return;
}
+ onCompactLock();
+
setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives and positives counts
@@ -1612,7 +1621,15 @@
// well
for (final JournalFile file : dataFilesToProcess)
{
- JournalImpl.readJournalFile(fileFactory, file, compactor);
+ try
+ {
+ JournalImpl.readJournalFile(fileFactory, file, compactor);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on reading compacting for " + file);
+ throw new Exception("Error on reading compacting for " + file, e);
+ }
}
compactor.flush();
@@ -1634,6 +1651,8 @@
// Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
compactor = null;
+ onCompactLock();
+
newDatafiles = localCompactor.getNewDataFiles();
// Restore newRecords created during compacting
@@ -1664,7 +1683,7 @@
{
newTransaction.replaceRecordProvider(this);
}
-
+
localCompactor.replayPendingCommands();
// Merge transactions back after compacting
@@ -2300,13 +2319,11 @@
return;
}
- compactingLock.readLock().lock();
-
try
{
JournalCleaner cleaner = null;
ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
- lockAppend.lock();
+ compactingLock.writeLock().lock();
try
{
@@ -2345,11 +2362,21 @@
}
finally
{
- lockAppend.unlock();
+ compactingLock.writeLock().unlock();
}
- JournalImpl.readJournalFile(fileFactory, file, cleaner);
+ compactingLock.readLock().lock();
+ try
+ {
+ JournalImpl.readJournalFile(fileFactory, file, cleaner);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error reading cleanup on " + file, e);
+ throw new Exception("Error reading cleanup on " + file, e);
+ }
+
cleaner.flush();
// pointcut for tests
@@ -2379,6 +2406,11 @@
controlFile.delete();
final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
+
+ if (trace)
+ {
+ trace("Adding free file back from cleanup" + retJournalfile);
+ }
filesExecutor.execute(new Runnable()
{
@@ -2616,14 +2648,20 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
+ forceMoveNextFile(true);
+ }
+
+ // In some tests we need to force the journal to move to a next file
+ public void forceMoveNextFile(final boolean synchronous) throws Exception
+ {
compactingLock.readLock().lock();
try
{
lockAppend.lock();
try
{
- moveNextFile(true);
- if (autoReclaim)
+ moveNextFile(synchronous);
+ if (autoReclaim && !synchronous)
{
checkReclaimStatus();
}
@@ -2660,10 +2698,24 @@
throw new IllegalStateException("Journal is not stopped");
}
- filesExecutor = Executors.newSingleThreadExecutor();
+ filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
+ {
- compactorExecutor = Executors.newCachedThreadPool();
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "JournalImpl::FilesExecutor");
+ }
+ });
+ compactorExecutor = Executors.newCachedThreadPool(new ThreadFactory()
+ {
+
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "JournalImpl::CompactorExecutor");
+ }
+ });
+
fileFactory.start();
state = JournalImpl.STATE_STARTED;
@@ -2818,6 +2870,12 @@
{
}
+ /** This is an interception point for testcases, when the compacted files are written, to be called
+ * as soon as the compactor gets a writeLock */
+ protected void onCompactLock() throws Exception
+ {
+ }
+
/** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
protected void onCompactDone()
{
@@ -2843,6 +2901,11 @@
{
// Re-initialise it
+ if (trace)
+ {
+ trace("Adding free file " + file);
+ }
+
JournalFile jf = reinitializeFile(file);
if (renameTmp)
@@ -3182,6 +3245,11 @@
sequentialFile.close();
+ if (JournalImpl.trace)
+ {
+ JournalImpl.trace("Renaming file " + tmpFileName + " as " + fileName);
+ }
+
sequentialFile.renameTo(fileName);
if (keepOpened)
@@ -3289,7 +3357,7 @@
filesExecutor.execute(run);
}
- if (autoReclaim && !synchronous)
+ if (!synchronous)
{
scheduleReclaim();
}
@@ -3320,26 +3388,28 @@
{
return;
}
-
- filesExecutor.execute(new Runnable()
+
+ if (autoReclaim && !compactorRunning.get())
{
- public void run()
+ filesExecutor.execute(new Runnable()
{
- try
+ public void run()
{
- drainClosedFiles();
-
- if (!checkReclaimStatus())
+ try
{
- checkCompact();
+ drainClosedFiles();
+ if (!checkReclaimStatus())
+ {
+ checkCompact();
+ }
}
+ catch (Exception e)
+ {
+ JournalImpl.log.error(e.getMessage(), e);
+ }
}
- catch (Exception e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- }
- });
+ });
+ }
}
/**
@@ -3368,25 +3438,21 @@
final boolean tmpCompactExtension) throws Exception
{
JournalFile nextOpenedFile = null;
- try
+
+ nextOpenedFile = freeFiles.poll();
+
+ if (nextOpenedFile == null)
{
- nextOpenedFile = freeFiles.remove();
+ nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ }
+ else
+ {
if (tmpCompactExtension)
{
SequentialFile sequentialFile = nextOpenedFile.getFile();
sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
}
- }
- catch (NoSuchElementException ignored)
- {
- }
- if (nextOpenedFile == null)
- {
- nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
- }
- else
- {
if (keepOpened)
{
openFile(nextOpenedFile, multiAIO);
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-16 23:52:29 UTC (rev 9552)
@@ -121,6 +121,13 @@
"hq",
maxAIO)
{
+ protected void onCompactLock() throws Exception
+ {
+ System.out.println("OnCompactLock");
+ journal.forceMoveNextFile(false);
+ System.out.println("OnCompactLock done");
+ }
+
protected void onCompactStart() throws Exception
{
testExecutor.execute(new Runnable()
@@ -134,7 +141,7 @@
{
long id = idGen.generateID();
journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
- journal.forceMoveNextFile();
+ journal.forceMoveNextFile(false);
journal.appendDeleteRecord(id, id == 20);
}
System.out.println("OnCompactStart leave");
@@ -176,7 +183,7 @@
protected long getTotalTimeMilliseconds()
{
- return TimeUnit.MINUTES.toMillis(10);
+ return TimeUnit.MINUTES.toMillis(1);
}
public void testAppend() throws Exception
13 years, 9 months
JBoss hornetq SVN: r9551 - in trunk: tests/src/org/hornetq/tests/integration/client and 1 other directory.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-08-16 10:31:38 -0400 (Mon, 16 Aug 2010)
New Revision: 9551
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/AddressImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
HORNETQ-477 - Queues bound to wildcard addresses canoot be deleted
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/AddressImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/AddressImpl.java 2010-08-16 12:53:50 UTC (rev 9550)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/AddressImpl.java 2010-08-16 14:31:38 UTC (rev 9551)
@@ -62,7 +62,10 @@
public void addLinkedAddress(final Address address)
{
- linkedAddresses.add(address);
+ if(!linkedAddresses.contains(address))
+ {
+ linkedAddresses.add(address);
+ }
}
public void removeLinkedAddress(final Address actualAddress)
Modified: trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2010-08-16 12:53:50 UTC (rev 9550)
+++ trunk/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java 2010-08-16 14:31:38 UTC (rev 9551)
@@ -181,7 +181,7 @@
}
- public void _testQueueWithWildcard() throws Exception
+ public void testQueueWithWildcard() throws Exception
{
session.createQueue("a.b", "queue1");
session.createTemporaryQueue("a.#", "queue2");
@@ -221,7 +221,7 @@
}
- public void _testQueueWithWildcard2() throws Exception
+ public void testQueueWithWildcard2() throws Exception
{
session.createQueue("a.b", "queue1");
session.createTemporaryQueue("a.#", "queue2");
13 years, 9 months