[hornetq-commits] JBoss hornetq SVN: r9471 - in trunk: tests/src/org/hornetq/tests/soak/journal and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Jul 27 15:14:33 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-07-27 15:14:32 -0400 (Tue, 27 Jul 2010)
New Revision: 9471
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - Making sure freed files during compacting won't go out of order + adding a few methods to help debug the journal.
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -161,7 +161,7 @@
{
try
{
- return "JournalFileImpl: " + file.getFileName();
+ return "JournalFileImpl: (" + file.getFileName() + " id = " + this.fileID + ", recordID = " + recordID + ")";
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -13,6 +13,7 @@
package org.hornetq.core.journal.impl;
+import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -94,7 +95,7 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = JournalImpl.log.isTraceEnabled();
+ private static final boolean trace = log.isTraceEnabled();
/** This is to be set to true at DEBUG & development only */
private static final boolean LOAD_TRACE = false;
@@ -104,7 +105,7 @@
// Journal
private static final void trace(final String message)
{
- JournalImpl.log.trace(message);
+ log.trace(message);
}
// The sizes of primitive types
@@ -385,6 +386,142 @@
return compactor;
}
+
+
+ /** this method is used internally only however tools may use it for maintenance.
+ * It won't be part of the interface as the tools should be specific to the implementation */
+ public List<JournalFile> orderFiles() throws Exception
+ {
+ List<String> fileNames = fileFactory.listFiles(fileExtension);
+
+ List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
+
+ for (String fileName : fileNames)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
+
+ file.open(1, false);
+
+ try
+ {
+ long fileID = readFileHeader(file);
+
+ orderedFiles.add(new JournalFileImpl(file, fileID));
+ }
+ finally
+ {
+ file.close();
+ }
+ }
+
+ // Now order them by ordering id - we can't use the file name for ordering
+ // since we can re-use dataFiles
+
+ Collections.sort(orderedFiles, new JournalFileComparator());
+
+ return orderedFiles;
+ }
+
+ private void calculateNextfileID(List<JournalFile> files)
+ {
+
+ for (JournalFile file : files)
+ {
+ long fileID = file.getFileID();
+ if (nextFileID.get() < fileID)
+ {
+ nextFileID.set(fileID);
+ }
+
+ long fileNameID = getFileNameID(file.getFile().getFileName());
+
+ // The compactor could create a fileName but use a previously assigned ID.
+ // Because of that we need to take both parts into account
+ if (nextFileID.get() < fileNameID)
+ {
+ nextFileID.set(fileNameID);
+ }
+ }
+
+
+ }
+
+
+ /**
+ * @param fileFactory
+ * @param journal
+ * @throws Exception
+ */
+ public static void listJournalFiles(final PrintStream out, final JournalImpl journal) throws Exception
+ {
+ List<JournalFile> files = journal.orderFiles();
+
+ SequentialFileFactory fileFactory = journal.fileFactory;
+
+ for (JournalFile file : files)
+ {
+ out.println("####### listing file " + file.getFile().getFileName() +
+ " sequence = " +
+ file.getFileID());
+
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ {
+
+ public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("ReadUpdateTX, txID=" + transactionID + ", " + recordInfo);
+ }
+
+ public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("ReadUpdate " + recordInfo);
+ }
+
+ public void onReadRollbackRecord(long transactionID) throws Exception
+ {
+ out.println("Rollback txID=" + transactionID);
+ }
+
+ public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ {
+ out.println("Prepare txID=" + transactionID);
+ }
+
+ public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("DeleteRecordTX txID=" + transactionID + ", " + recordInfo);
+ }
+
+ public void onReadDeleteRecord(long recordID) throws Exception
+ {
+ out.println("DeleteRecord id=" + recordID);
+ }
+
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ out.println("CommitRecord txID=" + transactionID);
+ }
+
+ public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ {
+ out.println("AddRecordTX, txID=" + transactionID + ", " + recordInfo);
+ }
+
+ public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ {
+ out.println("AddRecord " + recordInfo);
+ }
+
+ public void markAsDataFile(JournalFile file)
+ {
+ }
+ });
+ }
+ }
+
+
+
+ /** this method is used internally only however tools may use it for maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
final JournalReaderCallback reader) throws Exception
@@ -703,7 +840,11 @@
return lastDataPos;
}
-
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ throw new Exception (e.getMessage(), e);
+ }
finally
{
if (wholeFileBuffer != null)
@@ -1476,7 +1617,10 @@
try
{
- JournalImpl.trace("Starting compacting operation on journal");
+ if (trace)
+ {
+ JournalImpl.trace("Starting compacting operation on journal");
+ }
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
@@ -1579,7 +1723,10 @@
dataFiles.addFirst(fileToAdd);
}
- JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+ if (trace)
+ {
+ JournalImpl.trace("There are " + dataFiles.size() + " datafiles Now");
+ }
// Replay pending commands (including updates, deletes and commits)
@@ -1616,7 +1763,10 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- JournalImpl.trace("Finished compacting on journal");
+ if (trace)
+ {
+ JournalImpl.trace("Finished compacting on journal");
+ }
}
finally
@@ -1698,6 +1848,8 @@
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
+
+ calculateNextfileID(orderedFiles);
int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -2646,10 +2798,26 @@
/** being protected as testcases can override this method */
protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
{
- for (JournalFile file : oldFiles)
+
+ // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong order
+ filesExecutor.execute(new Runnable()
{
- addFreeFile(file);
- }
+ public void run()
+ {
+ for (JournalFile file : oldFiles)
+ {
+ try
+ {
+ addFreeFile(file);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error reinitializing file " + file, e);
+ }
+ }
+ }
+ });
+
for (JournalFile file : newFiles)
{
@@ -2795,52 +2963,6 @@
return recordSize;
}
- private List<JournalFile> orderFiles() throws Exception
- {
- List<String> fileNames = fileFactory.listFiles(fileExtension);
-
- List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
-
- for (String fileName : fileNames)
- {
- SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
-
- file.open(1, false);
-
- try
- {
- long fileID = readFileHeader(file);
-
- if (nextFileID.get() < fileID)
- {
- nextFileID.set(fileID);
- }
-
- long fileNameID = getFileNameID(fileName);
-
- // The compactor could create a fileName but use a previously assigned ID.
- // Because of that we need to take both parts into account
- if (nextFileID.get() < fileNameID)
- {
- nextFileID.set(fileNameID);
- }
-
- orderedFiles.add(new JournalFileImpl(file, fileID));
- }
- finally
- {
- file.close();
- }
- }
-
- // Now order them by ordering id - we can't use the file name for ordering
- // since we can re-use dataFiles
-
- Collections.sort(orderedFiles, new JournalFileComparator());
-
- return orderedFiles;
- }
-
/**
* @param file
* @return
@@ -3117,7 +3239,7 @@
if (JournalImpl.trace)
{
- JournalImpl.trace("moveNextFile: " + currentFile.getFile().getFileName() + " sync: " + synchronous);
+ JournalImpl.trace("moveNextFile: " + currentFile + " sync: " + synchronous);
}
fileFactory.activateBuffer(currentFile.getFile());
@@ -3175,6 +3297,11 @@
new Exception("Warning: Couldn't open a file in 60 Seconds"));
}
}
+
+ if (trace)
+ {
+ JournalImpl.trace("Returning file " + nextFile);
+ }
return nextFile;
}
@@ -3212,6 +3339,11 @@
private void pushOpenedFile() throws Exception
{
JournalFile nextOpenedFile = getFile(true, true, true, false);
+
+ if (trace)
+ {
+ JournalImpl.trace("pushing openFile " + nextOpenedFile);
+ }
openedFiles.offer(nextOpenedFile);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -320,6 +320,7 @@
// Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
// Using bufferToFlush.put(buffer) would make several append calls for each byte
+ // We also transfer the content of this buffer to the native file's buffer
bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
@@ -348,6 +349,7 @@
pendingSync = false;
+ // swap the instance as the previous callback list is being used asynchronously
callbacks = new LinkedList<IOAsyncTask>();
buffer.clear();
Modified: trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/tests/src/org/hornetq/tests/soak/journal/SoakJournal.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -118,7 +118,7 @@
updaters[i].start();
}
- Thread.sleep(3600000000l);
+ Thread.sleep(TimeUnit.HOURS.toMillis(24));
running = false;
Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2010-07-27 15:54:03 UTC (rev 9470)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2010-07-27 19:14:32 UTC (rev 9471)
@@ -13,19 +13,21 @@
package org.hornetq.tests.util;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.util.ArrayList;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
/**
* Lists the journal content for debug purposes.
*
* This is just a class useful on debug during development,
- * listing journal contents (As we don't have access to SQL on Journal :-) ).
*
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*
@@ -52,15 +54,19 @@
{
FileConfiguration fileConf = new FileConfiguration();
+ fileConf.setJournalDirectory("/work/projects/trunk/journal");
+
// fileConf.setConfigurationUrl(arg[0]);
fileConf.start();
+ SequentialFileFactory fileFactory = new AIOSequentialFileFactory(fileConf.getJournalDirectory());
+
JournalImpl journal = new JournalImpl(fileConf.getJournalFileSize(),
- fileConf.getJournalMinFiles(),
+ 10,
0,
0,
- new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
+ fileFactory,
"hornetq-data",
"hq",
fileConf.getJournalMaxIO_NIO());
@@ -69,18 +75,31 @@
ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
journal.start();
+
+
+ PrintStream out = new PrintStream(new FileOutputStream("/tmp/file.out"));
+
+ out.println("######### Journal records per file");
+
+ JournalImpl.listJournalFiles(out, journal);
+
journal.load(records, prepared, null);
+
+ out.println();
+
+ out.println("##########################################");
+ out.println("# T O T A L L I S T #");
if (prepared.size() > 0)
{
- System.out.println("There are " + prepared.size() + " prepared transactions on the journal");
+ out.println("There are " + prepared.size() + " prepared transactions on the journal");
}
- System.out.println("Total of " + records.size() + " committed records");
+ out.println("Total of " + records.size() + " committed records");
for (RecordInfo record : records)
{
- System.out.println("user record: " + record);
+ out.println("user record: " + record);
}
journal.checkReclaimStatus();
@@ -89,6 +108,9 @@
journal.stop();
+ journal.stop();
+
+ out.close();
}
catch (Exception e)
{
More information about the hornetq-commits
mailing list