[hornetq-commits] JBoss hornetq SVN: r9552 - in trunk: src/main/org/hornetq/core/journal/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Aug 16 19:52:29 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-16 19:52:29 -0400 (Mon, 16 Aug 2010)
New Revision: 9552
Modified:
trunk/src/main/org/hornetq/core/journal/TestableJournal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - another fix
Modified: trunk/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/src/main/org/hornetq/core/journal/TestableJournal.java 2010-08-16 23:52:29 UTC (rev 9552)
@@ -49,6 +49,8 @@
void forceMoveNextFile() throws Exception;
+ void forceMoveNextFile(boolean synchronous) throws Exception;
+
void setAutoReclaim(boolean autoReclaim);
boolean isAutoReclaim();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-16 23:52:29 UTC (rev 9552)
@@ -23,8 +23,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,6 +33,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -105,7 +104,7 @@
// Journal
private static final void trace(final String message)
{
- log.trace(message);
+ log.info(message);
}
// The sizes of primitive types
@@ -191,7 +190,7 @@
private final LinkedBlockingDeque<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
- private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
+ private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
@@ -651,11 +650,13 @@
// checkSize by some sort of calculated hash)
if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- JournalImpl.trace("Record at position " + pos +
+ JournalImpl.trace("Record at position " + pos +
" recordType = " +
recordType +
- " possible transactionID = " + transactionID +
- " possible recordID = " + recordID +
+ " possible transactionID = " +
+ transactionID +
+ " possible recordID = " +
+ recordID +
" file:" +
file.getFile().getFileName() +
" is corrupted and it is being ignored (III)");
@@ -763,7 +764,7 @@
catch (Throwable e)
{
log.warn(e.getMessage(), e);
- throw new Exception (e.getMessage(), e);
+ throw new Exception(e.getMessage(), e);
}
finally
{
@@ -828,7 +829,11 @@
{
if (JournalImpl.LOAD_TRACE)
{
- JournalImpl.trace("appendAddRecord id = " + id + ", recordType = " + recordType + " compacting = " + (compactor != null));
+ JournalImpl.trace("appendAddRecord id = " + id +
+ ", recordType = " +
+ recordType +
+ " compacting = " +
+ (compactor != null));
}
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
@@ -969,12 +974,11 @@
}
compactingLock.readLock().lock();
-
try
{
JournalRecord record = null;
-
+
if (compactor == null)
{
record = records.remove(id);
@@ -1049,12 +1053,13 @@
{
if (JournalImpl.LOAD_TRACE)
{
- JournalImpl.trace("appendAddRecordTransactional txID " + txID +
+ JournalImpl.trace("appendAddRecordTransactional txID " + txID +
", id = " +
id +
", recordType = " +
recordType +
- ", compacting " + (this.compactor != null));
+ ", compacting " +
+ (this.compactor != null));
}
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
@@ -1107,7 +1112,9 @@
", id = " +
id +
", recordType = " +
- recordType + ", compacting = " + (compactor != null));
+ recordType +
+ ", compacting = " +
+ (compactor != null));
}
JournalInternalRecord updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
@@ -1462,7 +1469,7 @@
private void checkDeleteSize()
{
- // HORNETQ-482 - Flush deletes only if memory is critical
+ // HORNETQ-482 - Flush deletes only if memory is critical
if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
{
log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
@@ -1480,7 +1487,7 @@
}
recordsToDelete.clear();
-
+
log.debug("flush delete done");
}
}
@@ -1568,6 +1575,8 @@
return;
}
+ onCompactLock();
+
setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives and positives counts
@@ -1612,7 +1621,15 @@
// well
for (final JournalFile file : dataFilesToProcess)
{
- JournalImpl.readJournalFile(fileFactory, file, compactor);
+ try
+ {
+ JournalImpl.readJournalFile(fileFactory, file, compactor);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on reading compacting for " + file);
+ throw new Exception("Error on reading compacting for " + file, e);
+ }
}
compactor.flush();
@@ -1634,6 +1651,8 @@
// Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
compactor = null;
+ onCompactLock();
+
newDatafiles = localCompactor.getNewDataFiles();
// Restore newRecords created during compacting
@@ -1664,7 +1683,7 @@
{
newTransaction.replaceRecordProvider(this);
}
-
+
localCompactor.replayPendingCommands();
// Merge transactions back after compacting
@@ -2300,13 +2319,11 @@
return;
}
- compactingLock.readLock().lock();
-
try
{
JournalCleaner cleaner = null;
ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
- lockAppend.lock();
+ compactingLock.writeLock().lock();
try
{
@@ -2345,11 +2362,21 @@
}
finally
{
- lockAppend.unlock();
+ compactingLock.writeLock().unlock();
}
- JournalImpl.readJournalFile(fileFactory, file, cleaner);
+ compactingLock.readLock().lock();
+ try
+ {
+ JournalImpl.readJournalFile(fileFactory, file, cleaner);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error reading cleanup on " + file, e);
+ throw new Exception("Error reading cleanup on " + file, e);
+ }
+
cleaner.flush();
// pointcut for tests
@@ -2379,6 +2406,11 @@
controlFile.delete();
final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
+
+ if (trace)
+ {
+ trace("Adding free file back from cleanup" + retJournalfile);
+ }
filesExecutor.execute(new Runnable()
{
@@ -2616,14 +2648,20 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
+ forceMoveNextFile(true);
+ }
+
+ // In some tests we need to force the journal to move to a next file
+ public void forceMoveNextFile(final boolean synchronous) throws Exception
+ {
compactingLock.readLock().lock();
try
{
lockAppend.lock();
try
{
- moveNextFile(true);
- if (autoReclaim)
+ moveNextFile(synchronous);
+ if (autoReclaim && !synchronous)
{
checkReclaimStatus();
}
@@ -2660,10 +2698,24 @@
throw new IllegalStateException("Journal is not stopped");
}
- filesExecutor = Executors.newSingleThreadExecutor();
+ filesExecutor = Executors.newSingleThreadExecutor(new ThreadFactory()
+ {
- compactorExecutor = Executors.newCachedThreadPool();
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "JournalImpl::FilesExecutor");
+ }
+ });
+ compactorExecutor = Executors.newCachedThreadPool(new ThreadFactory()
+ {
+
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "JournalImpl::CompactorExecutor");
+ }
+ });
+
fileFactory.start();
state = JournalImpl.STATE_STARTED;
@@ -2818,6 +2870,12 @@
{
}
+ /** This is an interception point for testcases, when the compacted files are written, to be called
+ * as soon as the compactor gets a writeLock */
+ protected void onCompactLock() throws Exception
+ {
+ }
+
/** This is an interception point for testcases, when the compacted files are written, before replacing the data structures */
protected void onCompactDone()
{
@@ -2843,6 +2901,11 @@
{
// Re-initialise it
+ if (trace)
+ {
+ trace("Adding free file " + file);
+ }
+
JournalFile jf = reinitializeFile(file);
if (renameTmp)
@@ -3182,6 +3245,11 @@
sequentialFile.close();
+ if (JournalImpl.trace)
+ {
+ JournalImpl.trace("Renaming file " + tmpFileName + " as " + fileName);
+ }
+
sequentialFile.renameTo(fileName);
if (keepOpened)
@@ -3289,7 +3357,7 @@
filesExecutor.execute(run);
}
- if (autoReclaim && !synchronous)
+ if (!synchronous)
{
scheduleReclaim();
}
@@ -3320,26 +3388,28 @@
{
return;
}
-
- filesExecutor.execute(new Runnable()
+
+ if (autoReclaim && !compactorRunning.get())
{
- public void run()
+ filesExecutor.execute(new Runnable()
{
- try
+ public void run()
{
- drainClosedFiles();
-
- if (!checkReclaimStatus())
+ try
{
- checkCompact();
+ drainClosedFiles();
+ if (!checkReclaimStatus())
+ {
+ checkCompact();
+ }
}
+ catch (Exception e)
+ {
+ JournalImpl.log.error(e.getMessage(), e);
+ }
}
- catch (Exception e)
- {
- JournalImpl.log.error(e.getMessage(), e);
- }
- }
- });
+ });
+ }
}
/**
@@ -3368,25 +3438,21 @@
final boolean tmpCompactExtension) throws Exception
{
JournalFile nextOpenedFile = null;
- try
+
+ nextOpenedFile = freeFiles.poll();
+
+ if (nextOpenedFile == null)
{
- nextOpenedFile = freeFiles.remove();
+ nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ }
+ else
+ {
if (tmpCompactExtension)
{
SequentialFile sequentialFile = nextOpenedFile.getFile();
sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
}
- }
- catch (NoSuchElementException ignored)
- {
- }
- if (nextOpenedFile == null)
- {
- nextOpenedFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
- }
- else
- {
if (keepOpened)
{
openFile(nextOpenedFile, multiAIO);
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-16 14:31:38 UTC (rev 9551)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/JournalCleanupCompactStressTest.java 2010-08-16 23:52:29 UTC (rev 9552)
@@ -121,6 +121,13 @@
"hq",
maxAIO)
{
+ protected void onCompactLock() throws Exception
+ {
+ System.out.println("OnCompactLock");
+ journal.forceMoveNextFile(false);
+ System.out.println("OnCompactLock done");
+ }
+
protected void onCompactStart() throws Exception
{
testExecutor.execute(new Runnable()
@@ -134,7 +141,7 @@
{
long id = idGen.generateID();
journal.appendAddRecord(id, (byte)0, new byte[] { 1, 2, 3 }, false);
- journal.forceMoveNextFile();
+ journal.forceMoveNextFile(false);
journal.appendDeleteRecord(id, id == 20);
}
System.out.println("OnCompactStart leave");
@@ -176,7 +183,7 @@
protected long getTotalTimeMilliseconds()
{
- return TimeUnit.MINUTES.toMillis(10);
+ return TimeUnit.MINUTES.toMillis(1);
}
public void testAppend() throws Exception
More information about the hornetq-commits
mailing list