[hornetq-commits] JBoss hornetq SVN: r9538 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Aug 13 01:48:44 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-13 01:48:44 -0400 (Fri, 13 Aug 2010)
New Revision: 9538
Added:
trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java
trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
HORNETQ-440 - Fixing Invalid records when using Rollback
Modified: trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -25,7 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.core.journal.impl.JournalRecord;
import org.hornetq.utils.Base64;
/**
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -27,13 +27,13 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.hornetq.core.logging.Logger;
/**
@@ -43,7 +43,7 @@
*
*
*/
-public class JournalCompactor extends AbstractJournalUpdateTask
+public class JournalCompactor extends AbstractJournalUpdateTask implements JournalRecordProvider
{
private static final Logger log = Logger.getLogger(JournalCompactor.class);
@@ -263,7 +263,7 @@
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id))
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -279,11 +279,6 @@
writeEncoder(record);
}
- else
- {
- // Will try it as a regular record, the method addRecord will validate if this is a live record or not
- onReadAddRecord(info);
- }
}
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
@@ -295,6 +290,20 @@
throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
" for an already committed transaction during compacting");
}
+ else
+ {
+ JournalTransaction newTransaction = newTransactions.remove(transactionID);
+ if (newTransaction != null)
+ {
+ JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, transactionID, null);
+
+ checkSize(commitRecord.getEncodeSize());
+
+ writeEncoder(commitRecord, newTransaction.getCounter(currentFile));
+
+ newTransaction.commit(currentFile);
+ }
+ }
}
public void onReadDeleteRecord(final long recordID) throws Exception
@@ -359,6 +368,22 @@
throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
" for an already rolled back transaction during compacting");
}
+ else
+ {
+ JournalTransaction newTransaction = newTransactions.remove(transactionID);
+ if (newTransaction != null)
+ {
+
+ JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
+
+ checkSize(rollbackRecord.getEncodeSize());
+
+ writeEncoder(rollbackRecord);
+
+ newTransaction.rollback(currentFile);
+ }
+
+ }
}
public void onReadUpdateRecord(final RecordInfo info) throws Exception
@@ -390,7 +415,7 @@
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.get(transactionID) != null || lookupRecord(info.id))
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -421,7 +446,7 @@
JournalTransaction newTransaction = newTransactions.get(transactionID);
if (newTransaction == null)
{
- newTransaction = new JournalTransaction(transactionID, journal);
+ newTransaction = new JournalTransaction(transactionID, this);
newTransactions.put(transactionID, newTransaction);
}
return newTransaction;
@@ -538,4 +563,20 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalRecordProvider#getCompactor()
+ */
+ public JournalCompactor getCompactor()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalRecordProvider#getRecords()
+ */
+ public Map<Long, JournalRecord> getRecords()
+ {
+ return newRecords;
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -79,7 +79,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class JournalImpl implements TestableJournal
+public class JournalImpl implements TestableJournal, JournalRecordProvider
{
// Constants -----------------------------------------------------
@@ -1668,6 +1668,8 @@
for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
{
+ newTransaction.replaceRecordProvider(this);
+
if (JournalImpl.trace)
{
JournalImpl.trace("Merging pending transaction " + newTransaction + " after compacting the journal");
@@ -3569,80 +3571,6 @@
// Inner classes
// ---------------------------------------------------------------------------
- /**
- * This holds the relationship a record has with other files in regard to reference counting.
- * Note: This class used to be called PosFiles
- *
- * Used on the ref-count for reclaiming */
- public static class JournalRecord
- {
- private final JournalFile addFile;
-
- private final int size;
-
- private List<Pair<JournalFile, Integer>> updateFiles;
-
- JournalRecord(final JournalFile addFile, final int size)
- {
- this.addFile = addFile;
-
- this.size = size;
-
- addFile.incPosCount();
-
- addFile.addSize(size);
- }
-
- void addUpdateFile(final JournalFile updateFile, final int size)
- {
- if (updateFiles == null)
- {
- updateFiles = new ArrayList<Pair<JournalFile, Integer>>();
- }
-
- updateFiles.add(new Pair<JournalFile, Integer>(updateFile, size));
-
- updateFile.incPosCount();
-
- updateFile.addSize(size);
- }
-
- void delete(final JournalFile file)
- {
- file.incNegCount(addFile);
- addFile.decSize(size);
-
- if (updateFiles != null)
- {
- for (Pair<JournalFile, Integer> updFile : updateFiles)
- {
- file.incNegCount(updFile.a);
- updFile.a.decSize(updFile.b);
- }
- }
- }
-
- public String toString()
- {
- StringBuffer buffer = new StringBuffer();
- buffer.append("JournalRecord(add=" + addFile.getFile().getFileName());
-
- if (updateFiles != null)
- {
-
- for (Pair<JournalFile, Integer> update : updateFiles)
- {
- buffer.append(", update=" + update.a.getFile().getFileName());
- }
-
- }
-
- buffer.append(")");
-
- return buffer.toString();
- }
- }
-
private static class NullEncoding implements EncodingSupport
{
Added: trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalRecord.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.api.core.Pair;
+
+/**
+ * This holds the relationship a record has with other files in regard to reference counting.
+ * Note: This class used to be called PosFiles
+ *
+ * Used on the ref-count for reclaiming
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * */
+public class JournalRecord
+{
+ private final JournalFile addFile;
+
+ private final int size;
+
+ private List<Pair<JournalFile, Integer>> updateFiles;
+
+ public JournalRecord(final JournalFile addFile, final int size)
+ {
+ this.addFile = addFile;
+
+ this.size = size;
+
+ addFile.incPosCount();
+
+ addFile.addSize(size);
+ }
+
+ void addUpdateFile(final JournalFile updateFile, final int size)
+ {
+ if (updateFiles == null)
+ {
+ updateFiles = new ArrayList<Pair<JournalFile, Integer>>();
+ }
+
+ updateFiles.add(new Pair<JournalFile, Integer>(updateFile, size));
+
+ updateFile.incPosCount();
+
+ updateFile.addSize(size);
+ }
+
+ void delete(final JournalFile file)
+ {
+ file.incNegCount(addFile);
+ addFile.decSize(size);
+
+ if (updateFiles != null)
+ {
+ for (Pair<JournalFile, Integer> updFile : updateFiles)
+ {
+ file.incNegCount(updFile.a);
+ updFile.a.decSize(updFile.b);
+ }
+ }
+ }
+
+ public String toString()
+ {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("JournalRecord(add=" + addFile.getFile().getFileName());
+
+ if (updateFiles != null)
+ {
+
+ for (Pair<JournalFile, Integer> update : updateFiles)
+ {
+ buffer.append(", update=" + update.a.getFile().getFileName());
+ }
+
+ }
+
+ buffer.append(")");
+
+ return buffer.toString();
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.Map;
+
+/**
+ * This is an interface used only internally.
+ *
+ * During a TX.commit, the JournalTransaction needs to get a valid list of records from either the JournalImpl or JournalCompactor.
+ *
+ * When a commit is read, the JournalTransaction will ask the JournalCompactor for the existing records.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JournalRecordProvider
+{
+ JournalCompactor getCompactor();
+
+ Map<Long, JournalRecord> getRecords();
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -34,7 +34,7 @@
public class JournalTransaction
{
- private final JournalImpl journal;
+ private JournalRecordProvider journal;
private List<JournalUpdate> pos;
@@ -56,11 +56,16 @@
private final AtomicInteger counter = new AtomicInteger();
- public JournalTransaction(final long id, final JournalImpl journal)
+ public JournalTransaction(final long id, final JournalRecordProvider journal)
{
this.id = id;
this.journal = journal;
}
+
+ public void replaceRecordProvider(JournalRecordProvider provider)
+ {
+ this.journal = provider;
+ }
/**
* @return the id
@@ -291,7 +296,7 @@
{
for (JournalUpdate trUpdate : pos)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.id);
+ JournalRecord posFiles = journal.getRecords().get(trUpdate.id);
if (compactor != null && compactor.lookupRecord(trUpdate.id))
{
@@ -302,7 +307,7 @@
}
else if (posFiles == null)
{
- posFiles = new JournalImpl.JournalRecord(trUpdate.file, trUpdate.size);
+ posFiles = new JournalRecord(trUpdate.file, trUpdate.size);
journal.getRecords().put(trUpdate.id, posFiles);
}
@@ -323,7 +328,7 @@
}
else
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
+ JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
if (posFiles != null)
{
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -1042,7 +1042,7 @@
}
}
}
-
+
if (perfBlastPages != -1)
{
messageJournal.perfBlast(perfBlastPages);
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-12 20:31:54 UTC (rev 9537)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-08-13 05:48:44 UTC (rev 9538)
@@ -185,58 +185,83 @@
{
internalCompactTest(false, false, true, true, false, false, false, false, false, false, true, true, true);
}
-
+
public void testCompactFirstFileReclaimed() throws Exception
{
setup(2, 60 * 1024, false);
final byte recordType = (byte)0;
-
+
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
-
+
journal.start();
-
+
journal.loadInternalOnly();
-
+
journal.appendAddRecord(1, recordType, "test".getBytes(), true);
-
+
journal.forceMoveNextFile();
-
-
+
journal.appendUpdateRecord(1, recordType, "update".getBytes(), true);
-
+
journal.appendDeleteRecord(1, true);
-
+
journal.appendAddRecord(2, recordType, "finalRecord".getBytes(), true);
-
- for (int i = 10 ; i < 100; i++)
+ for (int i = 10; i < 100; i++)
{
journal.appendAddRecord(i, recordType, ("tst" + i).getBytes(), true);
journal.forceMoveNextFile();
journal.appendUpdateRecord(i, recordType, ("uptst" + i).getBytes(), true);
journal.appendDeleteRecord(i, true);
}
-
+
journal.compact();
-
+
journal.stop();
-
+
List<RecordInfo> records = new ArrayList<RecordInfo>();
-
+
List<PreparedTransactionInfo> preparedRecords = new ArrayList<PreparedTransactionInfo>();
-
+
journal.start();
journal.load(records, preparedRecords, null);
-
+
assertEquals(1, records.size());
-
-
}
+ public void testOnRollback() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ createJournal();
+
+ startJournal();
+
+ load();
+
+ add(1);
+
+ updateTx(2, 1);
+
+ rollback(2);
+
+ journal.compact();
+
+ stopJournal();
+
+ startJournal();
+
+ loadAndCheck();
+
+ stopJournal();
+
+ }
+
private void internalCompactTest(final boolean preXA, // prepare before compact
final boolean postXA, // prepare after compact
final boolean regularAdd,
@@ -562,15 +587,14 @@
loadAndCheck();
}
-
+
public void testCompactAddAndUpdateFollowedByADelete() throws Exception
{
setup(2, 60 * 1024, false);
-
+
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -602,15 +626,15 @@
load();
long consumerTX = idGen.generateID();
-
+
long firstID = idGen.generateID();
-
+
long appendTX = idGen.generateID();
-
+
long addedRecord = idGen.generateID();
-
+
addTx(consumerTX, firstID);
-
+
Thread tCompact = new Thread()
{
@Override
@@ -627,36 +651,34 @@
}
};
-
tCompact.start();
-
reusableLatchDone.await();
-
+
addTx(appendTX, addedRecord);
commit(appendTX);
updateTx(consumerTX, addedRecord);
-
+
commit(consumerTX);
-
+
delete(addedRecord);
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
journal.forceMoveNextFile();
-
+
long newRecord = idGen.generateID();
add(newRecord);
update(newRecord);
journal.compact();
-
+
System.out.println("Debug after compact\n" + journal.debug());
-
+
stopJournal();
createJournal();
startJournal();
@@ -668,10 +690,9 @@
{
setup(2, 60 * 1024, false);
-
+
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -701,18 +722,17 @@
startJournal();
load();
-
+
long firstID = idGen.generateID();
long consumerTX = idGen.generateID();
-
+
long appendTX = idGen.generateID();
-
+
long addedRecord = idGen.generateID();
-
+
addTx(consumerTX, firstID);
-
Thread tCompact = new Thread()
{
@Override
@@ -729,30 +749,29 @@
}
};
-
tCompact.start();
reusableLatchDone.await();
-
+
addTx(appendTX, addedRecord);
commit(appendTX);
updateTx(consumerTX, addedRecord);
commit(consumerTX);
-
+
long deleteTXID = idGen.generateID();
-
+
deleteTx(deleteTXID, addedRecord);
commit(deleteTXID);
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
journal.forceMoveNextFile();
-
+
journal.compact();
-
+
stopJournal();
createJournal();
startJournal();
@@ -764,10 +783,9 @@
{
setup(2, 60 * 1024, false);
-
+
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -797,18 +815,17 @@
startJournal();
load();
-
+
long firstID = idGen.generateID();
long consumerTX = idGen.generateID();
-
+
long addedRecord = idGen.generateID();
-
+
add(firstID);
updateTx(consumerTX, firstID);
-
Thread tCompact = new Thread()
{
@Override
@@ -825,22 +842,20 @@
}
};
-
tCompact.start();
-
reusableLatchDone.await();
-
+
addTx(consumerTX, addedRecord);
commit(consumerTX);
delete(addedRecord);
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
journal.compact();
-
+
stopJournal();
createJournal();
startJournal();
@@ -848,15 +863,13 @@
}
-
public void testCompactAddAndUpdateFollowedByADelete4() throws Exception
{
setup(2, 60 * 1024, false);
-
+
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -888,13 +901,13 @@
load();
long consumerTX = idGen.generateID();
-
+
long firstID = idGen.generateID();
-
+
long appendTX = idGen.generateID();
-
+
long addedRecord = idGen.generateID();
-
+
Thread tCompact = new Thread()
{
@Override
@@ -911,38 +924,36 @@
}
};
-
tCompact.start();
-
reusableLatchDone.await();
-
+
addTx(consumerTX, firstID);
-
+
addTx(appendTX, addedRecord);
commit(appendTX);
updateTx(consumerTX, addedRecord);
-
+
commit(consumerTX);
-
+
delete(addedRecord);
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
journal.forceMoveNextFile();
-
+
long newRecord = idGen.generateID();
add(newRecord);
update(newRecord);
journal.compact();
-
+
System.out.println("Debug after compact\n" + journal.debug());
-
+
stopJournal();
createJournal();
startJournal();
@@ -950,14 +961,11 @@
}
-
-
public void testDeleteWhileCleanup() throws Exception
{
setup(2, 60 * 1024, false);
-
final ReusableLatch reusableLatchDone = new ReusableLatch();
reusableLatchDone.countUp();
final ReusableLatch reusableLatchWait = new ReusableLatch();
@@ -988,7 +996,6 @@
startJournal();
load();
-
Thread tCompact = new Thread()
{
@Override
@@ -1005,14 +1012,13 @@
}
};
- for (int i = 0 ; i < 100; i++)
+ for (int i = 0; i < 100; i++)
{
add(i);
}
-
+
journal.forceMoveNextFile();
-
-
+
for (int i = 10; i < 90; i++)
{
delete(i);
@@ -1027,9 +1033,9 @@
{
delete(i);
}
-
+
reusableLatchWait.countDown();
-
+
tCompact.join();
// Delete part of the live records after cleanup is done
@@ -1037,11 +1043,11 @@
{
delete(i);
}
-
+
assertEquals(9, journal.getCurrentFile().getNegCount(journal.getDataFiles()[0]));
journal.forceMoveNextFile();
-
+
stopJournal();
createJournal();
startJournal();
@@ -1049,15 +1055,11 @@
}
-
-
-
public void testCompactAddAndUpdateFollowedByADelete5() throws Exception
{
setup(2, 60 * 1024, false);
-
SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
final ReusableLatch reusableLatchDone = new ReusableLatch();
@@ -1106,34 +1108,32 @@
}
};
-
long appendTX = idGen.generateID();
long appendOne = idGen.generateID();
long appendTwo = idGen.generateID();
-
+
long updateTX = idGen.generateID();
-
+
addTx(appendTX, appendOne);
-
tCompact.start();
reusableLatchDone.await();
-
+
addTx(appendTX, appendTwo);
commit(appendTX);
-
+
updateTx(updateTX, appendOne);
updateTx(updateTX, appendTwo);
-
+
commit(updateTX);
- //delete(appendTwo);
-
+ // delete(appendTwo);
+
reusableLatchWait.countDown();
tCompact.join();
journal.compact();
-
+
stopJournal();
createJournal();
startJournal();
@@ -1141,7 +1141,6 @@
}
-
public void testSimpleCompacting() throws Exception
{
setup(2, 60 * 1024, false);
More information about the hornetq-commits
mailing list