Author: clebert.suconic(a)jboss.com
Date: 2011-06-05 23:12:33 -0400 (Sun, 05 Jun 2011)
New Revision: 10774
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
HORNETQ-714 - fixing out of ordering issue after compacting
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-06-03
15:21:19 UTC (rev 10773)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-06-06
03:12:33 UTC (rev 10774)
@@ -42,6 +42,10 @@
private static final Logger log = Logger.getLogger(JournalFilesRepository.class);
private static final boolean trace = JournalFilesRepository.log.isTraceEnabled();
+
+ // Used to debug the consistency of the journal ordering.
+ // This is meant to be false as these extra checks would cause performance issues
+ private static final boolean CHECK_CONSISTENCE = false;
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
@@ -56,6 +60,8 @@
// Attributes ----------------------------------------------------
private final SequentialFileFactory fileFactory;
+
+ private final JournalImpl journal;
private final BlockingDeque<JournalFile> dataFiles = new
LinkedBlockingDeque<JournalFile>();
@@ -78,12 +84,30 @@
private final int userVersion;
private Executor openFilesExecutor;
+
+ private Runnable pushOpenRunnable = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ pushOpenedFile();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.error(e.getMessage(), e);
+ }
+ }
+ };
+
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
+
public JournalFilesRepository(final SequentialFileFactory fileFactory,
+ final JournalImpl journal,
final String filePrefix,
final String fileExtension,
final int userVersion,
@@ -98,6 +122,7 @@
this.minFiles = minFiles;
this.fileSize = fileSize;
this.userVersion = userVersion;
+ this.journal = journal;
}
// Public --------------------------------------------------------
@@ -233,11 +258,96 @@
public void addDataFileOnTop(final JournalFile file)
{
dataFiles.addFirst(file);
+
+ if (CHECK_CONSISTENCE)
+ {
+ checkDataFiles();
+ }
}
+
+ public String debugFiles()
+ {
+ StringBuffer buffer = new StringBuffer();
+
+ buffer.append("**********\nCurrent File = " + journal.getCurrentFile() +
"\n");
+ buffer.append("**********\nDataFiles:\n");
+ for (JournalFile file : dataFiles)
+ {
+ buffer.append(file.toString() + "\n");
+ }
+ buffer.append("*********\nFreeFiles:\n");
+ for (JournalFile file : freeFiles)
+ {
+ buffer.append(file.toString() + "\n");
+ }
+ return buffer.toString();
+ }
+
+ public synchronized void checkDataFiles()
+ {
+ long seq = -1;
+ for (JournalFile file : dataFiles)
+ {
+ if (file.getFileID() <= seq)
+ {
+ log.info("CheckDataFiles:");
+ log.info(debugFiles());
+ log.info("Sequence out of order on journal");
+ System.exit(-1);
+ }
+
+ if (journal.getCurrentFile() != null &&
journal.getCurrentFile().getFileID() <= file.getFileID())
+ {
+ log.info("CheckDataFiles:");
+ log.info(debugFiles());
+ log.info("CurrentFile on the journal is <= the sequence
file.getFileID=" + file.getFileID() + " on the dataFiles");
+ log.info("Currentfile.getFileId=" +
journal.getCurrentFile().getFileID() + " while the file.getFileID()=" +
file.getFileID());
+ log.info("IsSame = (" + (journal.getCurrentFile() == file) +
")");
+
+ // throw new RuntimeException ("Check failure!");
+ }
+
+ if (journal.getCurrentFile() == file)
+ {
+ throw new RuntimeException ("Check failure! Current file listed as data
file!");
+ }
+
+ seq = file.getFileID();
+ }
+
+ long lastFreeId = -1;
+ for (JournalFile file : freeFiles)
+ {
+ if (file.getFileID() <= lastFreeId)
+ {
+ log.info("CheckDataFiles:");
+ log.info(debugFiles());
+ log.info("FreeFileID out of order ");
+
+ throw new RuntimeException ("Check failure!");
+ }
+
+ lastFreeId= file.getFileID();
+
+ if (file.getFileID() < seq)
+ {
+ log.info("CheckDataFiles:");
+ log.info(debugFiles());
+ log.info("A FreeFile is less then the maximum data");
+
+ // throw new RuntimeException ("Check failure!");
+ }
+ }
+ }
public void addDataFileOnBottom(final JournalFile file)
{
dataFiles.add(file);
+
+ if (CHECK_CONSISTENCE)
+ {
+ checkDataFiles();
+ }
}
// Free File Operations ==========================================
@@ -254,6 +364,11 @@
public void addFreeFileNoInit(final JournalFile file)
{
freeFiles.add(file);
+
+ if (CHECK_CONSISTENCE)
+ {
+ checkDataFiles();
+ }
}
/**
@@ -302,6 +417,11 @@
{
file.getFile().delete();
}
+
+ if (CHECK_CONSISTENCE)
+ {
+ checkDataFiles();
+ }
}
public Collection<JournalFile> getFreeFiles()
@@ -333,28 +453,13 @@
JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size="
+ openedFiles.size());
}
- Runnable run = new Runnable()
- {
- public void run()
- {
- try
- {
- pushOpenedFile();
- }
- catch (Exception e)
- {
- JournalFilesRepository.log.error(e.getMessage(), e);
- }
- }
- };
-
if (openFilesExecutor == null)
{
- run.run();
+ pushOpenRunnable.run();
}
else
{
- openFilesExecutor.execute(run);
+ openFilesExecutor.execute(pushOpenRunnable);
}
JournalFile nextFile = null;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-06-03
15:21:19 UTC (rev 10773)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-06-06
03:12:33 UTC (rev 10774)
@@ -292,6 +292,7 @@
this.fileFactory = fileFactory;
filesRepository = new JournalFilesRepository(fileFactory,
+ this,
filePrefix,
fileExtension,
userVersion,
@@ -1484,7 +1485,7 @@
public synchronized JournalLoadInformation load(final List<RecordInfo>
committedRecords,
final
List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback
failureCallback,
- final boolean fixBadTX) throws
Exception
+ final boolean changeData) throws
Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
// ArrayList was taking too long to delete elements on checkDeleteSize
@@ -1554,7 +1555,7 @@
failureCallback.failedTransaction(transactionID, records,
recordsToDelete);
}
}
- }, fixBadTX);
+ }, changeData);
for (RecordInfo record : records)
{
@@ -1640,16 +1641,8 @@
try
{
- if (JournalImpl.trace)
- {
- JournalImpl.trace("Starting compacting operation on journal");
- }
+ log.debug("Starting compacting operation on journal");
- if (JournalImpl.TRACE_RECORDS)
- {
- JournalImpl.traceRecord("Starting compacting operation on
journal");
- }
-
onCompactStart();
// We need to guarantee that the journal is frozen for this short time
@@ -1806,16 +1799,8 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- if (JournalImpl.trace)
- {
- trace("Finished compacting on journal");
- }
-
- if (JournalImpl.TRACE_RECORDS)
- {
- JournalImpl.traceRecord("Finished compacting on journal");
- }
-
+ log.debug("Finished compacting on journal");
+
}
finally
{
@@ -1879,7 +1864,7 @@
return load(loadManager, true);
}
- public synchronized JournalLoadInformation load(final LoaderCallback loadManager,
boolean fixFailingTransactions) throws Exception
+ public synchronized JournalLoadInformation load(final LoaderCallback loadManager,
final boolean changeData) throws Exception
{
if (state != JournalImpl.STATE_STARTED)
{
@@ -2167,8 +2152,11 @@
}
else
{
- // Empty dataFiles with no data
- filesRepository.addFreeFileNoInit(file);
+ if (changeData)
+ {
+ // Empty dataFiles with no data
+ filesRepository.addFreeFile(file, false);
+ }
}
}
@@ -2206,7 +2194,7 @@
JournalImpl.log.warn("Uncommitted transaction with id " +
transaction.transactionID +
" found and discarded");
- if (fixFailingTransactions)
+ if (changeData)
{
// I append a rollback record here, because otherwise compacting will be
throwing messages because of unknown transactions
this.appendRollbackRecord(transaction.transactionID, false);
@@ -2998,7 +2986,7 @@
if (JournalImpl.trace)
{
- JournalImpl.trace("moveNextFile: " + currentFile);
+ log.trace("moveNextFile: " + currentFile);
}
fileFactory.activateBuffer(currentFile.getFile());
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-06-03
15:21:19 UTC (rev 10773)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-06-06
03:12:33 UTC (rev 10774)
@@ -910,11 +910,13 @@
if (message == null)
{
- throw new IllegalStateException("Cannot find message " +
record.id);
+ log.error("Cannot find message " + record.id);
}
+ else
+ {
+ queueMessages.put(messageID, new AddMessageRecord(message));
+ }
- queueMessages.put(messageID, new AddMessageRecord(message));
-
break;
}
case ACKNOWLEDGE_REF:
@@ -929,14 +931,16 @@
if (queueMessages == null)
{
- throw new IllegalStateException("Cannot find queue messages "
+ encoding.queueID);
+ log.error("Cannot find queue messages for queueID=" +
encoding.queueID + " on ack for messageID=" + messageID);
}
-
- AddMessageRecord rec = queueMessages.remove(messageID);
-
- if (rec == null)
+ else
{
- throw new IllegalStateException("Cannot find message " +
messageID);
+ AddMessageRecord rec = queueMessages.remove(messageID);
+
+ if (rec == null)
+ {
+ log.error("Cannot find message " + messageID);
+ }
}
break;
@@ -1008,18 +1012,23 @@
if (queueMessages == null)
{
- throw new IllegalStateException("Cannot find queue messages "
+ encoding.queueID);
+ log.error("Cannot find queue messages " + encoding.queueID +
" for message " + messageID + " while processing scheduled
messages");
}
-
- AddMessageRecord rec = queueMessages.get(messageID);
-
- if (rec == null)
+ else
{
- throw new IllegalStateException("Cannot find message " +
messageID);
+
+ AddMessageRecord rec = queueMessages.get(messageID);
+
+ if (rec == null)
+ {
+ log.error("Cannot find message " + messageID);
+ }
+ else
+ {
+ rec.scheduledDeliveryTime = encoding.scheduledDeliveryTime;
+ }
}
- rec.scheduledDeliveryTime = encoding.scheduledDeliveryTime;
-
break;
}
case DUPLICATE_ID:
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-03
15:21:19 UTC (rev 10773)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-06
03:12:33 UTC (rev 10774)
@@ -3677,11 +3677,24 @@
}
}
+ public void testLoop() throws Exception
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ log.warn("#test " + i);
+ testDLAOnLargeMessageAndPaging();
+ tearDown();
+ setUp();
+ }
+
+ }
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
Configuration config = createDefaultConfig();
+ config.setThreadPoolMaxSize(5);
config.setJournalSyncNonTransactional(false);
@@ -3776,12 +3789,14 @@
assertNotNull("Message " + i + " wasn't received",
message);
message.acknowledge();
+ final AtomicInteger bytesOutput = new AtomicInteger(0);
+
message.setOutputStream(new OutputStream()
{
@Override
public void write(int b) throws IOException
{
-
+ bytesOutput.incrementAndGet();
}
});
@@ -3795,8 +3810,10 @@
}
catch (Throwable e)
{
+ log.info("output bytes = " + bytesOutput);
log.info(threadDump("dump"));
- fail("Couldn't finish large message receiving for id=" +
message.getStringProperty("id") + " with messageID=" +
message.getMessageID());
+ fail("Couldn't finish large message receiving for id=" +
+ message.getStringProperty("id") + " with
messageID=" + message.getMessageID());
}
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-06-03
15:21:19 UTC (rev 10773)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-06-06
03:12:33 UTC (rev 10774)
@@ -779,8 +779,10 @@
}
}
}
+
+ long lastId = idGenerator.generateID();
- add(idGenerator.generateID());
+ add(lastId);
if (createControlFile && deleteControlFile &&
renameFilesAfterCompacting)
{
@@ -791,6 +793,15 @@
createJournal();
startJournal();
loadAndCheck();
+
+ journal.forceMoveNextFile();
+ update(lastId);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
}