[hornetq-commits] JBoss hornetq SVN: r9532 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Aug 12 15:35:54 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-12 15:35:52 -0400 (Thu, 12 Aug 2010)
New Revision: 9532
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-482 Improving startup time on large journal files
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-12 19:35:52 UTC (rev 9532)
@@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -32,6 +33,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,7 +67,6 @@
import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
/**
*
@@ -385,8 +386,6 @@
return compactor;
}
-
-
/** this method is used internally only however tools may use it to maintenance.
* It won't be part of the interface as the tools should be specific to the implementation */
public List<JournalFile> orderFiles() throws Exception
@@ -400,11 +399,11 @@
SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
file.open(1, false);
-
+
try
{
long fileID = readFileHeader(file);
-
+
orderedFiles.add(new JournalFileImpl(file, fileID));
}
finally
@@ -420,10 +419,10 @@
return orderedFiles;
}
-
+
private void calculateNextfileID(List<JournalFile> files)
{
-
+
for (JournalFile file : files)
{
long fileID = file.getFileID();
@@ -431,9 +430,9 @@
{
nextFileID.set(fileID);
}
-
+
long fileNameID = getFileNameID(file.getFile().getFileName());
-
+
// The compactor could create a fileName but use a previously assigned ID.
// Because of that we need to take both parts into account
if (nextFileID.get() < fileNameID)
@@ -442,14 +441,8 @@
}
}
-
}
-
-
-
-
-
/** this method is used internally only however tools may use it to maintenance. */
public static int readJournalFile(final SequentialFileFactory fileFactory,
final JournalFile file,
@@ -502,7 +495,6 @@
// This is what supports us from not re-filling the whole file
int readFileId = wholeFileBuffer.getInt();
-
// This record is from a previous file-usage. The file was
// reused and we need to ignore this record
if (readFileId != file.getRecordID())
@@ -511,7 +503,6 @@
continue;
}
-
long transactionID = 0;
if (JournalImpl.isTransaction(recordType))
@@ -570,10 +561,10 @@
{
if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1))
{
- wholeFileBuffer.position(pos +1);
+ wholeFileBuffer.position(pos + 1);
continue;
}
-
+
userRecordType = wholeFileBuffer.get();
}
@@ -585,8 +576,8 @@
record = new byte[variableSize];
- wholeFileBuffer.get(record);
- }
+ wholeFileBuffer.get(record);
+ }
// Case this is a transaction, this will contain the number of pendingTransactions on a transaction, at the
// currentFile
@@ -660,7 +651,7 @@
// checkSize by some sort of calculated hash)
if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- JournalImpl.trace("Record at position " + pos +
+ JournalImpl.trace("Record at position " + pos +
" recordType = " +
recordType +
" possible transactionID = " + transactionID +
@@ -1058,7 +1049,7 @@
{
if (JournalImpl.LOAD_TRACE)
{
- JournalImpl.trace("appendAddRecordTransactional txID " + txID +
+ JournalImpl.trace("appendAddRecordTransactional txID " + txID +
", id = " +
id +
", recordType = " +
@@ -1326,7 +1317,6 @@
JournalImpl.trace("appendCommitRecord txID " + txID + ", compacting = " + (compactor != null));
}
-
if (tx == null)
{
throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -1461,48 +1451,60 @@
final TransactionFailureCallback failureCallback) throws Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
- final List<RecordInfo> records = new ArrayList<RecordInfo>();
+ // ArrayList was taking too long to delete elements on checkDeleteSize
+ final List<RecordInfo> records = new LinkedList<RecordInfo>();
final int DELETE_FLUSH = 20000;
JournalLoadInformation info = load(new LoaderCallback()
{
+ Runtime runtime = Runtime.getRuntime();
+
+ private void checkDeleteSize()
+ {
+ // HORNETQ-482 - Flush deletes only if memory is critical
+ if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() > (runtime.maxMemory() * 0.8)))
+ {
+ log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+ // Clean up when the list is too large, or it won't be possible to load large sets of files
+ // Done as part of JBMESSAGING-1678
+ Iterator<RecordInfo> iter = records.iterator();
+ while (iter.hasNext())
+ {
+ RecordInfo record = iter.next();
+
+ if (recordsToDelete.contains(record.id))
+ {
+ iter.remove();
+ }
+ }
+
+ recordsToDelete.clear();
+ }
+ }
+
public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
{
preparedTransactions.add(preparedTransaction);
+ checkDeleteSize();
}
public void addRecord(final RecordInfo info)
{
records.add(info);
+ checkDeleteSize();
}
public void updateRecord(final RecordInfo info)
{
records.add(info);
+ checkDeleteSize();
}
public void deleteRecord(final long id)
{
recordsToDelete.add(id);
-
- // Clean up when the list is too large, or it won't be possible to load large sets of files
- // Done as part of JBMESSAGING-1678
- if (recordsToDelete.size() == DELETE_FLUSH)
- {
- Iterator<RecordInfo> iter = records.iterator();
- while (iter.hasNext())
- {
- RecordInfo record = iter.next();
-
- if (recordsToDelete.contains(record.id))
- {
- iter.remove();
- }
- }
-
- recordsToDelete.clear();
- }
+ checkDeleteSize();
}
public void failedTransaction(final long transactionID,
@@ -1551,7 +1553,7 @@
JournalImpl.trace("Starting compacting operation on journal");
}
JournalImpl.log.debug("Starting compacting operation on journal");
-
+
onCompactStart();
// We need to guarantee that the journal is frozen for this short time
@@ -1774,7 +1776,7 @@
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
final List<JournalFile> orderedFiles = orderFiles();
-
+
calculateNextfileID(orderedFiles);
int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -2290,7 +2292,7 @@
{
return;
}
-
+
compactingLock.readLock().lock();
try
@@ -2307,7 +2309,7 @@
JournalImpl.trace("Cleaning up file " + file);
}
JournalImpl.log.debug("Cleaning up file " + file);
-
+
if (file.getPosCount() == 0)
{
// nothing to be done
@@ -2327,7 +2329,7 @@
jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
}
}
-
+
currentFile.resetNegCount(file);
currentFile.incPosCount();
dependencies.add(currentFile);
@@ -2360,7 +2362,7 @@
SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
cleanedFileName));
-
+
SequentialFile returningFile = fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") + ".tmp");
@@ -2368,9 +2370,9 @@
tmpFile.renameTo(cleanedFileName);
controlFile.delete();
-
+
final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-
+
filesExecutor.execute(new Runnable()
{
public void run()
@@ -2395,7 +2397,7 @@
}
}
-
+
private boolean needsCompact() throws Exception
{
JournalFile[] dataFiles = getDataFiles();
@@ -2422,7 +2424,7 @@
// compacting is disabled
return;
}
-
+
if (state != JournalImpl.STATE_LOADED)
{
return;
@@ -2598,7 +2600,7 @@
{
return maxAIO;
}
-
+
public int getUserVersion()
{
return userVersion;
@@ -2675,14 +2677,14 @@
{
state = JournalImpl.STATE_STOPPED;
-
+
compactorExecutor.shutdown();
-
+
if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS))
{
JournalImpl.log.warn("Couldn't stop compactor executor after 120 seconds");
}
-
+
filesExecutor.shutdown();
if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
@@ -2754,13 +2756,14 @@
/** being protected as testcases can override this method */
protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
{
-
- // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong order
- // These files are already freed, and are described on the compactor file control.
+
+ // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong
+ // order
+ // These files are already freed, and are described on the compactor file control.
// In case of crash they will be cleared anyways
-
+
final CountDownLatch done = new CountDownLatch(1);
-
+
filesExecutor.execute(new Runnable()
{
public void run()
@@ -2773,13 +2776,13 @@
}
catch (Throwable e)
{
- log.warn("Error reinitializing file " + file, e);
+ log.warn("Error reinitializing file " + file, e);
}
}
done.countDown();
}
});
-
+
// need to wait all old files to be freed
// to avoid a race where the CTR file is deleted before the init for these files is already done
// what could cause a duplicate in case of a crash after the CTR is deleted and before the file is initialized
@@ -2812,7 +2815,7 @@
protected void onCompactDone()
{
}
-
+
// Private
// -----------------------------------------------------------------------------
@@ -2824,7 +2827,7 @@
{
if (file.getFile().size() != this.getFileSize())
{
- log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception ("trace"));
+ log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
file.getFile().delete();
}
else
@@ -2834,7 +2837,7 @@
// Re-initialise it
JournalFile jf = reinitializeFile(file);
-
+
if (renameTmp)
{
jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
@@ -2959,20 +2962,19 @@
file.read(bb);
int journalVersion = bb.getInt();
-
+
if (journalVersion != FORMAT_VERSION)
{
throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch");
}
-
-
+
int readUserVersion = bb.getInt();
-
+
if (readUserVersion != userVersion)
{
throw new HornetQException(HornetQException.IO_ERROR, "Journal data belong to a different version");
}
-
+
long fileID = bb.getLong();
fileFactory.releaseBuffer(bb);
@@ -2986,15 +2988,18 @@
* @param sequentialFile
* @throws Exception
*/
- public static int initFileHeader(final SequentialFileFactory fileFactory, final SequentialFile sequentialFile, final int userVersion, final long fileID) throws Exception
+ public static int initFileHeader(final SequentialFileFactory fileFactory,
+ final SequentialFile sequentialFile,
+ final int userVersion,
+ final long fileID) throws Exception
{
// We don't need to release buffers while writing.
ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
-
+
HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
writeHeader(buffer, userVersion, fileID);
-
+
bb.rewind();
int bufferSize = bb.limit();
@@ -3014,12 +3019,12 @@
public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
{
buffer.writeInt(FORMAT_VERSION);
-
+
buffer.writeInt(userVersion);
buffer.writeLong(fileID);
}
-
+
/**
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
@@ -3293,7 +3298,7 @@
new Exception("Warning: Couldn't open a file in 60 Seconds"));
}
}
-
+
if (trace)
{
JournalImpl.trace("Returning file " + nextFile);
@@ -3337,7 +3342,7 @@
private void pushOpenedFile() throws Exception
{
JournalFile nextOpenedFile = getFile(true, true, true, false);
-
+
if (trace)
{
JournalImpl.trace("pushing openFile " + nextOpenedFile);
@@ -3407,7 +3412,7 @@
}
}
-
+
private void drainClosedFiles()
{
JournalFile file;
@@ -3548,7 +3553,7 @@
else
{
final int position = bufferPos + size;
-
+
return position > fileSize || position < 0;
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-12 19:35:52 UTC (rev 9532)
@@ -16,12 +16,11 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -775,9 +774,6 @@
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
- // used to identify messages that are not referenced
- Set<Long> referencedMessages = new HashSet<Long>();
-
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
new LargeMessageTXFailureCallback(messages));
@@ -786,8 +782,19 @@
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
- for (RecordInfo record : records)
+ final int totalSize = records.size();
+
+ for (int reccount = 0 ; reccount < totalSize; reccount++)
{
+ // It will show log.info only with large journals (more than 1 million records)
+ if (reccount> 0 && reccount % 1000000 == 0)
+ {
+ long percent = (long)((((double)reccount) / ((double)totalSize)) * 100f);
+
+ log.info(percent + "% loaded");
+ }
+
+ RecordInfo record = records.get(reccount);
byte[] data = record.data;
HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
@@ -839,8 +846,6 @@
{
throw new IllegalStateException("Cannot find message " + record.id);
}
-
- referencedMessages.add(messageID);
queueMessages.put(messageID, new AddMessageRecord(message));
@@ -969,7 +974,16 @@
throw new IllegalStateException("Invalid record type " + recordType);
}
}
+
+ // This will free up memory sooner. The record is not needed any more
+ // and its byte array would consume memory during the load process even though it's not necessary any longer
+ // what would delay processing time during load
+ records.set(reccount, null);
}
+
+ // Release the memory as soon as not needed any longer
+ records.clear();
+ records = null;
for (Map.Entry<Long, Map<Long, AddMessageRecord>> entry : queueMap.entrySet())
{
@@ -978,8 +992,10 @@
Map<Long, AddMessageRecord> queueRecords = entry.getValue();
Queue queue = queues.get(queueID);
+
+ Collection<AddMessageRecord> valueRecords = queueRecords.values();
- for (AddMessageRecord record : queueRecords.values())
+ for (AddMessageRecord record : valueRecords)
{
long scheduledDeliveryTime = record.scheduledDeliveryTime;
@@ -1013,11 +1029,9 @@
for (ServerMessage msg : messages.values())
{
- if (!referencedMessages.contains(msg.getMessageID()))
+ if (msg.getRefCount() == 0)
{
log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
- // Something after routing could delete messages
- // So we ignore eventual ignores
try
{
deleteMessage(msg.getMessageID());
@@ -2128,6 +2142,8 @@
long scheduledDeliveryTime;
int deliveryCount;
+
+ boolean referenced = false;
}
private class LargeMessageTXFailureCallback implements TransactionFailureCallback
Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2010-08-12 19:35:52 UTC (rev 9532)
@@ -52,7 +52,7 @@
{
long deliveryTime = ref.getScheduledDeliveryTime();
- if (deliveryTime > 0 && scheduledExecutor != null)
+ if (deliveryTime > System.currentTimeMillis() && scheduledExecutor != null)
{
if (ScheduledDeliveryHandlerImpl.trace)
{
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-08-12 19:35:52 UTC (rev 9532)
@@ -36,7 +36,6 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.server.JournalType;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQTextMessage;
More information about the hornetq-commits
mailing list