[hornetq-commits] JBoss hornetq SVN: r7963 - in trunk: src/main/org/hornetq/core/journal/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Sep 16 17:29:29 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-09-16 17:29:29 -0400 (Wed, 16 Sep 2009)
New Revision: 7963
Added:
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
Modified:
trunk/src/main/org/hornetq/core/journal/Journal.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Log:
HORNETQ-35 - Journal cleanup
Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -81,8 +81,9 @@
void perfBlast(int pages) throws Exception;
- /** This method is called automatically when a new file is opened */
- void checkAndReclaimFiles() throws Exception;
+ /** This method is called automatically when a new file is opened.
+ * @return true if it needs to re-check due to cleanup or other factors */
+ boolean checkReclaimStatus() throws Exception;
/** This method check for the need of compacting based on the minCompactPercentage
* This method is usually called automatically when new files are opened
Added: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.hornetq.core.buffers.ChannelBuffer;
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.ConcurrentHashSet;
+import org.hornetq.utils.Pair;
+
+/**
+ *
+ * Superclass for journal maintenance tasks such as cleanup and compacting
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+ protected static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
+
+ private static final Logger log = Logger.getLogger(AbstractJournalUpdateTask.class);
+
+ protected final JournalImpl journal;
+
+ protected final SequentialFileFactory fileFactory;
+
+ protected JournalFile currentFile;
+
+ protected SequentialFile sequentialFile;
+
+ protected int fileID;
+
+ protected int nextOrderingID;
+
+ private ChannelBuffer writingChannel;
+
+ private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();
+
+ protected final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
+ final JournalImpl journal,
+ final Set<Long> recordsSnapshot,
+ final int nextOrderingID)
+ {
+ super();
+ this.journal = journal;
+ this.fileFactory = fileFactory;
+ this.nextOrderingID = nextOrderingID;
+ this.recordsSnapshot.addAll(recordsSnapshot);
+ }
+
+ // Public --------------------------------------------------------
+
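+ /**
+ * @param fileFactory factory used to create the control file and its write buffer
+ * @param files data files whose names are written first (a null list is written as a count of 0)
+ * @param newFiles new files whose names are written second (a null list is written as a count of 0)
+ * @param renames (from, to) name pairs written third (a null list is written as a count of 0) */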
+ public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
+ final List<JournalFile> files,
+ final List<JournalFile> newFiles,
+ final List<Pair<String, String>> renames) throws Exception
+ {
+
+ SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
+
+ try
+ {
+ controlFile.open(1);
+
+ ChannelBuffer renameBuffer = ChannelBuffers.dynamicBuffer(1);
+
+ renameBuffer.writeInt(-1);
+ renameBuffer.writeInt(-1);
+
+ HornetQBuffer filesToRename = ChannelBuffers.dynamicBuffer(1);
+
+ // DataFiles first
+
+ if (files == null)
+ {
+ filesToRename.writeInt(0);
+ }
+ else
+ {
+ filesToRename.writeInt(files.size());
+
+ for (JournalFile file : files)
+ {
+ filesToRename.writeUTF(file.getFile().getFileName());
+ }
+ }
+
+ // New Files second
+
+ if (newFiles == null)
+ {
+ filesToRename.writeInt(0);
+ }
+ else
+ {
+ filesToRename.writeInt(newFiles.size());
+
+ for (JournalFile file : newFiles)
+ {
+ filesToRename.writeUTF(file.getFile().getFileName());
+ }
+ }
+
+ // Renames from clean up third
+ if (renames == null)
+ {
+ filesToRename.writeInt(0);
+ }
+ else
+ {
+ filesToRename.writeInt(renames.size());
+ for (Pair<String, String> rename : renames)
+ {
+ filesToRename.writeUTF(rename.a);
+ filesToRename.writeUTF(rename.b);
+ }
+ }
+
+ JournalImpl.writeAddRecord(-1,
+ 1,
+ (byte)0,
+ new JournalImpl.ByteArrayEncoding(filesToRename.array()),
+ JournalImpl.SIZE_ADD_RECORD + filesToRename.array().length,
+ renameBuffer);
+
+ ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
+
+ writeBuffer.put(renameBuffer.array(), 0, renameBuffer.writerIndex());
+
+ writeBuffer.rewind();
+
+ controlFile.write(writeBuffer, true);
+
+ return controlFile;
+ }
+ finally
+ {
+ controlFile.close();
+ }
+ }
+
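+ /** Writes the buffered output at position 0 of the current file, closes that file and adds it to newDataFiles; when no buffer is open nothing is written. */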
+ public void flush() throws Exception
+ {
+ if (writingChannel != null)
+ {
+ sequentialFile.position(0);
+ sequentialFile.write(writingChannel.toByteBuffer(), true);
+ sequentialFile.close();
+ newDataFiles.add(currentFile);
+ }
+
+ writingChannel = null;
+ }
+
+ public boolean lookupRecord(final long id)
+ {
+ return recordsSnapshot.contains(id);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
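+ /** Flushes the file in progress (if any), allocates a buffer of the journal file size, opens a file obtained from the journal and writes its ordering ID as the first int of the buffer.
+ * @throws Exception
+ */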
+
+ protected void openFile() throws Exception
+ {
+ flush();
+
+ ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
+ writingChannel = ChannelBuffers.wrappedBuffer(bufferWrite);
+
+ currentFile = journal.getFile(false, false, false, true);
+ sequentialFile = currentFile.getFile();
+
+ sequentialFile.open(1);
+ fileID = nextOrderingID++;
+ currentFile = new JournalFileImpl(sequentialFile, fileID);
+
+ writingChannel.writeInt(fileID);
+ }
+
+ protected void addToRecordsSnaptsho(long id)
+ {
+ recordsSnapshot.add(id);
+ }
+
+ /**
+ * @return the writingChannel
+ */
+ protected ChannelBuffer getWritingChannel()
+ {
+ return writingChannel;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.utils.DataConstants;
+
+/**
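+ * A JournalCleaner: a journal reader callback that copies add and update records only when their id is in the records snapshot, and rewrites commit and prepare records with the number of transactional records it copied.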
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCleaner extends AbstractJournalUpdateTask
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final HashMap<Long, AtomicInteger> transactionCounter = new HashMap<Long, AtomicInteger>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
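+ /**
+ * @param fileFactory factory passed to the superclass
+ * @param journal journal passed to the superclass
+ * @param recordsSnapshot record ids copied into this task's snapshot
+ * @param nextOrderingID ordering ID assigned to the first file opened */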
+ protected JournalCleaner(final SequentialFileFactory fileFactory,
+ final JournalImpl journal,
+ final Set<Long> recordsSnapshot,
+ final int nextOrderingID) throws Exception
+ {
+ super(fileFactory, journal, recordsSnapshot, nextOrderingID);
+ openFile();
+ }
+
+ // Public --------------------------------------------------------
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#markAsDataFile(org.hornetq.core.journal.impl.JournalFile)
+ */
+ public void markAsDataFile(final JournalFile file)
+ {
+ // nothing to be done here
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecord(org.hornetq.core.journal.RecordInfo)
+ */
+ public void onReadAddRecord(final RecordInfo info) throws Exception
+ {
+ if (lookupRecord(info.id))
+ {
+ int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
+
+ JournalImpl.writeAddRecord(fileID,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data),
+ size,
+ getWritingChannel());
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadAddRecordTX(long, org.hornetq.core.journal.RecordInfo)
+ */
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ if (lookupRecord(recordInfo.id))
+ {
+ incrementTransactionCounter(transactionID);
+
+ int size = JournalImpl.SIZE_ADD_RECORD_TX + recordInfo.data.length;
+
+ JournalImpl.writeAddRecordTX(fileID,
+ transactionID,
+ recordInfo.id,
+ recordInfo.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(recordInfo.data),
+ size,
+ getWritingChannel());
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadCommitRecord(long, int)
+ */
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
+ {
+ int txcounter = getTransactionCounter(transactionID);
+
+ JournalImpl.writeTransaction(fileID,
+ JournalImpl.COMMIT_RECORD,
+ transactionID,
+ null,
+ JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD,
+ txcounter,
+ getWritingChannel());
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecord(long)
+ */
+ public void onReadDeleteRecord(final long recordID) throws Exception
+ {
+ JournalImpl.writeDeleteRecord(fileID, recordID, JournalImpl.SIZE_DELETE_RECORD, getWritingChannel());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadDeleteRecordTX(long, org.hornetq.core.journal.RecordInfo)
+ */
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ int size = JournalImpl.SIZE_DELETE_RECORD_TX + recordInfo.data.length;
+
+ incrementTransactionCounter(transactionID);
+
+ JournalImpl.writeDeleteRecordTransactional(fileID,
+ transactionID,
+ recordInfo.id,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data),
+ size,
+ getWritingChannel());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadPrepareRecord(long, byte[], int)
+ */
+ public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
+ {
+ int txcounter = getTransactionCounter(transactionID);
+
+ int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
+
+ JournalImpl.writeTransaction(fileID,
+ JournalImpl.PREPARE_RECORD,
+ transactionID,
+ new JournalImpl.ByteArrayEncoding(extraData),
+ size,
+ txcounter,
+ getWritingChannel());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadRollbackRecord(long)
+ */
+ public void onReadRollbackRecord(final long transactionID) throws Exception
+ {
+ JournalImpl.writeRollback(fileID, transactionID, getWritingChannel());
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecord(org.hornetq.core.journal.RecordInfo)
+ */
+ public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+ {
+ if (lookupRecord(recordInfo.id))
+ {
+ int size = JournalImpl.SIZE_UPDATE_RECORD + recordInfo.data.length;
+ JournalImpl.writeUpdateRecord(fileID,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data),
+ size,
+ getWritingChannel());
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalReaderCallback#onReadUpdateRecordTX(long, org.hornetq.core.journal.RecordInfo)
+ */
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
+ {
+ if (lookupRecord(recordInfo.id))
+ {
+ incrementTransactionCounter(transactionID);
+ int size = JournalImpl.SIZE_UPDATE_RECORD_TX + recordInfo.data.length;
+ JournalImpl.writeUpdateRecordTX(fileID,
+ transactionID,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data),
+ size,
+ getWritingChannel());
+ }
+ }
+
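+ /** Reads each dependency and, for every commit or rollback of a transaction counted by this cleaner, increments that dependency's negative count against originalFile.
+ * Commits and rollbacks are also counted as negatives. We need to fix those also.
+ * @param originalFile file passed to incNegCount on each dependency
+ * @param dependencies files to be read again
+ */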
+ public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies) throws Exception
+ {
+ for (JournalFile dependency : dependencies)
+ {
+ fixDependency(originalFile, dependency);
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected int incrementTransactionCounter(final long transactionID)
+ {
+ AtomicInteger counter = transactionCounter.get(transactionID);
+ if (counter == null)
+ {
+ counter = new AtomicInteger(0);
+ transactionCounter.put(transactionID, counter);
+ }
+
+ return counter.incrementAndGet();
+ }
+
+ protected int getTransactionCounter(final long transactionID)
+ {
+ AtomicInteger counter = transactionCounter.get(transactionID);
+ if (counter == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return counter.intValue();
+ }
+ }
+
+ // Private -------------------------------------------------------
+ private void fixDependency(final JournalFile originalFile, final JournalFile dependency) throws Exception
+ {
+ JournalReaderCallback txfix = new JournalReaderCallbackAbstract()
+ {
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ {
+ if (transactionCounter.containsKey(transactionID))
+ {
+ dependency.incNegCount(originalFile);
+ }
+ }
+
+ public void onReadRollbackRecord(long transactionID) throws Exception
+ {
+ if (transactionCounter.containsKey(transactionID))
+ {
+ dependency.incNegCount(originalFile);
+ }
+ }
+ };
+
+ JournalImpl.readJournalFile(fileFactory, dependency, txfix);
+ }
+
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -13,7 +13,6 @@
package org.hornetq.core.journal.impl;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -29,9 +28,8 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.Pair;
/**
* A JournalCompactor
@@ -40,31 +38,11 @@
*
*
*/
-public class JournalCompactor implements JournalReaderCallback
+public class JournalCompactor extends AbstractJournalUpdateTask
{
- private static final String FILE_COMPACT_CONTROL = "journal-rename-control.ctr";
-
private static final Logger log = Logger.getLogger(JournalCompactor.class);
-
- private final JournalImpl journal;
-
- private final SequentialFileFactory fileFactory;
-
- private JournalFile currentFile;
-
- private SequentialFile sequentialFile;
-
- private int fileID;
-
- private ChannelBuffer writingChannel;
-
- private int nextOrderingID;
-
- private final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
-
- private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();;
-
+
// Snapshot of transactions that were pending when the compactor started
private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
@@ -77,71 +55,10 @@
* we cache those updates. As soon as we are done we take the right account. */
private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
- /**
- * @param tmpRenameFile
- * @param files
- * @param newFiles
- */
- public static SequentialFile writeControlFile(final SequentialFileFactory fileFactory,
- final List<JournalFile> files,
- final List<JournalFile> newFiles) throws Exception
- {
-
- SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
-
- try
- {
- controlFile.open(1);
-
- ChannelBuffer renameBuffer = ChannelBuffers.dynamicBuffer(1);
-
- renameBuffer.writeInt(-1);
- renameBuffer.writeInt(-1);
-
- HornetQBuffer filesToRename = ChannelBuffers.dynamicBuffer(1);
-
- // DataFiles first
-
- filesToRename.writeInt(files.size());
-
- for (JournalFile file : files)
- {
- filesToRename.writeUTF(file.getFile().getFileName());
- }
-
- filesToRename.writeInt(newFiles.size());
-
- for (JournalFile file : newFiles)
- {
- filesToRename.writeUTF(file.getFile().getFileName());
- }
-
- JournalImpl.writeAddRecord(-1,
- 1,
- (byte)0,
- new JournalImpl.ByteArrayEncoding(filesToRename.array()),
- JournalImpl.SIZE_ADD_RECORD + filesToRename.array().length,
- renameBuffer);
-
- ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
-
- writeBuffer.put(renameBuffer.array(), 0, renameBuffer.writerIndex());
-
- writeBuffer.rewind();
-
- controlFile.write(writeBuffer, true);
-
- return controlFile;
- }
- finally
- {
- controlFile.close();
- }
- }
-
public static SequentialFile readControlFile(final SequentialFileFactory fileFactory,
final List<String> dataFiles,
- final List<String> newFiles) throws Exception
+ final List<String> newFiles,
+ final List<Pair<String, String>> renameFile) throws Exception
{
SequentialFile controlFile = fileFactory.createSequentialFile(FILE_COMPACT_CONTROL, 1);
@@ -182,6 +99,14 @@
newFiles.add(input.readUTF());
}
+ int numberRenames = input.readInt();
+ for (int i = 0; i < numberRenames; i++)
+ {
+ String from = input.readUTF();
+ String to = input.readUTF();
+ renameFile.add(new Pair<String, String>(from, to));
+ }
+
}
return controlFile;
@@ -212,10 +137,7 @@
final Set<Long> recordsSnapshot,
final int firstFileID)
{
- this.fileFactory = fileFactory;
- this.journal = journal;
- this.recordsSnapshot.addAll(recordsSnapshot);
- nextOrderingID = firstFileID;
+ super(fileFactory, journal, recordsSnapshot, firstFileID);
}
/** This methods informs the Compactor about the existence of a pending (non committed) transaction */
@@ -248,7 +170,7 @@
{
for (long id : ids)
{
- recordsSnapshot.add(id);
+ addToRecordsSnaptsho(id);
}
}
@@ -256,7 +178,7 @@
{
for (long id : ids2)
{
- recordsSnapshot.add(id);
+ addToRecordsSnaptsho(id);
}
}
}
@@ -284,40 +206,20 @@
pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
}
- public boolean lookupRecord(final long id)
- {
- return recordsSnapshot.contains(id);
- }
-
private void checkSize(final int size) throws Exception
{
- if (writingChannel == null)
+ if (getWritingChannel() == null)
{
openFile();
}
else
{
- if (writingChannel.writerIndex() + size > writingChannel.capacity())
+ if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
{
openFile();
}
}
}
-
- /** Write pending output into file */
- public void flush() throws Exception
- {
- if (writingChannel != null)
- {
- sequentialFile.position(0);
- sequentialFile.write(writingChannel.toByteBuffer(), true);
- sequentialFile.close();
- newDataFiles.add(currentFile);
- }
-
- writingChannel = null;
- }
-
/**
* Replay pending counts that happened during compacting
*/
@@ -342,7 +244,7 @@
public void onReadAddRecord(final RecordInfo info) throws Exception
{
- if (recordsSnapshot.contains(info.id))
+ if (lookupRecord(info.id))
{
int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
@@ -353,7 +255,7 @@
info.getUserRecordType(),
new JournalImpl.ByteArrayEncoding(info.data),
size,
- writingChannel);
+ getWritingChannel());
newRecords.put(info.id, new JournalRecord(currentFile, size));
}
@@ -377,7 +279,7 @@
info.getUserRecordType(),
new JournalImpl.ByteArrayEncoding(info.data),
size,
- writingChannel);
+ getWritingChannel());
}
else
{
@@ -388,6 +290,7 @@
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
+
if (pendingTransactions.get(transactionID) != null)
{
// Sanity check, this should never happen
@@ -421,7 +324,7 @@
info.id,
new JournalImpl.ByteArrayEncoding(info.data),
size,
- writingChannel);
+ getWritingChannel());
newTransaction.addNegative(currentFile, info.id);
}
@@ -447,11 +350,10 @@
JournalImpl.writeTransaction(fileID,
JournalImpl.PREPARE_RECORD,
transactionID,
- newTransaction,
new JournalImpl.ByteArrayEncoding(extraData),
size,
newTransaction.getCounter(currentFile),
- writingChannel);
+ getWritingChannel());
newTransaction.prepare(currentFile);
@@ -470,7 +372,7 @@
public void onReadUpdateRecord(final RecordInfo info) throws Exception
{
- if (recordsSnapshot.contains(info.id))
+ if (lookupRecord(info.id))
{
int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
@@ -492,7 +394,7 @@
info.userRecordType,
new JournalImpl.ByteArrayEncoding(info.data),
size,
- writingChannel);
+ getWritingChannel());
}
}
@@ -513,7 +415,7 @@
info.userRecordType,
new JournalImpl.ByteArrayEncoding(info.data),
size,
- writingChannel);
+ getWritingChannel());
newTransaction.addPositive(currentFile, info.id, size);
}
@@ -538,26 +440,6 @@
return newTransaction;
}
- /**
- * @throws Exception
- */
- private void openFile() throws Exception
- {
- flush();
-
- ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
- writingChannel = ChannelBuffers.wrappedBuffer(bufferWrite);
-
- currentFile = journal.getFile(false, false, false, true);
- sequentialFile = currentFile.getFile();
-
- sequentialFile.open(1);
- fileID = nextOrderingID++;
- currentFile = new JournalFileImpl(sequentialFile, fileID);
-
- writingChannel.writeInt(fileID);
- }
-
private static abstract class CompactCommand
{
abstract void execute() throws Exception;
@@ -599,7 +481,7 @@
private long id;
private JournalFile usedFile;
-
+
private final int size;
public UpdateCompactCommand(final long id, final JournalFile usedFile, final int size)
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -19,8 +19,6 @@
*
* A JournalFile
*
- * TODO combine this with JournalFileImpl
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
@@ -33,6 +31,8 @@
int getNegCount(JournalFile file);
void incNegCount(JournalFile file);
+
+ boolean resetNegCount(JournalFile file);
int getPosCount();
@@ -49,6 +49,10 @@
void setCanReclaim(boolean canDelete);
boolean isCanReclaim();
+
+ void setNeedCleanup(boolean needCleanup);
+
+ boolean isNeedCleanup();
long getOffset();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -44,6 +44,8 @@
private final AtomicInteger liveBytes = new AtomicInteger(0);
private boolean canReclaim;
+
+ private boolean needCleanup;
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
@@ -70,7 +72,18 @@
{
return canReclaim;
}
+
+ public boolean isNeedCleanup()
+ {
+ return needCleanup;
+ }
+ public void setNeedCleanup(boolean needCleanup)
+ {
+ this.needCleanup = needCleanup;
+ }
+
+
public void setCanReclaim(final boolean canReclaim)
{
this.canReclaim = canReclaim;
@@ -94,6 +107,11 @@
return count.intValue();
}
}
+
+ public boolean resetNegCount(JournalFile file)
+ {
+ return negCounts.remove(file) != null;
+ }
public void incPosCount()
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -33,11 +33,11 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.core.buffers.ChannelBuffer;
@@ -81,7 +81,7 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = log.isTraceEnabled();
+ private static final boolean trace = false;
/** This is to be set to true at DEBUG & development only */
private static final boolean LOAD_TRACE = false;
@@ -197,13 +197,16 @@
// This will be set only while the JournalCompactor is being executed
private volatile JournalCompactor compactor;
- // Latch used to wait compactor finish, to make sure we won't stop the journal with the compactor running
private final AtomicBoolean compactorRunning = new AtomicBoolean();
private ExecutorService filesExecutor = null;
+
+ private ExecutorService compactorExecutor = null;
// Lock used during the append of records
- private final Semaphore lockAppend = new Semaphore(1);
+ // This lock doesn't represent a global lock.
+ // After a record is appended, the usedFile can't be changed until the positives and negatives are updated
+ private final ReentrantLock lockAppend = new ReentrantLock();
/** We don't lock the journal while compacting, however we need to lock it while taking and updating snapshots */
private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
@@ -313,7 +316,6 @@
public static void writeTransaction(final int fileID,
final byte recordType,
final long txID,
- final JournalTransaction tx,
final EncodingSupport transactionData,
final int size,
final int numberOfRecords,
@@ -322,7 +324,7 @@
bb.writeByte(recordType);
bb.writeInt(fileID); // skip ID part
bb.writeLong(txID);
- bb.writeInt(numberOfRecords); // skip number of pendingTransactions part
+ bb.writeInt(numberOfRecords);
if (transactionData != null)
{
@@ -364,6 +366,18 @@
}
/**
+ * @param txID
+ * @param bb
+ */
+ public static void writeRollback(final int fileID, final long txID, ChannelBuffer bb)
+ {
+ bb.writeByte(ROLLBACK_RECORD);
+ bb.writeInt(fileID);
+ bb.writeLong(txID);
+ bb.writeInt(SIZE_ROLLBACK_RECORD);
+ }
+
+ /**
* @param id
* @param recordType
* @param record
@@ -410,6 +424,19 @@
}
/**
+ * @param id
+ * @param size
+ * @param bb
+ */
+ public static void writeDeleteRecord(final int fileId, final long id, int size, ChannelBuffer bb)
+ {
+ bb.writeByte(DELETE_RECORD);
+ bb.writeInt(fileId);
+ bb.writeLong(id);
+ bb.writeInt(size);
+ }
+
+ /**
* @param txID
* @param id
* @param record
@@ -836,7 +863,7 @@
callback = getSyncCallback(sync);
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -845,7 +872,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
finally
@@ -896,7 +923,7 @@
callback = getSyncCallback(sync);
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -914,7 +941,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
finally
@@ -956,14 +983,11 @@
ChannelBuffer bb = newBuffer(size);
- bb.writeByte(DELETE_RECORD);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(id);
- bb.writeInt(size);
+ writeDeleteRecord(-1, id, size, bb);
callback = getSyncCallback(sync);
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
@@ -982,7 +1006,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
finally
@@ -1025,7 +1049,7 @@
JournalTransaction tx = getTransactionInfo(txID);
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1034,7 +1058,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
finally
@@ -1074,7 +1098,7 @@
JournalTransaction tx = getTransactionInfo(txID);
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1083,7 +1107,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
finally
@@ -1116,7 +1140,7 @@
JournalTransaction tx = getTransactionInfo(txID);
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, false, false, tx, null);
@@ -1125,7 +1149,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
finally
@@ -1174,9 +1198,9 @@
int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + DataConstants.SIZE_INT;
ChannelBuffer bb = newBuffer(size);
- writeTransaction(-1, PREPARE_RECORD, txID, tx, transactionData, size, -1, bb);
+ writeTransaction(-1, PREPARE_RECORD, txID, transactionData, size, -1, bb);
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
@@ -1185,7 +1209,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
@@ -1236,9 +1260,15 @@
ChannelBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
- writeTransaction(-1, COMMIT_RECORD, txID, tx, null, SIZE_COMPLETE_TRANSACTION_RECORD, -1, bb);
+ writeTransaction(-1,
+ COMMIT_RECORD,
+ txID,
+ null,
+ SIZE_COMPLETE_TRANSACTION_RECORD,
+ -1 /* number of records on this transaction will be filled later inside append record */,
+ bb);
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
@@ -1247,7 +1277,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
@@ -1283,16 +1313,11 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- int size = SIZE_ROLLBACK_RECORD;
+ ChannelBuffer bb = newBuffer(SIZE_ROLLBACK_RECORD);
- ChannelBuffer bb = newBuffer(size);
+ writeRollback(-1, txID, bb);
- bb.writeByte(ROLLBACK_RECORD);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeInt(size);
-
- lockAppend.acquire();
+ lockAppend.lock();
try
{
JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
@@ -1301,7 +1326,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
@@ -1464,7 +1489,7 @@
// This is where most of the work is done, taking most of the time of the compacting routine.
// Notice there are no locks while this is being done.
-
+
// Read the files, and use the JournalCompactor class to create the new outputFiles, and the new collections as
// well
for (final JournalFile file : dataFilesToProcess)
@@ -1479,12 +1504,12 @@
// Usually tests will use this to hold the compacting while other structures are being updated.
onCompactDone();
- SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles());
-
List<JournalFile> newDatafiles = null;
JournalCompactor localCompactor = compactor;
+ SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles(), null);
+
compactingLock.writeLock().lock();
try
{
@@ -2005,18 +2030,21 @@
state = STATE_LOADED;
- checkAndReclaimFiles();
+ checkReclaimStatus();
return maxID;
}
- public void checkAndReclaimFiles() throws Exception
+ /**
+ * @return true if cleanup was called
+ */
+ public boolean checkReclaimStatus() throws Exception
{
- // We can't start compacting while compacting is working
+ // We can't start reclaim while compacting is working
compactingLock.readLock().lock();
try
{
- checkReclaimStatus();
+ reclaimer.scan(getDataFiles());
for (JournalFile file : dataFiles)
{
@@ -2037,13 +2065,145 @@
addFreeFile(file);
}
}
+
+ int nCleanup = 0;
+ for (JournalFile file : dataFiles)
+ {
+ if (file.isNeedCleanup())
+ {
+ nCleanup++;
+ }
+ }
+
+ // TODO: make this configurable
+ if (nCleanup > 5)
+ {
+ for (JournalFile file : dataFiles)
+ {
+ if (file.isNeedCleanup())
+ {
+ final JournalFile cleanupFile = file;
+
+ if (compactorRunning.compareAndSet(false, true))
+ {
+ // The cleanup should happen rarely,
+ // but when it happens it needs to run on a different thread;
+ // otherwise opening new files (or any other use of the executor) would be blocked while the cleanUp is being
+ // processed.
+
+ compactorExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ cleanUp(cleanupFile);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ finally
+ {
+ compactorRunning.set(false);
+ if (autoReclaim)
+ {
+ scheduleReclaim();
+ }
+ }
+ }
+ });
+ }
+ return true;
+ }
+ }
+ }
}
finally
{
compactingLock.readLock().unlock();
}
+
+ return false;
}
+ public synchronized void cleanUp(final JournalFile file) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ return;
+ }
+
+ compactingLock.readLock().lock();
+
+ try
+ {
+ JournalCleaner cleaner = null;
+ ArrayList<JournalFile> dependencies = new ArrayList<JournalFile>();
+ lockAppend.lock();
+
+ try
+ {
+
+ log.info("Cleaning up file " + file);
+
+ if (file.getPosCount() == 0)
+ {
+ // nothing to be done
+ return;
+ }
+
+ // We don't want this file to be reclaimed during the cleanup
+ file.incPosCount();
+
+ // The file will have all the deleted records removed, so all the NegCount towards the file being cleaned up
+ // could be reset
+ for (JournalFile jrnFile : dataFiles)
+ {
+ if (jrnFile.resetNegCount(file))
+ {
+ dependencies.add(jrnFile);
+ jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
+ }
+ }
+
+ cleaner = new JournalCleaner(fileFactory, this, records.keySet(), file.getFileID());
+ }
+ finally
+ {
+ lockAppend.unlock();
+ }
+
+ readJournalFile(fileFactory, file, cleaner);
+
+ cleaner.flush();
+
+ cleaner.fixDependencies(file, dependencies);
+
+ for (JournalFile jrnfile : dependencies)
+ {
+ jrnfile.decPosCount();
+ }
+ file.decPosCount();
+
+ SequentialFile tmpFile = cleaner.currentFile.getFile();
+ String tmpFileName = tmpFile.getFileName();
+ String cleanedFileName = file.getFile().getFileName();
+
+ SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
+ cleanedFileName));
+ file.getFile().delete();
+ tmpFile.renameTo(cleanedFileName);
+ controlFile.delete();
+ }
+ finally
+ {
+ compactingLock.readLock().unlock();
+ log.info("Clean up on file " + file + " done");
+ }
+
+ }
+
public void checkCompact() throws Exception
{
if (compactMinFiles == 0)
@@ -2072,10 +2232,9 @@
return;
}
- // We can't use the executor for the compacting... or we would lock files opening and creation (besides other
- // operations)
- // that would freeze the journal while compacting
- Thread t = new Thread()
+ // We can't use the files executor for the compacting, or we would deadlock: file open and creation
+ // operations also use that executor
+ compactorExecutor.execute(new Runnable()
{
public void run()
{
@@ -2093,9 +2252,7 @@
compactorRunning.set(false);
}
}
- };
-
- t.start();
+ });
}
}
@@ -2114,7 +2271,7 @@
public String debug() throws Exception
{
- checkReclaimStatus();
+ reclaimer.scan(getDataFiles());
StringBuilder builder = new StringBuilder();
@@ -2245,19 +2402,19 @@
compactingLock.readLock().lock();
try
{
- lockAppend.acquire();
+ lockAppend.lock();
try
{
moveNextFile(true);
if (autoReclaim)
{
- checkAndReclaimFiles();
+ checkReclaimStatus();
}
debugWait();
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
finally
@@ -2287,6 +2444,8 @@
}
filesExecutor = Executors.newSingleThreadExecutor();
+
+ compactorExecutor = Executors.newCachedThreadPool();
fileFactory.start();
@@ -2302,7 +2461,7 @@
throw new IllegalStateException("Journal is already stopped");
}
- lockAppend.acquire();
+ lockAppend.lock();
try
{
@@ -2337,7 +2496,7 @@
}
finally
{
- lockAppend.release();
+ lockAppend.unlock();
}
}
@@ -2347,6 +2506,23 @@
// Protected
// -----------------------------------------------------------------------------
+ protected SequentialFile createControlFile(List<JournalFile> files,
+ List<JournalFile> newFiles,
+ Pair<String, String> cleanupRename) throws Exception
+ {
+ ArrayList<Pair<String, String>> cleanupList;
+ if (cleanupRename == null)
+ {
+ cleanupList = null;
+ }
+ else
+ {
+ cleanupList = new ArrayList<Pair<String, String>>();
+ cleanupList.add(cleanupRename);
+ }
+ return AbstractJournalUpdateTask.writeControlFile(fileFactory, files, newFiles, cleanupList);
+ }
+
protected void deleteControlFile(final SequentialFile controlFile) throws Exception
{
controlFile.delete();
@@ -2374,14 +2550,6 @@
{
}
- /**
- * @throws Exception
- */
- protected SequentialFile createControlFile(final List<JournalFile> files, final List<JournalFile> newFiles) throws Exception
- {
- return JournalCompactor.writeControlFile(fileFactory, files, newFiles);
- }
-
// Private
// -----------------------------------------------------------------------------
@@ -2406,11 +2574,6 @@
}
}
- private void checkReclaimStatus() throws Exception
- {
- reclaimer.scan(getDataFiles());
- }
-
// Discard the old JournalFile and set it with a new ID
private JournalFile reinitializeFile(final JournalFile file) throws Exception
{
@@ -2535,7 +2698,7 @@
file.read(bb);
int fileID = bb.getInt();
-
+
fileFactory.releaseBuffer(bb);
bb = null;
@@ -2546,7 +2709,7 @@
}
int fileNameID = getFileNameID(fileName);
-
+
// The compactor could create a fileName but use a previously assigned ID.
// Because of that we need to take both parts into account
if (nextFileID.get() < fileNameID)
@@ -2554,7 +2717,6 @@
nextFileID.set(fileNameID);
}
-
orderedFiles.add(new JournalFileImpl(file, fileID));
file.close();
@@ -2594,7 +2756,7 @@
throw new IllegalArgumentException("Record is too large to store " + size);
}
- // Disable auto flush on the timer. The Timer should'nt flush anything
+ // Disable auto flush on the timer. The Timer shouldn't flush anything
currentFile.getFile().disableAutoFlush();
if (!currentFile.getFile().fits(size))
@@ -2669,13 +2831,13 @@
}
}
-
+
/** Get the ID part of the name */
private int getFileNameID(String fileName)
{
try
{
- return Integer.parseInt(fileName.substring(filePrefix.length()+1, fileName.indexOf('.')));
+ return Integer.parseInt(fileName.substring(filePrefix.length() + 1, fileName.indexOf('.')));
}
catch (Throwable e)
{
@@ -2819,34 +2981,7 @@
if (autoReclaim && !synchronous)
{
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- checkAndReclaimFiles();
- }
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- }
- }
- });
- filesExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- checkCompact();
- }
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- }
- }
- });
+ scheduleReclaim();
}
JournalFile nextFile = null;
@@ -2863,6 +2998,32 @@
return nextFile;
}
+ private void scheduleReclaim()
+ {
+ if (state != STATE_LOADED)
+ {
+ return;
+ }
+
+ filesExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ if (!checkReclaimStatus())
+ {
+ checkCompact();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
+ }
+ });
+ }
+
/**
*
* Open a file and place it into the openedFiles queue
@@ -3002,8 +3163,9 @@
{
ArrayList<String> dataFiles = new ArrayList<String>();
ArrayList<String> newFiles = new ArrayList<String>();
+ ArrayList<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
- SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles);
+ SequentialFile controlFile = JournalCompactor.readControlFile(fileFactory, dataFiles, newFiles, renames);
if (controlFile != null)
{
for (String dataFile : dataFiles)
@@ -3026,6 +3188,19 @@
}
}
+ for (Pair<String, String> rename : renames)
+ {
+ SequentialFile fileTmp = fileFactory.createSequentialFile(rename.a, 1);
+ SequentialFile fileTo = fileFactory.createSequentialFile(rename.b, 1);
+ // We should do the rename only if the tmp file still exists, or else we could
+ // delete a valid file, depending on where the crash occurred during the control file delete
+ if (fileTmp.exists())
+ {
+ fileTo.delete();
+ fileTmp.renameTo(rename.b);
+ }
+ }
+
controlFile.delete();
}
@@ -3236,7 +3411,7 @@
{
try
{
- lockAppend.acquire();
+ lockAppend.lock();
HornetQBuffer bb = newBuffer(128 * 1024);
@@ -3245,7 +3420,7 @@
appendRecord(bb, false, false, null, null);
}
- lockAppend.release();
+ lockAppend.unlock();
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -281,7 +281,7 @@
}
/**
- * The caller of this method needs to guarantee lock.acquire at the journal. (unless this is being called from load what is a single thread process).
+ * The caller of this method needs to guarantee lockAppend.lock() at the journal (unless this is being called from load, which is a single-threaded process).
* */
public void commit(final JournalFile file)
{
@@ -371,7 +371,7 @@
}
/**
- * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+ * The caller of this method needs to guarantee lockAppend.lock() before calling this method if being used outside of the lock context,
* or else potFilesMap could be affected
* */
public void rollback(final JournalFile file)
@@ -402,7 +402,7 @@
}
/**
- * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+ * The caller of this method needs to guarantee lockAppend.lock() before calling this method if being used outside of the lock context,
* or else potFilesMap could be affected
* */
public void prepare(final JournalFile file)
Modified: trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -17,8 +17,6 @@
/**
*
- * <p>A ReclaimerTest</p>
- *
* <p>The journal consists of an ordered list of journal files Fn where 0 <= n <= N</p>
*
* <p>A journal file can contain either positives (pos) or negatives (neg)</p>
@@ -33,6 +31,7 @@
* which are also marked for deletion in the same pass of the algorithm.</p>
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class Reclaimer
@@ -54,6 +53,8 @@
JournalFile currentFile = files[i];
+ currentFile.setNeedCleanup(false);
+
int posCount = currentFile.getPosCount();
int totNeg = 0;
@@ -101,6 +102,7 @@
trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
+ file.setNeedCleanup(true);
currentFile.setCanReclaim(false);
break;
Modified: trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/integration/client/CompactingTest.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -25,6 +25,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.Message;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
import org.hornetq.tests.util.ServiceTestBase;
@@ -67,6 +68,101 @@
// Public --------------------------------------------------------
+ public void testCleanupAIO() throws Throwable
+ {
+ for (int i = 0; i < 3; i++)
+ {
+ System.out.println("Test # " + i);
+ internalTestCleanup(JournalType.ASYNCIO);
+ tearDown();
+ setUp();
+ }
+ }
+
+ public void testCleanupNIO() throws Throwable
+ {
+ for (int i = 0; i < 3; i++)
+ {
+ System.out.println("Test # " + i);
+ internalTestCleanup(JournalType.NIO);
+ tearDown();
+ setUp();
+ }
+ }
+
+ private void internalTestCleanup(JournalType journalType) throws Throwable
+ {
+ setupServer(journalType);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ ClientProducer prod = session.createProducer(AD1);
+
+ for (int i = 0; i < 500; i++)
+ {
+ prod.send(session.createClientMessage(true));
+ }
+
+ session.commit();
+
+ prod.close();
+
+ ClientConsumer cons = session.createConsumer(Q2);
+ prod = session.createProducer(AD2);
+
+ session.start();
+
+ for (int i = 0; i < 200; i++)
+ {
+ System.out.println("Iteration " + i);
+ for (int j = 0; j < 1000; j++)
+ {
+ Message msg = session.createClientMessage(true);
+ msg.getBody().writeBytes(new byte[1024]);
+
+ prod.send(msg);
+ }
+
+ session.commit();
+
+ for (int j = 0; j < 1000; j++)
+ {
+ ClientMessage msg = cons.receive(2000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session.commit();
+
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ session.close();
+
+ server.stop();
+
+ server.start();
+
+ session = sf.createSession(false, true, true);
+ cons = session.createConsumer(Q1);
+ session.start();
+
+ for (int i = 0; i < 500; i++)
+ {
+ ClientMessage msg = cons.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+
+ prod = session.createProducer(AD2);
+
+ session.close();
+
+ }
+
public void testMultiProducerAndCompactAIO() throws Throwable
{
internalTestMultiProducer(JournalType.ASYNCIO);
@@ -107,11 +203,11 @@
{
session.close();
}
-
+
server.stop();
-
+
setupServer(journalType);
-
+
final AtomicInteger numberOfMessages = new AtomicInteger(0);
final int NUMBER_OF_FAST_MESSAGES = 100000;
final int SLOW_INTERVAL = 100;
@@ -292,7 +388,7 @@
assertNotNull(msg);
msg.acknowledge();
}
-
+
assertNull(cons.receiveImmediate());
}
@@ -327,7 +423,7 @@
config.setJournalType(journalType);
- config.setJournalCompactMinFiles(3);
+ config.setJournalCompactMinFiles(10);
config.setJournalCompactPercentage(50);
server = createServer(true, config);
@@ -363,16 +459,27 @@
}
sess.close();
-
- sf = createInVMFactory();
}
@Override
protected void tearDown() throws Exception
{
- sf.close();
+ try
+ {
+ if (sf != null)
+ {
+ sf.close();
+ }
- server.stop();
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); // system.out -> junit reports
+ }
server = null;
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -68,17 +68,26 @@
SequentialFile file = fileFactory.createSequentialFile("file-" + i + ".tst.new", 1);
newFiles.add(new JournalFileImpl(file, 0));
}
+
+ ArrayList<Pair<String, String>> renames = new ArrayList<Pair<String, String>>();
+ renames.add(new Pair<String, String>("a", "b"));
+ renames.add(new Pair<String, String>("c", "d"));
+
+
- JournalCompactor.writeControlFile(fileFactory, dataFiles, newFiles);
+ JournalCompactor.writeControlFile(fileFactory, dataFiles, newFiles, renames);
ArrayList<String> strDataFiles = new ArrayList<String>();
ArrayList<String> strNewFiles = new ArrayList<String>();
+
+ ArrayList<Pair<String, String>> renamesRead = new ArrayList<Pair<String, String>>();
- assertNotNull(JournalCompactor.readControlFile(fileFactory, strDataFiles, strNewFiles));
+ assertNotNull(JournalCompactor.readControlFile(fileFactory, strDataFiles, strNewFiles, renamesRead));
assertEquals(dataFiles.size(), strDataFiles.size());
assertEquals(newFiles.size(), strNewFiles.size());
+ assertEquals(renames.size(), renamesRead.size());
Iterator<String> iterDataFiles = strDataFiles.iterator();
for (JournalFile file : dataFiles)
@@ -94,6 +103,16 @@
}
assertFalse(iterNewFiles.hasNext());
+
+ Iterator<Pair<String,String>> iterRename = renames.iterator();
+ for (Pair<String,String> rename : renamesRead)
+ {
+ Pair<String, String> original = iterRename.next();
+ assertEquals(original.a, rename.a);
+ assertEquals(original.b, rename.b);
+ }
+ assertFalse(iterNewFiles.hasNext());
+
}
public void testCrashRenamingFiles() throws Exception
@@ -197,11 +216,11 @@
{
@Override
- protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles) throws Exception
+ protected SequentialFile createControlFile(List<JournalFile> files, List<JournalFile> newFiles, Pair<String, String> pair) throws Exception
{
if (createControlFile)
{
- return super.createControlFile(files, newFiles);
+ return super.createControlFile(files, newFiles, pair);
}
else
{
@@ -517,7 +536,7 @@
journal.forceMoveNextFile();
- journal.checkAndReclaimFiles();
+ journal.checkReclaimStatus();
}
long transactionID = 0;
@@ -666,9 +685,9 @@
long id = idGenerator.generateID();
listToDelete.add(id);
+ // Append Record Transaction will make the recordSize as exactly recordLength (discounting SIZE_ADD_RECORD_TX)
addTx(tx, id);
- // Append Record Transaction will make the recordSize as exactly recordLength (discounting SIZE_ADD_RECORD_TX)
expectedSizes.add(recordLength);
journal.forceMoveNextFile();
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -204,7 +204,7 @@
}
impl.forceMoveNextFile();
- impl.checkAndReclaimFiles();
+ impl.checkReclaimStatus();
impl.stop();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -156,7 +156,7 @@
journalImpl.forceMoveNextFile();
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
setupAndLoadJournal(JOURNAL_SIZE, 10);
@@ -270,7 +270,7 @@
journalImpl.setAutoReclaim(false);
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
journalImpl.debugWait();
@@ -316,7 +316,7 @@
journalImpl.setAutoReclaim(false);
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
journalImpl.debugWait();
@@ -356,7 +356,7 @@
assertEquals(1000, records.get(0).id);
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
log.debug(journalImpl.debug());
@@ -484,7 +484,7 @@
assertEquals(10, records.size());
assertEquals(0, transactions.size());
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(10, journalImpl.getDataFilesCount());
@@ -504,7 +504,7 @@
journalImpl.appendAddRecord(101, (byte)1, new SimpleEncoding(5, (byte)1), false);
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(1, journalImpl.getDataFilesCount());
@@ -595,7 +595,7 @@
assertEquals(0, records.size());
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(0, journalImpl.getDataFilesCount());
@@ -660,7 +660,7 @@
assertEquals(20, records.size());
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
}
@@ -695,7 +695,7 @@
journalImpl.forceMoveNextFile();
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
setupAndLoadJournal(JOURNAL_SIZE, 100, 2);
@@ -756,7 +756,7 @@
assertEquals(0, records.size());
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(0, journalImpl.getDataFilesCount());
@@ -798,7 +798,7 @@
// the
// file
journalImpl.forceMoveNextFile();
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
setupAndLoadJournal(JOURNAL_SIZE, 100);
@@ -836,7 +836,7 @@
journalImpl.appendCommitRecord(2l, false);
journalImpl.forceMoveNextFile();
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
setupAndLoadJournal(JOURNAL_SIZE, 100);
@@ -931,7 +931,7 @@
assertEquals((byte)1, transactions.get(0).extraData[i]);
}
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(10, journalImpl.getDataFilesCount());
@@ -943,7 +943,7 @@
assertEquals(10, records.size());
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
for (int i = 0; i < 10; i++)
{
@@ -980,7 +980,7 @@
// Reclaiming should still be able to reclaim a file if a transaction was
// ignored
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(2, factory.listFiles("tt").size());
@@ -1053,7 +1053,7 @@
journalImpl.forceMoveNextFile();
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(0, journalImpl.getDataFilesCount());
@@ -1147,7 +1147,7 @@
journalImpl.forceMoveNextFile();
journalImpl.debugWait();
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(0, transactions.size());
assertEquals(0, journalImpl.getDataFilesCount());
@@ -1238,7 +1238,7 @@
journalImpl.debugWait();
- journalImpl.checkAndReclaimFiles();
+ journalImpl.checkReclaimStatus();
assertEquals(0, journalImpl.getDataFilesCount());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -104,7 +104,7 @@
protected void checkAndReclaimFiles() throws Exception
{
journal.debugWait();
- journal.checkAndReclaimFiles();
+ journal.checkReclaimStatus();
journal.debugWait();
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -1057,7 +1057,7 @@
// Now restart
- journal.checkAndReclaimFiles();
+ journal.checkReclaimStatus();
System.out.println("journal:" + journal.debug());
@@ -3094,7 +3094,7 @@
System.out.println("*****************************************");
journal.forceMoveNextFile();
- journal.checkAndReclaimFiles();
+ journal.checkReclaimStatus();
assertEquals(0, journal.getDataFilesCount());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -354,6 +354,23 @@
assertCanDelete(1);
assertCantDelete(2);
}
+
+ public void testCleanup() throws Exception
+ {
+ setup(3);
+ setupPosNeg(0, 11, 0, 0, 0);
+ setupPosNeg(1, 1, 10, 0, 0);
+ setupPosNeg(2, 1, 0, 1, 0);
+
+ reclaimer.scan(files);
+
+ debugFiles();
+
+ assertCantDelete(0);
+ assertTrue(files[0].isNeedCleanup());
+ assertCantDelete(1);
+ assertCantDelete(2);
+ }
public void testThreeFiles10() throws Exception
{
@@ -708,6 +725,18 @@
}
}
}
+
+ private void debugFiles()
+ {
+ for (int i = 0 ; i < files.length; i++)
+ {
+ System.out.println("[" + i + "]=" + files[i].getPosCount() + ", canDelete = " + files[i].isCanReclaim() + ", cleanup = " + files[i].isNeedCleanup());
+ for (int j = 0 ; j <= i; j++)
+ {
+ System.out.println("..." + files[i].getNegCount(files[j]));
+ }
+ }
+ }
private void assertCanDelete(final int... fileNumber)
{
@@ -738,7 +767,10 @@
private int posCount;
private boolean canDelete;
+
+ private boolean needCleanup;
+
public void extendOffset(final int delta)
{
}
@@ -913,5 +945,30 @@
{
return 0;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#isNeedCleanup()
+ */
+ public boolean isNeedCleanup()
+ {
+ return this.needCleanup;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#resetNegCount(org.hornetq.core.journal.impl.JournalFile)
+ */
+ public boolean resetNegCount(JournalFile file)
+ {
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#setNeedCleanup(boolean)
+ */
+ public void setNeedCleanup(boolean needCleanup)
+ {
+ this.needCleanup = needCleanup;
+
+ }
}
}
Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2009-09-16 16:46:12 UTC (rev 7962)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2009-09-16 21:29:29 UTC (rev 7963)
@@ -83,7 +83,7 @@
System.out.println("user record: " + record);
}
- journal.checkAndReclaimFiles();
+ journal.checkReclaimStatus();
System.out.println("Data = " + journal.debug());
More information about the hornetq-commits
mailing list