Author: clebert.suconic(a)jboss.com
Date: 2010-07-16 00:47:08 -0400 (Fri, 16 Jul 2010)
New Revision: 9404
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - Fix on the journal
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-15 18:23:17 UTC
(rev 9403)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-16 04:47:08 UTC
(rev 9404)
@@ -904,15 +904,24 @@
try
{
- JournalRecord record = records.remove(id);
+ JournalRecord record = null;
+
+ if (compactor == null)
+ {
+ record = records.remove(id);
- if (record == null)
- {
- if (!(compactor != null && compactor.lookupRecord(id)))
+ if (record == null)
{
throw new IllegalStateException("Cannot find add info " + id);
}
}
+ else
+ {
+ if (!records.containsKey(id) && !compactor.lookupRecord(id))
+ {
+ throw new IllegalStateException("Cannot find add info " + id +
" on compactor or current records");
+ }
+ }
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
@@ -1464,7 +1473,7 @@
return;
}
- autoReclaim = false;
+ setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives
and positives counts
moveNextFile(true);
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-07-15
18:23:17 UTC (rev 9403)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-07-16
04:47:08 UTC (rev 9404)
@@ -317,18 +317,18 @@
{
for (JournalUpdate trDelete : neg)
{
- JournalImpl.JournalRecord posFiles =
journal.getRecords().remove(trDelete.id);
-
- if (posFiles != null)
+ if (compactor != null)
{
- posFiles.delete(trDelete.file);
+ compactor.addCommandDelete(trDelete.id, trDelete.file);
}
- else if (compactor != null &&
compactor.lookupRecord(trDelete.id))
+ else
{
- // This is a case where the transaction was opened after compacting was
started,
- // but the commit arrived while compacting was working
- // We need to cache the counter update, so compacting will take the
correct files when it is done
- compactor.addCommandDelete(trDelete.id, trDelete.file);
+ JournalImpl.JournalRecord posFiles =
journal.getRecords().remove(trDelete.id);
+
+ if (posFiles != null)
+ {
+ posFiles.delete(trDelete.file);
+ }
}
}
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-15
18:23:17 UTC (rev 9403)
+++
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-16
04:47:08 UTC (rev 9404)
@@ -563,8 +563,7 @@
}
- // This test is under investigation... disabled for now
- public void _testCompactAddAndUpdateFollowedByADelete() throws Exception
+ public void testCompactAddAndUpdateFollowedByADelete() throws Exception
{
setup(2, 60 * 1024, false);
@@ -601,15 +600,210 @@
startJournal();
load();
+
+ long consumerTX = idGen.generateID();
long firstID = idGen.generateID();
+
+ long appendTX = idGen.generateID();
+
+ long addedRecord = idGen.generateID();
+
+ addTx(consumerTX, firstID);
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ tCompact.start();
+
+
+ reusableLatchDone.waitCompletion();
+
+ addTx(appendTX, addedRecord);
+
+ commit(appendTX);
+
+ updateTx(consumerTX, addedRecord);
+
+ commit(consumerTX);
+
+ delete(addedRecord);
+
+ reusableLatchWait.down();
+
+ tCompact.join();
+
+ journal.forceMoveNextFile();
+
+ long newRecord = idGen.generateID();
+ add(newRecord);
+ update(newRecord);
+
+ journal.compact();
+
+ System.out.println("Debug after compact\n" + journal.debug());
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testCompactAddAndUpdateFollowedByADelete2() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+ final VariableLatch reusableLatchDone = new VariableLatch();
+ reusableLatchDone.up();
+ final VariableLatch reusableLatchWait = new VariableLatch();
+ reusableLatchWait.up();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix,
fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.down();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.waitCompletion();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ long firstID = idGen.generateID();
+
long consumerTX = idGen.generateID();
long appendTX = idGen.generateID();
long addedRecord = idGen.generateID();
+ addTx(consumerTX, firstID);
+
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+
+ tCompact.start();
+
+ reusableLatchDone.waitCompletion();
+
+ addTx(appendTX, addedRecord);
+ commit(appendTX);
+ updateTx(consumerTX, addedRecord);
+ commit(consumerTX);
+
+ long deleteTXID = idGen.generateID();
+
+ deleteTx(deleteTXID, addedRecord);
+
+ commit(deleteTXID);
+
+ reusableLatchWait.down();
+
+ tCompact.join();
+
+ journal.forceMoveNextFile();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testCompactAddAndUpdateFollowedByADelete3() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+ final VariableLatch reusableLatchDone = new VariableLatch();
+ reusableLatchDone.up();
+ final VariableLatch reusableLatchWait = new VariableLatch();
+ reusableLatchWait.up();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix,
fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.down();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.waitCompletion();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ long firstID = idGen.generateID();
+
+ long consumerTX = idGen.generateID();
+
+ long addedRecord = idGen.generateID();
+
add(firstID);
updateTx(consumerTX, firstID);
@@ -637,11 +831,102 @@
reusableLatchDone.waitCompletion();
+ addTx(consumerTX, addedRecord);
+ commit(consumerTX);
+ delete(addedRecord);
+
+ reusableLatchWait.down();
+
+ tCompact.join();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+
+ public void testCompactAddAndUpdateFollowedByADelete4() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+ final VariableLatch reusableLatchDone = new VariableLatch();
+ reusableLatchDone.up();
+ final VariableLatch reusableLatchWait = new VariableLatch();
+ reusableLatchWait.up();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix,
fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.down();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.waitCompletion();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ long consumerTX = idGen.generateID();
+
+ long firstID = idGen.generateID();
+
+ long appendTX = idGen.generateID();
+
+ long addedRecord = idGen.generateID();
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+
+ tCompact.start();
+
+
+ reusableLatchDone.waitCompletion();
+
+ addTx(consumerTX, firstID);
+
addTx(appendTX, addedRecord);
- updateTx(appendTX, addedRecord);
+
commit(appendTX);
+
updateTx(consumerTX, addedRecord);
+
commit(consumerTX);
+
delete(addedRecord);
reusableLatchWait.down();
@@ -656,6 +941,8 @@
journal.compact();
+ System.out.println("Debug after compact\n" + journal.debug());
+
stopJournal();
createJournal();
startJournal();