[hornetq-commits] JBoss hornetq SVN: r9488 - in trunk: src/main/org/hornetq/core/journal/impl and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Jul 30 11:00:28 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-07-30 11:00:26 -0400 (Fri, 30 Jul 2010)
New Revision: 9488
Modified:
trunk/src/main/org/hornetq/core/journal/TestableJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 Changes after stress tests, making sure about the integrity of the journal
Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -54,6 +54,11 @@
boolean isAutoReclaim();
void compact() throws Exception;
+
+ void cleanUp(final JournalFile file) throws Exception;
+
+ JournalFile getCurrentFile();
+
/** This method is called automatically when a new file is opened.
* @return true if it needs to re-check due to cleanup or other factors */
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -176,20 +176,6 @@
}
}
- /**
- * Read files that depend on this file.
- * Commits and rollbacks are also counted as negatives. We need to fix those also.
- * @param dependencies
- */
- public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies) throws Exception
- {
- for (JournalFile dependency : dependencies)
- {
- fixDependency(originalFile, dependency);
- }
-
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -220,32 +206,7 @@
}
// Private -------------------------------------------------------
- private void fixDependency(final JournalFile originalFile, final JournalFile dependency) throws Exception
- {
- JournalReaderCallback txfix = new JournalReaderCallbackAbstract()
- {
- @Override
- public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
- {
- if (transactionCounter.containsKey(transactionID))
- {
- dependency.incNegCount(originalFile);
- }
- }
- @Override
- public void onReadRollbackRecord(final long transactionID) throws Exception
- {
- if (transactionCounter.containsKey(transactionID))
- {
- dependency.incNegCount(originalFile);
- }
- }
- };
-
- JournalImpl.readJournalFile(fileFactory, dependency, txfix);
- }
-
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -45,6 +45,9 @@
void decSize(int bytes);
int getLiveSize();
+
+ /** The total number of deletes this file has */
+ int getTotalNegativeToOthers();
void setCanReclaim(boolean canDelete);
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -49,6 +49,9 @@
private boolean needCleanup;
+ private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+
+
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
public JournalFileImpl(final SequentialFile file, final long fileID)
@@ -65,6 +68,7 @@
negCounts.clear();
posCount.set(0);
liveBytes.set(0);
+ totalNegativeToOthers.set(0);
}
public int getPosCount()
@@ -94,6 +98,10 @@
public void incNegCount(final JournalFile file)
{
+ if (file != this)
+ {
+ totalNegativeToOthers.incrementAndGet();
+ }
getOrCreateNegCount(file).incrementAndGet();
}
@@ -219,5 +227,12 @@
{
return liveBytes.get();
}
+
+ public int getTotalNegativeToOthers()
+ {
+ return totalNegativeToOthers.get();
+ }
+
+
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -464,59 +464,70 @@
" sequence = " +
file.getFileID());
- JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ listJournalFile(out, fileFactory, file);
+ }
+ }
+
+ /**
+ * @param out
+ * @param fileFactory
+ * @param file
+ * @throws Exception
+ */
+ public static void listJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+ {
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ {
+
+ public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
{
+ out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
+ }
- public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
- }
+ public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("ReadUpdate " + recordInfo);
+ }
- public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
- {
- out.println("ReadUpdate " + recordInfo);
- }
+ public void onReadRollbackRecord(long transactionID) throws Exception
+ {
+ out.println("Rollback txID=" + transactionID);
+ }
- public void onReadRollbackRecord(long transactionID) throws Exception
- {
- out.println("Rollback txID=" + transactionID);
- }
+ public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ {
+ out.println("Prepare txID=" + transactionID + ", numberOfRecords=" + numberOfRecords);
+ }
- public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
- {
- out.println("Prepare txID=" + transactionID);
- }
+ public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
+ }
- public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
- }
+ public void onReadDeleteRecord(long recordID) throws Exception
+ {
+ out.println("DeleteRecord id=" + recordID);
+ }
- public void onReadDeleteRecord(long recordID) throws Exception
- {
- out.println("DeleteRecord id=" + recordID);
- }
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ out.println("CommitRecord txID=" + transactionID + ", numberOfRecords=" + numberOfRecords);
+ }
- public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
- {
- out.println("CommitRecord txID=" + transactionID);
- }
+ public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
+ }
- public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
- {
- out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
- }
+ public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("AddRecord " + recordInfo);
+ }
- public void onReadAddRecord(RecordInfo recordInfo) throws Exception
- {
- out.println("AddRecord " + recordInfo);
- }
-
- public void markAsDataFile(JournalFile file)
- {
- }
- });
- }
+ public void markAsDataFile(JournalFile file)
+ {
+ }
+ });
}
@@ -1621,6 +1632,7 @@
{
JournalImpl.trace("Starting compacting operation on journal");
}
+ JournalImpl.log.debug("Starting compacting operation on journal");
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1765,7 +1777,7 @@
if (trace)
{
- JournalImpl.trace("Finished compacting on journal");
+ JournalImpl.log.debug("Finished compacting on journal");
}
}
@@ -2305,7 +2317,7 @@
if (compactMinFiles > 0)
{
- if (nCleanup > getMinCompact())
+ if (nCleanup > 0 && needsCompact())
{
for (JournalFile file : dataFiles)
{
@@ -2357,16 +2369,9 @@
return false;
}
- /**
- * @return
- */
- private float getMinCompact()
+ // This method is public for tests
+ public synchronized void cleanUp(final JournalFile file) throws Exception
{
- return compactMinFiles * compactPercentage;
- }
-
- private synchronized void cleanUp(final JournalFile file) throws Exception
- {
if (state != JournalImpl.STATE_LOADED)
{
return;
@@ -2388,7 +2393,7 @@
JournalImpl.trace("Cleaning up file " + file);
}
JournalImpl.log.debug("Cleaning up file " + file);
-
+
if (file.getPosCount() == 0)
{
// nothing to be done
@@ -2408,6 +2413,10 @@
jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
}
}
+
+ currentFile.resetNegCount(file);
+ currentFile.incPosCount();
+ dependencies.add(currentFile);
cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
}
@@ -2420,7 +2429,10 @@
cleaner.flush();
- cleaner.fixDependencies(file, dependencies);
+ // pointcut for tests
+ // We need to test concurrent updates on the journal, as the compacting is being performed.
+ // Usually tests will use this to hold the compacting while other structures are being updated.
+ onCompactDone();
for (JournalFile jrnfile : dependencies)
{
@@ -2437,15 +2449,35 @@
file.getFile().delete();
tmpFile.renameTo(cleanedFileName);
controlFile.delete();
+
}
finally
{
compactingLock.readLock().unlock();
- JournalImpl.log.debug("Clean up on file " + file + " done");
+ JournalImpl.log.info("Clean up on file " + file + " done");
}
}
+
+ private boolean needsCompact() throws Exception
+ {
+ JournalFile[] dataFiles = getDataFiles();
+ long totalLiveSize = 0;
+
+ for (JournalFile file : dataFiles)
+ {
+ totalLiveSize += file.getLiveSize();
+ }
+
+ long totalBytes = (long)dataFiles.length * (long)fileSize;
+
+ long compactMargin = (long)(totalBytes * compactPercentage);
+
+ return (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles);
+
+ }
+
private void checkCompact() throws Exception
{
if (compactMinFiles == 0)
@@ -2459,21 +2491,8 @@
return;
}
- JournalFile[] dataFiles = getDataFiles();
-
- long totalLiveSize = 0;
-
- for (JournalFile file : dataFiles)
+ if (needsCompact())
{
- totalLiveSize += file.getLiveSize();
- }
-
- long totalBytes = (long)dataFiles.length * (long)fileSize;
-
- long compactMargin = (long)(totalBytes * compactPercentage);
-
- if (totalLiveSize < compactMargin && !compactorRunning.get() && dataFiles.length > compactMinFiles)
- {
if (!compactorRunning.compareAndSet(false, true))
{
return;
Modified: trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -63,20 +63,20 @@
{
Reclaimer.trace("posCount on " + currentFile + " = " + posCount);
}
-
+
for (int j = i; j < files.length; j++)
{
if (Reclaimer.trace)
{
if (files[j].getNegCount(currentFile) != 0)
{
- Reclaimer.trace("Negative from " + files[j] + " = " + files[j].getNegCount(currentFile));
+ Reclaimer.trace("Negative from " + files[j] + " into " + currentFile + " = " + files[j].getNegCount(currentFile));
}
}
totNeg += files[j].getNegCount(currentFile);
}
-
+
currentFile.setCanReclaim(true);
if (posCount <= totNeg)
@@ -101,8 +101,18 @@
{
Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
+ file.setNeedCleanup(true);
- file.setNeedCleanup(true);
+ if (file.getTotalNegativeToOthers() == 0)
+ {
+ file.setNeedCleanup(true);
+ }
+ else
+ {
+ // This file can't be cleared as the file has negatives to other files as well
+ file.setNeedCleanup(false);
+ }
+
currentFile.setCanReclaim(false);
break;
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -950,7 +950,108 @@
}
+
+ public void testDeleteWhileCleanup() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+
+ final ReusableLatch reusableLatchDone = new ReusableLatch();
+ reusableLatchDone.countUp();
+ final ReusableLatch reusableLatchWait = new ReusableLatch();
+ reusableLatchWait.countUp();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.countDown();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.cleanUp(journal.getDataFiles()[0]);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ add(i);
+ }
+
+ journal.forceMoveNextFile();
+
+
+ for (int i = 10; i < 90; i++)
+ {
+ delete(i);
+ }
+
+ tCompact.start();
+
+ reusableLatchDone.await();
+
+ // Delete part of the live records while cleanup still working
+ for (int i = 1; i < 5; i++)
+ {
+ delete(i);
+ }
+
+ reusableLatchWait.countDown();
+
+ tCompact.join();
+
+ // Delete part of the live records after cleanup is done
+ for (int i = 5; i < 10; i++)
+ {
+ delete(i);
+ }
+
+ assertEquals(9, journal.getCurrentFile().getNegCount(journal.getDataFiles()[0]));
+
+ journal.forceMoveNextFile();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+
+
+
public void testCompactAddAndUpdateFollowedByADelete5() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/JournalSoakTest.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -15,6 +15,7 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -24,7 +25,10 @@
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
@@ -49,9 +53,15 @@
public static SimpleIDGenerator idGen = new SimpleIDGenerator(1);
private volatile boolean running;
-
+
private AtomicInteger errors = new AtomicInteger(0);
+ private AtomicInteger numberOfRecords = new AtomicInteger(0);
+
+ private AtomicInteger numberOfUpdates = new AtomicInteger(0);
+
+ private AtomicInteger numberOfDeletes = new AtomicInteger(0);
+
private JournalImpl journal;
ThreadFactory tFactory = new HornetQThreadFactory("SoakTest" + System.identityHashCode(this),
@@ -68,7 +78,7 @@
super.setUp();
errors.set(0);
-
+
File dir = new File(getTemporaryDir());
dir.mkdirs();
@@ -87,8 +97,8 @@
}
journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
- 100,
- ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES,
+ 10,
+ 15,
ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE,
factory,
"hornetq-data",
@@ -103,11 +113,22 @@
@Override
public void tearDown() throws Exception
{
- journal.stop();
+ try
+ {
+ if (journal.isStarted())
+ {
+ journal.stop();
+ }
+ }
+ catch (Exception e)
+ {
+ // don't care :-)
+ }
}
public void testAppend() throws Exception
{
+
running = true;
SlowAppenderNoTX t1 = new SlowAppenderNoTX();
@@ -124,15 +145,28 @@
t1.start();
+ Thread.sleep(1000);
+
for (int i = 0; i < NTHREADS; i++)
{
appenders[i].start();
updaters[i].start();
}
- // TODO: parametrize this somehow
- Thread.sleep(TimeUnit.HOURS.toMillis(24));
+ long timeToEnd = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(10);
+ while (System.currentTimeMillis() < timeToEnd)
+ {
+ System.out.println("Append = " + numberOfRecords +
+ ", Update = " +
+ numberOfUpdates +
+ ", Delete = " +
+ numberOfDeletes +
+ ", liveRecords = " +
+ (numberOfRecords.get() - numberOfDeletes.get()));
+ Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ }
+
running = false;
for (Thread t : appenders)
@@ -146,14 +180,39 @@
}
t1.join();
-
+
assertEquals(0, errors.get());
-
+
journal.stop();
-
+
journal.start();
-
- journal.loadInternalOnly();
+
+ ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+ ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+ journal.load(committedRecords, preparedTransactions, new TransactionFailureCallback()
+ {
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+ });
+
+ long appends = 0, updates = 0;
+
+ for (RecordInfo record : committedRecords)
+ {
+ if (record.isUpdate)
+ {
+ updates++;
+ }
+ else
+ {
+ appends++;
+ }
+ }
+
+ assertEquals(numberOfRecords.get() - numberOfDeletes.get(), appends);
+
+ journal.stop();
}
private byte[] generateRecord()
@@ -180,16 +239,16 @@
while (running)
{
- int txSize = RandomUtil.randomMax(1000);
+ final int txSize = RandomUtil.randomMax(100);
long txID = JournalSoakTest.idGen.generateID();
- final ArrayList<Long> ids = new ArrayList<Long>();
+ final long ids[] = new long[txSize];
for (int i = 0; i < txSize; i++)
{
long id = JournalSoakTest.idGen.generateID();
- ids.add(id);
+ ids[i] = id;
journal.appendAddRecordTransactional(txID, id, (byte)0, generateRecord());
Thread.sleep(1);
}
@@ -203,6 +262,7 @@
public void done()
{
+ numberOfRecords.addAndGet(txSize);
for (Long id : ids)
{
queue.add(id);
@@ -236,7 +296,7 @@
{
try
{
- int txSize = RandomUtil.randomMax(1000);
+ int txSize = RandomUtil.randomMax(100);
int txCount = 0;
long ids[] = new long[txSize];
@@ -248,13 +308,12 @@
long id = queue.poll(60, TimeUnit.MINUTES);
ids[txCount] = id;
journal.appendUpdateRecordTransactional(txID, id, (byte)0, generateRecord());
- Thread.sleep(1);
if (++txCount == txSize)
{
journal.appendCommitRecord(txID, true, ctx);
ctx.executeOnCompletion(new DeleteTask(ids));
txCount = 0;
- txSize = RandomUtil.randomMax(1000);
+ txSize = RandomUtil.randomMax(100);
txID = JournalSoakTest.idGen.generateID();
ids = new long[txSize];
}
@@ -280,11 +339,13 @@
public void done()
{
+ numberOfUpdates.addAndGet(ids.length);
try
{
for (long id : ids)
{
- journal.appendDeleteRecord(id, true);
+ journal.appendDeleteRecord(id, false);
+ numberOfDeletes.incrementAndGet();
}
}
catch (Exception e)
@@ -314,25 +375,23 @@
{
while (running)
{
- long ids[] = new long[1000];
+ long ids[] = new long[5];
// Append
- for (int i = 0; running & i < 1000; i++)
+ for (int i = 0; running & i < ids.length; i++)
{
+ System.out.println("append slow");
ids[i] = JournalSoakTest.idGen.generateID();
journal.appendAddRecord(ids[i], (byte)1, generateRecord(), true);
- Thread.sleep(10);
+ numberOfRecords.incrementAndGet();
+
+ Thread.sleep(TimeUnit.SECONDS.toMillis(50));
}
- // Update
- for (int i = 0; running & i < 1000; i++)
- {
- journal.appendUpdateRecord(ids[i], (byte)1, generateRecord(), true);
- Thread.sleep(10);
- }
// Delete
- for (int i = 0; running & i < 1000; i++)
+ for (int i = 0; running & i < ids.length; i++)
{
- journal.appendDeleteRecord(ids[i], true);
- Thread.sleep(10);
+ System.out.println("Deleting");
+ journal.appendDeleteRecord(ids[i], false);
+ numberOfDeletes.incrementAndGet();
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-07-30 08:03:33 UTC (rev 9487)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-07-30 15:00:26 UTC (rev 9488)
@@ -711,6 +711,8 @@
private void setupPosNeg(final int fileNumber, final int pos, final int... neg)
{
JournalFile file = files[fileNumber];
+
+ int totalDep = file.getTotalNegativeToOthers();
for (int i = 0; i < pos; i++)
{
@@ -724,8 +726,11 @@
for (int j = 0; j < neg[i]; j++)
{
file.incNegCount(reclaimable2);
+ totalDep++;
}
}
+
+ assertEquals(totalDep, file.getTotalNegativeToOthers());
}
private void debugFiles()
@@ -777,6 +782,8 @@
private boolean canDelete;
private boolean needCleanup;
+
+ private int totalDep;
public void extendOffset(final int delta)
{
@@ -822,6 +829,8 @@
int c = count == null ? 1 : count.intValue() + 1;
negCounts.put(file, c);
+
+ totalDep++;
}
public int getPosCount()
@@ -985,5 +994,13 @@
{
return 0;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getTotalNegativeToOthers()
+ */
+ public int getTotalNegativeToOthers()
+ {
+ return totalDep;
+ }
}
}
More information about the hornetq-commits
mailing list